“Spark 基础之 DataFrame 基本概念学习”实验报告

DataFrame 基本概念学习

1.RDD 的聚合方法 aggregate

val rdd = List(1,2,3,4).toDS().rdd
val i = rdd.aggregate(3)(_ * _, _ + _)
println(s"i = $i") //output: i = 75
copy

2.DataFrame 中对列的聚合 agg

val df = List(1,2,3,4).toDF()
df.agg("value" -> "max").show() //output: 4
copy

column 是什么? column 实际上是一个字段相关的字符串,类似于 sql 中的字段特征,可以使用 + > === 与排序。 创建方法:

 *   df("columnName")            // On a specific `df` DataFrame.
 *   col("columnName")           // A generic column not yet associated with a DataFrame.
 *   col("columnName.field")     // Extracting a struct field
 *   col("`a.column.with.dots`") // Escape `.` in column names.
 *   $"columnName" 
copy

表达式运算

 *   $"a" + 1
 *   $"a" === $"b"
copy

示例

val ds = List(1,2,3,4).toDS()
ds.agg(avg(ds("value"))).show()
copy

完整示例:

import org.apache.spark.sql.functions._
import org.apache.spark.sql.{DataFrame, SparkSession}

object DataFrameBasic {

  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder
      .appName("Simple Application").
      master("local")
      .getOrCreate()
    val sc = spark.sparkContext

    val df = spark
      .read
      .option("header", value = true)
      .csv("/home/lg/Documents/data/1987.csv")

    //字段基本类型转换(注:csv 默认字段都是String类型)
    // 以下是一些惯用法 idiom
    //①提取某个 column
    val df_1 = df.withColumnRenamed("Year","oldYear")
    val df_2 = df_1.withColumn("Year",df_1.col("oldYear").cast("int")).drop("oldYear")
    //②修改某个,保留原来的 columns
    val df_3 = convertColumn(df_2, "ArrDelay", "int")
    val df_4 = convertColumn(df_3, "DepDelay", "int")
    df_4.show()
    val averageDelays = df_4.groupBy(df_4.col("FlightNum"))
      .agg(avg(df_4.col("ArrDelay")), avg(df_4.col("DepDelay")))
    averageDelays.cache()
    averageDelays
        .orderBy(avg("ArrDelay").desc)
      .show(5)

  }

  /**
   * 修改列名与列数据类型
   *
   * @param df      初始 ddtaFrame
   * @param name    column 名字
   * @param newType 新的 column 类型
   * @return 修改过的 dataFrame
   */
  def convertColumn(df: org.apache.spark.sql.DataFrame, name: String, newType: String): DataFrame = {
    val df_tmp = df.withColumnRenamed(name, "swap")
    df_tmp.withColumn(name, df_tmp.col("swap").cast(newType)).drop("swap")
  }
}
copy
最新评论
暂无评论~