SparkSQL: Converting an RDD to a DataFrame

Table of Contents

    • 1. Reading JSON files with SparkSQL
    • 2. Converting an RDD to a DataFrame
      • 2.1 Using toDF()
      • 2.2 Converting a raw RDD to RDD[Row] and matching it against a defined StructType

1. Reading JSON files with SparkSQL

First, create two simple JSON data files.

[hadoop@vm01 data]$ vi stu1.json 
{"id":"1","name":"zhangsan","phone":"13721442689","email":"1@qq.com"}
{"id":"2","name":"lisi","phone":"13721442687","email":"2@qq.com"}
{"id":"3","name":"wangwu","phone":"13721442688","email":"3@qq.com"}
{"id":"4","name":"xiaoming","phone":"13721442686","email":"4@qq.com"}
{"id":"5","name":"xiaowang","phone":"13721442685","email":"5@qq.com"}
[hadoop@vm01 data]$ vi banji.json 
{"id":"1","banji":"601"}
{"id":"2","banji":"602"}
{"id":"3","banji":"603"}
{"id":"4","banji":"604"}
{"id":"5","banji":"605"}
[hadoop@vm01 bin]$ ./spark-shell \
--jars /home/hadoop/app/hive-1.1.0-cdh5.7.0/lib/mysql-connector-java-5.1.47.jar

scala> val df=spark.read.json("file:///home/hadoop/data/stu1.json")
scala> df.show
+--------+---+--------+-----------+
|   email| id|    name|      phone|
+--------+---+--------+-----------+
|1@qq.com|  1|zhangsan|13721442689|
|2@qq.com|  2|    lisi|13721442687|
|3@qq.com|  3|  wangwu|13721442688|
|4@qq.com|  4|xiaoming|13721442686|
|5@qq.com|  5|xiaowang|13721442685|
+--------+---+--------+-----------+

# Print the schema
scala> df.printSchema
root
 |-- email: string (nullable = true)
 |-- id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- phone: string (nullable = true)
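Schema inference requires Spark to scan the JSON data. If you already know the layout, you can supply a schema up front. The snippet below is a minimal sketch, assuming the stu1.json layout above (the stuSchema and dfWithSchema names are only illustrative):

import org.apache.spark.sql.types.{StringType, StructField, StructType}

// hand-written schema, so Spark skips the inference pass (illustrative sketch)
val stuSchema = StructType(Array(
  StructField("id", StringType, true),
  StructField("name", StringType, true),
  StructField("phone", StringType, true),
  StructField("email", StringType, true)))

val dfWithSchema = spark.read.schema(stuSchema).json("file:///home/hadoop/data/stu1.json")
dfWithSchema.printSchema()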

If you read a plain text file instead, each line comes in as a single record with only one String-typed field.

scala> val emp=spark.read.text("file:///home/hadoop/data/test.txt")
emp: org.apache.spark.sql.DataFrame = [value: string]

scala> emp.show
+-----------+
|      value|
+-----------+
|hello spark|
|   hello mr|
| hello yarn|
| hello hive|
|hello spark|
+-----------+

scala> emp.printSchema
root
 |-- value: string (nullable = true)
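Because a text file arrives as a single value column, you normally parse it yourself. A minimal sketch (not part of the original session) that splits each line on spaces with the built-in split function:

import org.apache.spark.sql.functions.{col, split}

// turn the single value column into an array-of-words column
emp.select(split(col("value"), " ").as("words")).show(false)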

Register the DataFrame as a temporary view and query it with SELECT.

scala> df.createOrReplaceTempView("people")
scala> spark.sql("select * from people").show
+--------+---+--------+-----------+
|   email| id|    name|      phone|
+--------+---+--------+-----------+
|1@qq.com|  1|zhangsan|13721442689|
|2@qq.com|  2|    lisi|13721442687|
|3@qq.com|  3|  wangwu|13721442688|
|4@qq.com|  4|xiaoming|13721442686|
|5@qq.com|  5|xiaowang|13721442685|
|5@qq.com|  5|    null|13721442685|
+--------+---+--------+-----------+

scala> df.select("name").show
+--------+
|    name|
+--------+
|zhangsan|
|    lisi|
|  wangwu|
|xiaoming|
|xiaowang|
|    null|
+--------+

The second style of writing these queries requires importing the implicit conversions.

scala> import spark.implicits._
scala> df.select($"name").show

scala> df.filter(df("id")>2).show
+--------+---+--------+-----------+
|   email| id|    name|      phone|
+--------+---+--------+-----------+
|3@qq.com|  3|  wangwu|13721442688|
|4@qq.com|  4|xiaoming|13721442686|
|5@qq.com|  5|xiaowang|13721442685|
+--------+---+--------+-----------+

2. Converting an RDD to a DataFrame

First, create two simple data files.

[hadoop@vm01 data]$ cat stu1.txt
1|zhangsan|13721442689|1@qq.com
2|lisi|13721442687|2@qq.com
3|wangwu|13721442688|3@qq.com
4|xiaoming|13721442686|4@qq.com
5|xiaowang|13721442685|5@qq.com
[hadoop@vm01 data]$ cat banji.txt
1|601
2|602
3|603
4|604
5|605

2.1 Using toDF()

First, define the table's fields and types with a case class.

package com.ruozedata.spark
import org.apache.spark.sql.SparkSession

object SparkSessionApp {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[2]").appName("SparkSessionApp").getOrCreate()
    // import the implicit conversions
    import spark.implicits._

    // the pipe delimiter must be escaped with two backslashes in the regex
    val stu1 = spark.sparkContext.textFile("file:///home/hadoop/data/stu1.txt")
      .map(_.split("\\|"))
      .map(x => Student(x(0), x(1), x(2), x(3)))
      .toDF() // final step: convert the RDD to a DataFrame

    stu1.show(50, false)
    stu1.printSchema()
    stu1.take(3).foreach(println)
    stu1.first()
    stu1.head(3)
    stu1.filter("id>2").show()
    stu1.sort($"name".desc, $"id".asc).show()
    stu1.select($"phone".as("mobile")).show()

    val stu2 = spark.sparkContext.textFile("file:///home/hadoop/data/banji.txt")
      .map(_.split("\\|"))
      .map(x => BanJi(x(0), x(1)))
      .toDF()

    //stu1.join(stu2).show(false)  // no join condition: Cartesian product
    stu1.join(stu2, stu1("id") === stu2("id"), "outer").show(false)

    spark.stop()
  }

  case class Student(id: String, name: String, phone: String, email: String)
  case class BanJi(id: String, rid: String)
}
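As a side note, toDF() also accepts explicit column names, so a case class is not strictly required. The following is an illustrative sketch under the same SparkSession and import spark.implicits._ (the banjiDF name is hypothetical):

// tuple RDD + explicit column names instead of a case class
val banjiDF = spark.sparkContext.textFile("file:///home/hadoop/data/banji.txt")
  .map(_.split("\\|"))
  .map(x => (x(0), x(1)))
  .toDF("id", "banji")
banjiDF.printSchema()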

Then verify the whole process in spark-shell.

scala> import spark.implicits._
import spark.implicits._

scala> case class Student(id:String,name:String,phone:String,email:String)
defined class Student
scala>   case class BanJi(id:String,rid:String)
defined class BanJi

scala> val stu1=spark.sparkContext.textFile("file:///home/hadoop/data/stu1.txt").map(_.split("\\|")).map(x=>Student(x(0),x(1),x(2),x(3))).toDF()
stu1: org.apache.spark.sql.DataFrame = [id: string, name: string ... 2 more fields]

scala> stu1.show(false)
+---+--------+-----------+--------+
|id |name    |phone      |email   |
+---+--------+-----------+--------+
|1  |zhangsan|13721442689|1@qq.com|
|2  |lisi    |13721442687|2@qq.com|
|3  |wangwu  |13721442688|3@qq.com|
|4  |xiaoming|13721442686|4@qq.com|
|5  |xiaowang|13721442685|5@qq.com|
+---+--------+-----------+--------+

scala> stu1.printSchema()
root
 |-- id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- phone: string (nullable = true)
 |-- email: string (nullable = true)

scala> stu1.take(3).foreach(println)
[1,zhangsan,13721442689,1@qq.com]
[2,lisi,13721442687,2@qq.com]
[3,wangwu,13721442688,3@qq.com]

scala> stu1.first()
res36: org.apache.spark.sql.Row = [1,zhangsan,13721442689,1@qq.com]

scala> stu1.head(3)
res37: Array[org.apache.spark.sql.Row] = Array([1,zhangsan,13721442689,1@qq.com], [2,lisi,13721442687,2@qq.com], [3,wangwu,13721442688,3@qq.com])

scala> stu1.filter("id>2").show()
+---+--------+-----------+--------+
| id|    name|      phone|   email|
+---+--------+-----------+--------+
|  3|  wangwu|13721442688|3@qq.com|
|  4|xiaoming|13721442686|4@qq.com|
|  5|xiaowang|13721442685|5@qq.com|
+---+--------+-----------+--------+

scala> stu1.sort($"name".desc,$"id".asc).show()
+---+--------+-----------+--------+
| id|    name|      phone|   email|
+---+--------+-----------+--------+
|  1|zhangsan|13721442689|1@qq.com|
|  5|xiaowang|13721442685|5@qq.com|
|  4|xiaoming|13721442686|4@qq.com|
|  3|  wangwu|13721442688|3@qq.com|
|  2|    lisi|13721442687|2@qq.com|
+---+--------+-----------+--------+

scala> stu1.select($"phone".as("mobile")).show()
+-----------+
|     mobile|
+-----------+
|13721442689|
|13721442687|
|13721442688|
|13721442686|
|13721442685|
+-----------+

scala> val stu2=spark.sparkContext.textFile("file:///home/hadoop/data/banji.txt").map(_.split("\\|")).map(x=>BanJi(x(0),x(1))).toDF()
stu2: org.apache.spark.sql.DataFrame = [id: string, rid: string]

scala> stu1.join(stu2,stu1("id")===stu2("id"),"outer").show(false)
+---+--------+-----------+--------+---+---+                                     
|id |name    |phone      |email   |id |rid|
+---+--------+-----------+--------+---+---+
|3  |wangwu  |13721442688|3@qq.com|3  |603|
|5  |xiaowang|13721442685|5@qq.com|5  |605|
|1  |zhangsan|13721442689|1@qq.com|1  |601|
|4  |xiaoming|13721442686|4@qq.com|4  |604|
|2  |lisi    |13721442687|2@qq.com|2  |602|
+---+--------+-----------+--------+---+---+
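Joining on stu1("id") === stu2("id") keeps both id columns, as the output above shows. If you only want a single id column, you can join on the column name instead; a sketch over the same DataFrames:

// join on the shared column name so the result carries one id column
stu1.join(stu2, Seq("id"), "outer").show(false)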

2.2 Converting a raw RDD to RDD[Row] and matching it against a defined StructType

package com.ruozedata.spark

import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.sql.{Row, SparkSession}

object SparkSessionApp2 {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[2]").appName("SparkSessionApp").getOrCreate()

    // convert the raw RDDs to RDD[Row]
    val rdd = spark.sparkContext.textFile("file:///home/hadoop/data/stu1.txt")
      .map(_.split("\\|"))
      .map(x => Row(x(0), x(1), x(2), x(3)))
    val rdd2 = spark.sparkContext.textFile("file:///home/hadoop/data/banji.txt")
      .map(_.split("\\|"))
      .map(x => Row(x(0), x(1)))

    // define the StructTypes
    val structType = StructType(Array(
      StructField("id", StringType, true),
      StructField("name", StringType, true),
      StructField("phone", StringType, true),
      StructField("email", StringType, true)))
    val structType2 = StructType(Array(
      StructField("id", StringType, true),
      StructField("rid", StringType, true)))

    // match each RDD[Row] against its StructType
    val df = spark.createDataFrame(rdd, structType)
    val df2 = spark.createDataFrame(rdd2, structType2)

    import spark.implicits._
    df.show(50, false)
    df.printSchema()
    df.take(3).foreach(println)
    df.first()
    df.head(3)
    df.filter("id>2").show()
    df.sort($"name".desc, $"id".asc).show()
    df.select($"phone".as("mobile")).show()

    // join
    df.join(df2, df("id") === df2("id"), "outer").show(false)

    spark.stop()
  }
}

Then verify it in spark-shell; import the classes below first. The results are the same as with toDF().

import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.sql.{Row, SparkSession}
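
For reference, a minimal sketch of the same steps typed into the shell (mirroring the application code above; the val names are only for illustration):

val rdd = spark.sparkContext.textFile("file:///home/hadoop/data/stu1.txt")
  .map(_.split("\\|"))
  .map(x => Row(x(0), x(1), x(2), x(3)))

val structType = StructType(Array(
  StructField("id", StringType, true),
  StructField("name", StringType, true),
  StructField("phone", StringType, true),
  StructField("email", StringType, true)))

// match the RDD[Row] against the StructType to get a DataFrame
val df = spark.createDataFrame(rdd, structType)
df.show(false)
df.printSchema()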

