Spark DataSet 和 DataFrame
简介
DataSet 是一个分布式数据的集合。
DataFrame 则是按照列名进行整理后的 DataSet,在概念上更贴近于传统关系型数据库中的表或是 R/Python 中的 DataFrame 但在底层进行了更丰富的优化。
简单使用
DataFrame
读取数据
从 parquet 导入 DataFrame
1
| val usersDF = spark.read.load("examples/src/main/resources/users.parquet")
|
从 json 导入 DataFrame
1
| val peopleDF = spark.read.format("json").load("examples/src/main/resources/people.json")
|
从 csv 导入 DataFrame
1 2 3 4 5
| val peopleDFCsv = spark.read.format("csv") .option("sep", ";") .option("inferSchema", "true") .option("header", "true") .load("examples/src/main/resources/people.csv")
|
从 orc 导入 DataFrame
1
| val parDF=spark.read.orc("/tmp/orc/data.orc/gender=M")
|
从 rdd 导入 DataFrame
1
| val rdd = spark.sparkContext.makeRDD(List(1,2,3))
|
从 DataSet 转为 DataFrame
从 Hive 读取数据
1
| val df = spark.read().table("person");
|
从 JDBC 读取数据
1 2 3 4 5 6 7
| val jdbcDF = spark.read .format("jdbc") .option("url", "jdbc:postgresql:dbserver") .option("dbtable", "schema.tablename") .option("user", "username") .option("password", "password") .load()
|
输出
写入 Parquet
1
| peopleDF.write.parquet("people.parquet")
|
写入 ORC
1 2 3 4 5
| usersDF.write.format("orc") .option("orc.bloom.filter.columns", "favorite_color") .option("orc.dictionary.key.threshold", "1.0") .option("orc.column.encoding.direct", "name") .save("users_with_options.orc")
|
写入 JSON
1
| allDF.write.json("src/main/other_resources/all_json_file.json")
|
写入 CSV
1
| df.write.format("csv").save("/tmp/spark_output/datacsv")
|
写入文本文件
写入 Hive 表
1
| df.write.mode(SaveMode.Overwrite).saveAsTable("hive_records")
|
写入 JDBC 链接的数据库
1 2 3 4 5 6 7
| jdbcDF.write .format("jdbc") .option("url", "jdbc:postgresql:dbserver") .option("dbtable", "schema.tablename") .option("user", "username") .option("password", "password") .save()
|
写入 Avro
1
| df.write.format("avro").save("namesAndFavColors.avro")
|
DataSet
从其他源转换
从集合创建
1 2 3
| case class Person(name: String, age: Long) val caseClassDS = Seq(Person("Andy", 32)).toDS() caseClassDS.show()
|
从 DataFrame 创建
1 2
| case class Person(name: String, age: Long) val ds: DataSet[Person] = df.as[User]
|
从 RDD 创建
1 2 3 4 5 6
| case class Person(name: String, age: Long) val ds: DataSet[Person] = rdd.map => { case (name,age) => { Person(name,age) } }.toDS()
|
参考资料
官方文档
样例教程