“Spark 基础之 DataFrame 基本概念学习”实验报告

DataFrame 基本概念学习

Spark 基础之 DataFrame 基本概念学习

一、实验介绍

1.1 实验内容

本课程将对飞行准点率数据集,通过一些简单的分析任务来学习 DataFrame 的由来、构建方式以及一些常用操作。在本课程中,你可以了解到 Spark 生态体系中,核心的 RDD 与 DataFrame 之间的区别和联系。同时,你还可以学习到在 Spark 中加载数据集的方式、如何配置和使用第三方库等等。

1.2 实验知识点

DataFrame 的基本概念 DataFrame 的构建方式 DataFrame 的基本操作 在 Spark 中加载第三方库 ###1.3 实验环境

Spark 1.6.1 Xfce 终端

3.1 导入数据集

实验楼

四、实验步骤

4.1 在 Spark 中使用 DataFrame

启动 Spark Shell 并加载相关的包。

spark-shell --packages com.databricks:spark-csv_2.11:1.1.0
copy

在 modules in use 信息中,如果能看到如下三个包的名字,则说明之前的第三方模块的安装是成功的。 实验楼 使用 Spark 的一个 CSV 包来直接读取数据集。这样做能够直接将 CSV 文件读入为 DataFrame 。 在 Spark Shell 中输入下面的代码。

val df = sqlContext.read.format("com.databricks.spark.csv").option("header", "true").load("/home/shiyanlou/1987.csv")
// 此处的文件路径请根据实际情况修改
copy

尝试取出这个数据集中的前 5 个数据,看一下是否读入成功。 在 Spark Shell 中输入下面的代码。

df.take(5)
copy

实验楼

在引入数据集之后,下一步要做的便是确保我们所有数据的格式是正确的。可以通过下面的代码来检查其格式。

请在 Spark Shell 中输入下面的代码。

df.printSchema()
copy

得到的输出如下图所示: 实验楼 每一项都是 String 类型。在处理数据时多少有一点不方便。我们可以尝试将某些列转换成其它的类型。方法是提取出某列然后转换。例如: 在 Spark Shell 中输入下面的代码。

df.col("Year").cast("int")
copy

执行结果如图: 实验楼

当我们提取出某一列之后,又怎样将其放回去呢?下面我们提供一种 Spark 里改变 DataFrame 列类型的行之有效的办法。 在 Spark Shell 中输入下面的代码。

val df_1 = df.withColumnRenamed("Year","oldYear")
val df_2 = df_1.withColumn("Year",df_1.col("oldYear").cast("int")).drop("oldYear")
copy

执行结果如图:(用具有新格式类型的列替换了旧的列) 实验楼 要对很多列都进行格式的转换时,用一个函数来转换列的格式类型 在 Spark Shell 中输入下面的代码。

// 这里的类型转换参数就可以由我们自己随意决定了
def convertColumn(df: org.apache.spark.sql.DataFrame, name:String, newType:String) = {
  val df_1 = df.withColumnRenamed(name, "swap")
  df_1.withColumn(name, df_1.col("swap").cast(newType)).drop("swap")
}
copy

定义该函数的结果如下图所示: 实验楼

在 Spark Shell 中输入下面的代码调用该函数。

val df_3 = convertColumn(df_2, "ArrDelay", "int")
val df_4 = convertColumn(df_2, "DepDelay", "int")
copy

执行结果如图: 实验楼

如何对 DataFrame 进行计数操作

下面的代码演示了如何计算数据集中,每个航班的平均延迟时间。 在 Spark Shell 中输入下面的代码。

val averageDelays = df_4.groupBy(df_4.col("FlightNum")).agg(avg(df_4.col("ArrDelay")), avg(df_4.col("DepDelay")))
copy

执行结果如下图所示。 实验楼 (注,实验环境中df误打成de了,后续操作注意即可)

对于 averageDelays 这个临时结果,作为更加长远的考虑,你应该将数据集中一些重要的计算结果缓存下来,以加速后续的计算过程。如果你想知道为什么要这样做,可以查阅 Spark 的“懒加载”相关资料。

请输入下面的代码来缓存我们刚刚得到的数据。

averageDelays.cache()
copy

在缓存之后,我们如果要基于 averageDelays 做后续的计算,那么它的运算速度就会非常快了。 Spark 对于所有的变量几乎都是懒计算的,如果你不缓存的话,只有在执行行动操作(Action)时,它们才会被真正地计算。

下面我们来用一个行动操作看一下刚刚的计算结果。请输入这些代码。

averageDelays.show()
copy

可以看到由于数据量比较大,前台显示了运算过程分为多个 Stage ,只有调用了行动操作时,这些值才被真正地计算出来。计算结果如下图所示: 实验楼

对平均延迟时间进行排序,看哪个航班的平均延迟最低或者最高。下面的代码是按照升序排列平均延迟时间。

averageDelays.orderBy("AVG(ArrDelay)").show()
copy

实验楼

按照降序排列,则请输入下面的代码:

averageDelays.sort($"AVG(ArrDelay)".desc).show()
copy

实验楼

排列多个列的数据并显示结果。 请在 Spark Shell 中输入下面的代码。

averageDelays.sort($"AVG(ArrDelay)".desc, $"AVG(DepDelay)".desc).show()
copy

实验楼

五、实验总结

在本课程中,我们使用了 CSV 格式的数据集来创建 DataFrame 。如果是 JSON 格式的数据,则可以通过

val df = sqlCoontext.read.json(filePath)
copy

这样的形式来读取。 总而言之,DataFrame 不仅是 Spark 中的一种常用的数据载体,也是展开各种分析工作的基本对象。我们在课程中较为详细地讲述了它与弹性分布式数据集 RDD 之间的区别与联系,目的是为了让大家在合适的时候能够选用合适的数据类型去表达数据,最终来优化我们的分析流程。

扩展练习

降序排列ArrTime,AirTime的数据并显示结果。 请在 Spark Shell 中输入下面的代码。

//此处沿用了误写的de
val averageTime = de_4.groupBy(de_4.col("FlightNum")).agg(avg(de_4.col("ArrTime")), avg(de_4.col("AirTime")))

//缓存
averageTime.cache()

//降序排列ArrTime,AirTime
averageTime.sort($"AVG(ArrTime)".desc, $"AVG(AirTime)".desc).show()
copy

结果如下: 实验楼 可见数据中的AirTime缺失。

最新评论
暂无评论~