1.RDD 的聚合方法 aggregate
val rdd = List(1,2,3,4).toDS().rdd
val i = rdd.aggregate(3)(_ * _, _ + _)
println(s"i = $i") //output: i = 75
copy
2.DataFrame 中对列的聚合 agg
val df = List(1,2,3,4).toDF()
df.agg("value" -> "max").show() //output: 4
copy
column 是什么? column 实际上是一个字段相关的字符串,类似于 sql 中的字段特征,可以使用 + > === 与排序。 创建方法:
* df("columnName") // On a specific `df` DataFrame.
* col("columnName") // A generic column not yet associated with a DataFrame.
* col("columnName.field") // Extracting a struct field
* col("`a.column.with.dots`") // Escape `.` in column names.
* $"columnName"
copy
表达式运算
* $"a" + 1
* $"a" === $"b"
copy
示例
val ds = List(1,2,3,4).toDS()
ds.agg(avg(ds("value"))).show()
copy
完整示例:
import org.apache.spark.sql.functions._
import org.apache.spark.sql.{DataFrame, SparkSession}
object DataFrameBasic {
def main(args: Array[String]): Unit = {
val spark = SparkSession
.builder
.appName("Simple Application").
master("local")
.getOrCreate()
val sc = spark.sparkContext
val df = spark
.read
.option("header", value = true)
.csv("/home/lg/Documents/data/1987.csv")
//字段基本类型转换(注:csv 默认字段都是String类型)
// 以下是一些惯用法 idiom
//①提取某个 column
val df_1 = df.withColumnRenamed("Year","oldYear")
val df_2 = df_1.withColumn("Year",df_1.col("oldYear").cast("int")).drop("oldYear")
//②修改某个,保留原来的 columns
val df_3 = convertColumn(df_2, "ArrDelay", "int")
val df_4 = convertColumn(df_3, "DepDelay", "int")
df_4.show()
val averageDelays = df_4.groupBy(df_4.col("FlightNum"))
.agg(avg(df_4.col("ArrDelay")), avg(df_4.col("DepDelay")))
averageDelays.cache()
averageDelays
.orderBy(avg("ArrDelay").desc)
.show(5)
}
/**
* 修改列名与列数据类型
*
* @param df 初始 ddtaFrame
* @param name column 名字
* @param newType 新的 column 类型
* @return 修改过的 dataFrame
*/
def convertColumn(df: org.apache.spark.sql.DataFrame, name: String, newType: String): DataFrame = {
val df_tmp = df.withColumnRenamed(name, "swap")
df_tmp.withColumn(name, df_tmp.col("swap").cast(newType)).drop("swap")
}
}
copy
学习时间 0分钟
操作时间 0分钟
按键次数 0次
实验次数 6次
报告字数 2190字
是否完成 完成