本课程将对飞行准点率数据集,通过一些简单的分析任务来学习 DataFrame 的由来、构建方式以及一些常用操作。在本课程中,你可以了解到 Spark 生态体系中,核心的 RDD 与 DataFrame 之间的区别和联系。同时,你还可以学习到在 Spark 中加载数据集的方式、如何配置和使用第三方库等等。
DataFrame 的基本概念 DataFrame 的构建方式 DataFrame 的基本操作 在 Spark 中加载第三方库 ###1.3 实验环境
Spark 1.6.1 Xfce 终端
启动 Spark Shell 并加载相关的包。
spark-shell --packages com.databricks:spark-csv_2.11:1.1.0
copy
在 modules in use 信息中,如果能看到如下三个包的名字,则说明之前的第三方模块的安装是成功的。
使用 Spark 的一个 CSV 包来直接读取数据集。这样做能够直接将 CSV 文件读入为 DataFrame 。
在 Spark Shell 中输入下面的代码。
val df = sqlContext.read.format("com.databricks.spark.csv").option("header", "true").load("/home/shiyanlou/1987.csv")
// 此处的文件路径请根据实际情况修改
copy
尝试取出这个数据集中的前 5 个数据,看一下是否读入成功。 在 Spark Shell 中输入下面的代码。
df.take(5)
copy
在引入数据集之后,下一步要做的便是确保我们所有数据的格式是正确的。可以通过下面的代码来检查其格式。
请在 Spark Shell 中输入下面的代码。
df.printSchema()
copy
得到的输出如下图所示:
每一项都是 String 类型。在处理数据时多少有一点不方便。我们可以尝试将某些列转换成其它的类型。方法是提取出某列然后转换。例如:
在 Spark Shell 中输入下面的代码。
df.col("Year").cast("int")
copy
执行结果如图:
当我们提取出某一列之后,又怎样将其放回去呢?下面我们提供一种 Spark 里改变 DataFrame 列类型的行之有效的办法。 在 Spark Shell 中输入下面的代码。
val df_1 = df.withColumnRenamed("Year","oldYear")
val df_2 = df_1.withColumn("Year",df_1.col("oldYear").cast("int")).drop("oldYear")
copy
执行结果如图:(用具有新格式类型的列替换了旧的列)
要对很多列都进行格式的转换时,用一个函数来转换列的格式类型
在 Spark Shell 中输入下面的代码。
// 这里的类型转换参数就可以由我们自己随意决定了
def convertColumn(df: org.apache.spark.sql.DataFrame, name:String, newType:String) = {
val df_1 = df.withColumnRenamed(name, "swap")
df_1.withColumn(name, df_1.col("swap").cast(newType)).drop("swap")
}
copy
定义该函数的结果如下图所示:
在 Spark Shell 中输入下面的代码调用该函数。
val df_3 = convertColumn(df_2, "ArrDelay", "int")
val df_4 = convertColumn(df_2, "DepDelay", "int")
copy
执行结果如图:
下面的代码演示了如何计算数据集中,每个航班的平均延迟时间。 在 Spark Shell 中输入下面的代码。
val averageDelays = df_4.groupBy(df_4.col("FlightNum")).agg(avg(df_4.col("ArrDelay")), avg(df_4.col("DepDelay")))
copy
执行结果如下图所示。
(注,实验环境中df误打成de了,后续操作注意即可)
对于 averageDelays 这个临时结果,作为更加长远的考虑,你应该将数据集中一些重要的计算结果缓存下来,以加速后续的计算过程。如果你想知道为什么要这样做,可以查阅 Spark 的“懒加载”相关资料。
请输入下面的代码来缓存我们刚刚得到的数据。
averageDelays.cache()
copy
在缓存之后,我们如果要基于 averageDelays 做后续的计算,那么它的运算速度就会非常快了。 Spark 对于所有的变量几乎都是懒计算的,如果你不缓存的话,只有在执行行动操作(Action)时,它们才会被真正地计算。
下面我们来用一个行动操作看一下刚刚的计算结果。请输入这些代码。
averageDelays.show()
copy
可以看到由于数据量比较大,前台显示了运算过程分为多个 Stage ,只有调用了行动操作时,这些值才被真正地计算出来。计算结果如下图所示:
对平均延迟时间进行排序,看哪个航班的平均延迟最低或者最高。下面的代码是按照升序排列平均延迟时间。
averageDelays.orderBy("AVG(ArrDelay)").show()
copy
按照降序排列,则请输入下面的代码:
averageDelays.sort($"AVG(ArrDelay)".desc).show()
copy
排列多个列的数据并显示结果。 请在 Spark Shell 中输入下面的代码。
averageDelays.sort($"AVG(ArrDelay)".desc, $"AVG(DepDelay)".desc).show()
copy
在本课程中,我们使用了 CSV 格式的数据集来创建 DataFrame 。如果是 JSON 格式的数据,则可以通过
val df = sqlCoontext.read.json(filePath)
copy
这样的形式来读取。 总而言之,DataFrame 不仅是 Spark 中的一种常用的数据载体,也是展开各种分析工作的基本对象。我们在课程中较为详细地讲述了它与弹性分布式数据集 RDD 之间的区别与联系,目的是为了让大家在合适的时候能够选用合适的数据类型去表达数据,最终来优化我们的分析流程。
降序排列ArrTime,AirTime的数据并显示结果。 请在 Spark Shell 中输入下面的代码。
//此处沿用了误写的de
val averageTime = de_4.groupBy(de_4.col("FlightNum")).agg(avg(de_4.col("ArrTime")), avg(de_4.col("AirTime")))
//缓存
averageTime.cache()
//降序排列ArrTime,AirTime
averageTime.sort($"AVG(ArrTime)".desc, $"AVG(AirTime)".desc).show()
copy
结果如下:
可见数据中的AirTime缺失。
学习时间 393分钟
操作时间 131分钟
按键次数 2251次
实验次数 6次
报告字数 4271字
是否完成 完成