实验、DataFrame基本概念学习
一、简介 1、内容:1)DataFrame由来、构建方式、常用操作; 2)核心RDD与DataFrame之间区别和联系。 Spark加载数据集方式、配置和使用第三方库等; 3)分析飞行准点率数据集。
2、知识点:DataFrame基本概念、构建方式、基本操作、加载第三方库。
copy
二、实验原理
1、DataFrame
1.1)是什么?
是一种跨语言、通用的数据科学抽象,DataFrame通过实验现实世界中的数据集,涵盖许多最基本的概念和操作。毫不夸张地说,仅用极少资金甚至是以免费的方式,通过市场获取到一些科研界最新最权威的数据。
1.2)解决什么问题?
Spark里已有最常用的RDD,从宏观来说,DataFrame是为了帮助建立Spark生态系统。 DataFrame是RDD基础核心的一种扩展。对于数据科学家门来说,DataFrame能较好地从R语言或Python来进行转换。因此最好将其理解为一种结构化的RDD,能帮助你处理数据而不用花太多精力在数据的各种转换中。这也是为什么DataFrame能与结构化数据(甚至非结构化数据)建立密切联系,通过它,能够利用Spark引擎带来的类型和优化上的长处。
DataFrame是一个按照指定列来组织的分布式数据集合,可以通过Parquet、Hive、MySQL、PostgreSQL、JSON、S3和HDFS等多种数据源来构建。DataFrame常用于SparkSQL中,如果是在这样一个数据库系统中,你可以将其理解为其中的一个表。
虽然传统平面RDD支持上述数据源格式,但在Python Pandas中,它们被集成在一起,成为DataFrame。这是最早起源,在MLLib中,它们与机器去学习的流水线API集成在一起。这样做的意义在于能让你读取一些数据、指定要使用的操作和高效率地训练你的模型。当然,引擎背后,也有许多优化能让代码执行更快更高效。简言之,DataFrame可由下面公式给出:
DataFrame = RDD + 模式&&优化
它们除了之前各种接口,还应该遵守一些RDD具有的规则,如之前提到的转换和行动范式。因此,DataFrame就是一种以RDD为基础,带有模式信息(Schema)的分布式数据集。DataFrame比RDD更好的一点在于具有额外的SQL接口,从而让查询高度结构化的数据成为可能。
我们无法得知RDD内部数据元素的结构,但可以通过Spark SQL来得知DataFrame中更为结构信息。综上所述,Spark中的DataFrame发展过程中,其目标成为事实上的大数据数据框,适用于各种分布式大数据的应用场景。
2、开发准备
2.1)su -l hadoop # 密码hadoop
2.2)数据集
2009. Data expo - Airline on-time performance-- http://stat-computing.org/dataexpo/2009/1987.csv.bz2
wget http://labfile.oss.aliyuncs.com/courses/536/1987.csv.bz2
bunzip2 1987.csv.bz2
# 格式 含义
copy
2.3)下载支持库(实验楼会员用户具有连接外网权限,可跳过此步)
wget http://labfile.oss.aliyuncs.com/courses/610/spark_csv.tar
tar -xvf file -C /home/shiyanlou/.ivy2/jars/ #待验证,确保下面目录含以下三个jar包
copy
3、实验步骤
3.1)启动Spark Shell
spark-shell --packages com.databricks:spark-csv_2.11:1.1.0
copy
3.2)启动完毕,使用Spark的CSV包直接读取数据集,直接将CSV文件读入DataFrame
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
val df = sqlContext.read.format("com.databricks.spark.csv").option("header", "true").load("/home/hadoop/1987.csv")
// 此处的文件路径请根据实际情况修改
// JSON格式可用 val df = sqlContext.read.json(filePath) 形式读取
3.3)取出前5个数据,看是否读入成功
df.take(5)
copy
3.4)检查格式
df.printSchema()
3.5)每一项均为String类型,处理数据多少有点不方便,尝试将某些列转为其他类型。 方法:提取出某列后转换
df.col("Year").cast("int")
val df_1 = df.withColumnRenamed("Year","oldYear")
val df_2 = df_1.withColumn("Year",df_1.col("oldYear").cast("int")).drop("oldYear")
copy
3.6)定义转换函数,转换列格式类型
// 这里的类型转换参数就可以由我们自己随意决定了
def convertColumn(df: org.apache.spark.sql.DataFrame, name:String, newType:String) = {
val df_1 = df.withColumnRenamed(name, "swap")
df_1.withColumn(name, df_1.col("swap").cast(newType)).drop("swap")
}
copy
val df_3 = convertColumn(df_2, "ArrDelay", "int")
val df_4 = convertColumn(df_2, "DepDelay", "int")
copy
3.7)演示:如何计算数据集中,每个航班的平均延迟时间
3.7.1)val averageDelays = df_4.groupBy(df_4.col("FlightNum")).agg(avg(df_4.col("ArrDelay")), avg(df_4.col("DepDelay")))
copy
3.7.2)缓存刚得到的数据,有利于后续的快速运算。因为Spark对于所有变量都是懒计算的,不缓存的话,只有执行行动操作(Action)时,才会被真正计算:
averageDelays.cache()
copy
3.7.3)显示
averageDelays.show()
copy
averageDelays.orderBy("AVG(ArrDelay)").show()
copy
averageDelays.sort($"AVG(ArrDelay)".desc).show()
copy
averageDelays.sort($"AVG(ArrDelay)".desc, $"AVG(DepDelay)".desc).show()
copy
三、总结 本课程主要学习了DataFrame的由来、与RDD的区别&&联系。然后是实际分析案例的各个详细步骤:启动Spark、读入数据集、转换数据格式、计算数据集中平均分析时间(各种转换操作【Action】需要进一步理解与使用)、显示。 通过实验可以看到如果想真正掌握,还需要好好看官方文档,真正在各种数据集上联系各种分析操作。
简言之,DataFrame是Spark的一种常用数据载体,也是展开各种数据分析工作的基本对象。本课程较为详细地讲述了与RDD之间的区别与联系,目的是为了在合适时能够选用合适的数据类型去表达数据。最终优化我们的分析流程。
copy
四、参考资料 1、Spark SQL, DataFrames and Datasets Guide: Spark官方指导手册,涵盖DataFrame操作、与RDD相互转换等知识点,十分推荐阅读。 链接:http://spark.apache.org/docs/latest/sql-programming-guide.html#dataframe-operations
2、DataFrame API Documentation:官方API手册(1.6.1版本)。列举了所有与DataFrame相关操作,包括DataFrame类中成员变量和成员方法。适合开发时作为参考。
链接:http://spark.apache.org/docs/1.6.1/api/scala/index.html#org.apache.spark.sql.DataFrame
copy
学习时间 270分钟
操作时间 78分钟
按键次数 3144次
实验次数 9次
报告字数 6004字
是否完成 完成