Spark3.x入门到精通-阶段四(SparkSql详解javascala实战)

Spark SQL简介

Spark SQL 是 Spark 中的一个子模块,主要用于操作结构化数据。它具有以下特点:

  • 能够将 SQL 查询与 Spark 程序无缝混合,允许您使用 SQL 或 DataFrame API 对结构化数据进行查询;
  • 支持多种开发语言;
  • 支持多达上百种的外部数据源,包括 Hive,Avro,Parquet,ORC,JSON 和 JDBC 等;
  • 支持 HiveQL 语法以及 Hive SerDes 和 UDF,允许你访问现有的 Hive 仓库;
  • 支持标准的 JDBC 和 ODBC 连接;
  • 支持优化器,列式存储和代码生成等特性;
  • 支持扩展并能保证容错。

DataFrame 

为了支持结构化数据的处理,Spark SQL 提供了新的数据结构 DataFrame。DataFrame 是一个由具名列组成的数据集。它在概念上等同于关系数据库中的表或 R/Python 语言中的 data frame。 由于 Spark SQL 支持多种语言的开发,所以每种语言都定义了 DataFrame 的抽象,主要如下:

语言主要抽象
ScalaDataset[T] & DataFrame (Dataset[Row] 的别名)
JavaDataset[T]
PythonDataFrame
R

DataFrame

DataFrame 对比 RDDs 

DataFrame 和 RDDs 最主要的区别在于一个面向的是结构化数据,一个面向的是非结构化数据,它们内部的数据结构如下:

DataFrame 内部的有明确 Scheme 结构,即列名、列字段类型都是已知的,这带来的好处是可以减少数据读取以及更好地优化执行计划,从而保证查询效率。

DataFrame 和 RDDs 应该如何选择?

  • 如果你想使用函数式编程而不是 DataFrame API,则使用 RDDs;
  • 如果你的数据是非结构化的 (比如流媒体或者字符流),则使用 RDDs,
  • 如果你的数据是结构化的 (如 RDBMS 中的数据) 或者半结构化的 (如日志),出于性能上的考虑,应优先使用 DataFrame。

DataFrame的使用

准备数据person.json

{"id": "1","name": "zhangsan","sorce": 90}
{"id": "2","name": "lishi","sorce": 100}
{"id": "3","name": "wanwu","sorce": 800}

 上传到hdfs

hadoop dfs -put person.json /

下面是知道把数据变成什么类型的时候可以使用下面的方式

java版本

public class JavaDataFrameStart {public static void main(String[] args) {SparkConf sparkConf = new SparkConf();sparkConf.setAppName("JavaDataFrameStart").setMaster("local");SparkSession spark = SparkSession.builder().appName("Java Spark SQL basic example").config(sparkConf).getOrCreate();Dataset json = spark.read().json("hdfs://master:8020/person.json");json.show();spark.close();}
}

打包上传到服务器

bin/spark-submit --class com.zhang.one.JavaDataFrameStart --master yarn /home/bigdata/shell/original-sparkstart-1.0-SNAPSHOT.jar

scala版本

object ScalaDataFrame {def main(args: Array[String]): Unit = {val conf = new SparkConf()conf.setAppName("ScalaDataFrame")conf.setMaster("local")val spark = SparkSession.builder().appName("Spark SQL basic example").config(conf).getOrCreate()val dataFrame: DataFrame = spark.read.json("data/person.json")dataFrame.show()//打印元数据信息dataFrame.printSchema()//查询指定的列dataFrame.select(dataFrame.col("name")).show()//对于分数加一操作dataFrame.select(dataFrame.col("sorce").plus(1)).show()dataFrame.groupBy(dataFrame.col("id")).count().show()spark.close()}
}

RDD和DataFrame互相装换

准备数据person.txt

1 zhangsan 90
2 lishi 100
3 wanwu 20

java版本

public class JavaStudent implements Serializable {private Integer id;private String name;private Integer score;public Integer getId() {return id;}public void setId(Integer id) {this.id = id;}public String getName() {return name;}public void setName(String name) {this.name = name;}public Integer getScore() {return score;}public void setScore(Integer score) {this.score = score;}@Overridepublic String toString() {return "JavaStudent{" +"id=" + id +", name='" + name + '\'' +", score=" + score +'}';}
}

操作的代码

public class JavaRDDToDataframe {public static void main(String[] args) throws AnalysisException {SparkConf sparkConf = new SparkConf();sparkConf.setAppName("JavaRDDToDataframe").setMaster("local[2]");JavaSparkContext javaSparkContext = new JavaSparkContext(sparkConf);//创建sparksql上下文SQLContext sqlContext = new SQLContext(javaSparkContext);JavaRDD personRDD = javaSparkContext.textFile("data/person.txt");JavaRDD studentJavaRDD = personRDD.map(new Function() {@Overridepublic JavaStudent call(String lineText) throws Exception {String[] initLine = lineText.split(" ");JavaStudent javaStudent = new JavaStudent();javaStudent.setId(Integer.valueOf(initLine[0]));javaStudent.setName(initLine[1]);javaStudent.setScore(Integer.valueOf(initLine[2]));return javaStudent;}});//将RDD转化成DataFrame,这里是通过反射的方式,返回的RDD数据要和对应类的属性名对应Dataset personDataFrame = sqlContext.createDataFrame(studentJavaRDD, JavaStudent.class);//对于DataFrame数据创建一张临时表personDataFrame.createTempView("student");//对于创建的表执行sql操作sqlContext.sql("select * from student").show();//将DataFrame装换成RDDJavaRDD resultRDD = personDataFrame.javaRDD().map(new Function() {@Overridepublic JavaStudent call(Row rowData) throws Exception {JavaStudent javaStudent = new JavaStudent();javaStudent.setId(rowData.getInt(0));javaStudent.setName(rowData.getString(1));javaStudent.setScore(rowData.getInt(2)+100);return javaStudent;}});resultRDD.foreach(new VoidFunction() {@Overridepublic void call(JavaStudent javaStudent) throws Exception {System.out.println(javaStudent);}});javaSparkContext.close();}
}

执行后的结果

+---+--------+-----+
| id|    name|score|
+---+--------+-----+
|  1|zhangsan|   90|
|  2|   lishi|  100|
|  3|   wanwu|   20|
+---+--------+-----+
JavaStudent{id=1, name='zhangsan', score=190}
JavaStudent{id=2, name='lishi', score=200}
JavaStudent{id=3, name='wanwu', score=120}

scala版本

object ScalaRDDToDataFrame {def main(args: Array[String]): Unit = {val conf = new SparkConf()conf.setMaster("local[2]").setAppName("ScalaRDDToDataFrame")val session: SparkSession = SparkSession.builder().config(conf).getOrCreate()val lineRDD: RDD[String] = session.sparkContext.textFile("data/person.txt")//引入隐私装换(这里的session就是什么创建的session:SparkSession的变量)import session.implicits._//RDD装换成DataFrameval personDataFrame: DataFrame = lineRDD.map(_.split(" ")).map(item => {Student(item(0).toInt, item(1).trim, item(2).toInt)}).toDF()//创建一个临时视图personDataFrame.createTempView("person")val filterData: DataFrame = session.sql("select * from person where score >= 100")//把过滤出来的数据变成RDD处理val rdd: RDD[Row] = filterData.rdd//后面多次使用缓存下rdd.cache()//下面是根据ROW的位置得到数据rdd.map(row=>{Student(row.getInt(0),row.getString(1),row.getInt(2))}).foreach(println)//下面可以根据指定的列得到数据rdd.map(row=>{Student(row.getAs[Int]("id"),row.getAs[String]("name"),row.getAs[Int]("score"))}).foreach(println)session.stop()}
}case class Student(val id:Int,val name:String,val score:Int)

这里的装换是动态数据装换(就是不知道要返回的是什么类的时候)

java版本

public class JavaRDDToDataFrameMethodTow {public static void main(String[] args) throws AnalysisException {SparkConf sparkConf = new SparkConf();sparkConf.setMaster("local").setAppName("JavaRDDToDataFrameMethodTow");JavaSparkContext javaSparkContext = new JavaSparkContext(sparkConf);SQLContext sqlContext = new SQLContext(javaSparkContext);JavaRDD personInitRDD = javaSparkContext.textFile("data/person.txt");JavaRDD personRow = personInitRDD.map(new Function() {@Overridepublic Row call(String lineText) throws Exception {String[] strings = lineText.split(" ");//使用动态类型构建的时候,要保证返回的数据类型和后面的structFields类型一致return RowFactory.create(Integer.valueOf(strings[0]), strings[1], Integer.valueOf(strings[2]));}});//对于上面返回的RDD匹配字段元数据ArrayList structFields = new ArrayList<>();structFields.add(DataTypes.createStructField("id",DataTypes.IntegerType,true));structFields.add(DataTypes.createStructField("name",DataTypes.StringType,true));structFields.add(DataTypes.createStructField("score",DataTypes.IntegerType,true));StructType structType = DataTypes.createStructType(structFields);//构建RDD和structType的关系得到dataFrame,注意Dataset的别名就是DataFrameDataset dataFrame = sqlContext.createDataFrame(personRow, structType);dataFrame.createTempView("person");//        sqlContext.sql("select * from person").show();Dataset targetDataFrame = sqlContext.sql("select * from person where score >=100");//dataFrame装换成RDDJavaRDD rowJavaRDD = targetDataFrame.javaRDD();rowJavaRDD.foreach(new VoidFunction() {@Overridepublic void call(Row row) throws Exception {System.out.println(row);}});javaSparkContext.close();}
}

scala版本

object ScalaRDDDataFrame {def main(args: Array[String]): Unit = {val conf = new SparkConf()conf.setAppName("ScalaRDDDataFrame").setMaster("local")val sparkSession: SparkSession = SparkSession.builder().config(conf).getOrCreate()//第一步得到数据val initDataRDD: RDD[String] = sparkSession.sparkContext.textFile("data/person.txt")//第二步装换成ROWval initDataRowRDD: RDD[Row] = initDataRDD.map(line => {val strings: Array[String] = line.split(" ")Row(Integer.valueOf(strings(0)), strings(1), Integer.valueOf(strings(2)))})//第三步建立对应的元数据信息val dataTypes=StructType(//这里的元数据类型和上面的数据要对应Array(StructField("id",DataTypes.IntegerType,true),StructField("name",DataTypes.StringType,true),StructField("score",DataTypes.IntegerType,true),))//第四步,得到DataFrameval dataFrame: DataFrame = sparkSession.createDataFrame(initDataRowRDD, dataTypes)dataFrame.createTempView("person")val filterDataFrame: DataFrame = sparkSession.sql("select * from person where score>=100")filterDataFrame.rdd.foreach(println)sparkSession.stop()}
}

SparkSql外部数据源

读数据格式

所有读取 API 遵循以下调用格式:

// 格式
DataFrameReader.format(...).option("key", "value").schema(...).load()// 示例
spark.read.format("csv")
.option("mode", "FAILFAST")          // 读取模式
.option("inferSchema", "true")       // 是否自动推断 schema
.option("path", "path/to/file(s)")   // 文件路径
.schema(someSchema)                  // 使用预定义的 schema      
.load()

读取模式有以下三种可选项:

读模式描述
permissive当遇到损坏的记录时,将其所有字段设置为 null,并将所有损坏的记录放在名为 _corruption t_record 的字符串列中
dropMalformed删除格式不正确的行
failFast遇到格式不正确的数据时立即失败

写数据格式

// 格式
DataFrameWriter.format(...).option(...).partitionBy(...).bucketBy(...).sortBy(...).save()//示例
dataframe.write.format("csv")
.option("mode", "OVERWRITE")         //写模式
.option("dateFormat", "yyyy-MM-dd")  //日期格式
.option("path", "path/to/file(s)")
.save()

写数据模式有以下四种可选项:

Scala/Java描述
SaveMode.ErrorIfExists如果给定的路径已经存在文件,则抛出异常,这是写数据默认的模式
SaveMode.Append数据以追加的方式写入
SaveMode.Overwrite数据以覆盖的方式写入
SaveMode.Ignore如果给定的路径已经存在文件,则不做任何操作

CSV

自动推断类型读取读取示例:

spark.read.format("csv")
.option("header", "false")        // 文件中的第一行是否为列的名称
.option("mode", "FAILFAST")      // 是否快速失败
.option("inferSchema", "true")   // 是否自动推断 schema
.load("/usr/file/csv/dept.csv")
.show()

使用预定义类型:

import org.apache.spark.sql.types.{StructField, StructType, StringType,LongType}
//预定义数据格式
val myManualSchema = new StructType(Array(StructField("deptno", LongType, nullable = false),StructField("dname", StringType,nullable = true),StructField("loc", StringType,nullable = true)
))
spark.read.format("csv")
.option("mode", "FAILFAST")
.schema(myManualSchema)
.load("/usr/file/csv/dept.csv")
.show()

 写入CSV文件

df.write.format("csv").mode("overwrite").save("/tmp/csv/dept2")

也可以指定具体的分隔符:

df.write.format("csv").mode("overwrite").option("sep", "\t").save("/tmp/csv/dept2")

DataFrame的load和save

可以实现不同数据之间的转换

java版本

public class JavaJsonParquet {public static void main(String[] args) {SparkConf sparkConf = new SparkConf();sparkConf.setMaster("local").setAppName("JavaJsonParquet");JavaSparkContext javaSparkContext = new JavaSparkContext(sparkConf);SQLContext sqlContext = new SQLContext(javaSparkContext);//加载指定的Json文件直接得到RowDataset jsonDataFrame = sqlContext.read().format("json").load("data/person.json");jsonDataFrame.show();//得到的数据保存为parquetjsonDataFrame.write().format("parquet").save("data/person.parquet");javaSparkContext.close();}
}

scala版本

object ScalaJsonParquet {def main(args: Array[String]): Unit = {val conf = new SparkConf()conf.setMaster("local").setAppName("ScalaJsonParquet")val session: SparkSession = SparkSession.builder().config(conf).getOrCreate()session.read.format("json").load("data/person.json").write.format("parquet").save("data/parquet")session.stop()}
}

加入写入模式的操作

java版本

public class JavaJsonParquet {public static void main(String[] args) {SparkConf sparkConf = new SparkConf();sparkConf.setMaster("local").setAppName("JavaJsonParquet");JavaSparkContext javaSparkContext = new JavaSparkContext(sparkConf);SQLContext sqlContext = new SQLContext(javaSparkContext);//加载指定的Json文件直接得到RowDataset jsonDataFrame = sqlContext.read().format("json").load("data/person.json");jsonDataFrame.show();//得到的数据保存为parquet,下面是使用追加的方式jsonDataFrame.write().mode(SaveMode.Append).format("parquet").save("data/person.parquet");javaSparkContext.close();}
}

scala版本

object ScalaJsonParquet {def main(args: Array[String]): Unit = {val conf = new SparkConf()conf.setMaster("local[1]").setAppName("ScalaJsonParquet")val session: SparkSession = SparkSession.builder().config(conf).getOrCreate()
//使用追加的方式写入session.read.format("json").load("data/person.json").write.mode(SaveMode.Append).format("parquet").save("data/parquet")session.stop()}
}

json和parquet基本操作

由于比较简单这里就使用scala了

object ScalaReadParquet {def main(args: Array[String]): Unit = {val conf = new SparkConf()conf.setAppName("ScalaReadParquet").setMaster("local")val session: SparkSession = SparkSession.builder().config(conf).getOrCreate()session.read.parquet("data/person.parquet").show()session.read.json("data/person.json").show()session.stop()}
}

读取JSON文件

spark.read.format("json").option("mode", "FAILFAST").load("/usr/file/json/dept.json").show(5)

需要注意的是:默认不支持一条数据记录跨越多行 (如下),可以通过配置 multiLine 为 true 来进行更改,其默认值为 false

// 默认支持单行
{"DEPTNO": 10,"DNAME": "ACCOUNTING","LOC": "NEW YORK"}//默认不支持多行
{"DEPTNO": 10,"DNAME": "ACCOUNTING","LOC": "NEW YORK"
}

 写入JSON文件

df.write.format("json").mode("overwrite").save("/tmp/spark/json/dept")

两种获取json数据执行join操作 

准备数据person.json

{"id": "1","name": "zhangsan","courceid": 1}
{"id": "2","name": "lishi","courceid": 2}
{"id": "3","name": "wanwu","courceid": 1}

java版本

public class JavaJsonJoin {public static void main(String[] args) throws AnalysisException {SparkConf sparkConf = new SparkConf();sparkConf.setMaster("local").setAppName("JavaJsonJoin");JavaSparkContext javaSparkContext = new JavaSparkContext(sparkConf);SQLContext sqlContext = new SQLContext(javaSparkContext);//得到PersonDataFrameDataset personDataFrame = sqlContext.read().json("data/person.json");personDataFrame.createTempView("person");//里面保存的是json字符串ArrayList courseJson = new ArrayList<>();courseJson.add("{\"id\":\"1\",\"score\":\"90\"}");courseJson.add("{\"id\":\"2\",\"score\":\"190\"}");courseJson.add("{\"id\":\"3\",\"score\":\"190\"}");JavaRDD courceRDD = javaSparkContext.parallelize(courseJson);//根据JavaRDD得到DataFrameDataset courceDataFrame = sqlContext.read().json(courceRDD);courceDataFrame.createTempView("cource");//join两张表sqlContext.sql("select * from person left join cource on person.courceid=cource.id").show();javaSparkContext.close();}
}

得到的结果是

+--------+---+--------+---+-----+
|courceid| id|    name| id|score|
+--------+---+--------+---+-----+
|       1|  1|zhangsan|  1|   90|
|       1|  3|   wanwu|  1|   90|
|       2|  2|   lishi|  2|  190|
+--------+---+--------+---+-----+

scala版本

object ScalaJsonJoin {def main(args: Array[String]): Unit = {val conf = new SparkConf()conf.setMaster("local").setAppName("ScalaJsonJoin")val session: SparkSession = SparkSession.builder().config(conf).getOrCreate()//得到person的临时表session.read.json("data/person.json").createTempView("person")val courceJsonString: Array[String] = Array("{\"id\":\"1\",\"score\":\"90\"}","{\"id\":\"2\",\"score\":\"190\"}","{\"id\":\"3\",\"score\":\"190\"}")//得到cource的临时表val courceRDD: RDD[String] = session.sparkContext.parallelize(courceJsonString)session.read.json(courceRDD).createTempView("cource")session.sql("select * from person left join cource on person.courceid=cource.id").show()session.close()}
}

输出结果

+--------+---+--------+---+-----+
|courceid| id|    name| id|score|
+--------+---+--------+---+-----+
|       1|  1|zhangsan|  1|   90|
|       1|  3|   wanwu|  1|   90|
|       2|  2|   lishi|  2|  190|
+--------+---+--------+---+-----+

Hbase案例

pojo

@Data
public class VirtualPojo {private String ORDER_ID;private String STARTING_LNG;private String STARTING_LAT;}

HbaseUtil

public class HbaseUtil {private static Connection connection = null;public HbaseUtil(){if(connection==null||connection.isClosed()){Configuration config = HBaseConfiguration.create();config.set("hbase.zookeeper.quorum", "hadoop102");try {connection=ConnectionFactory.createConnection(config);} catch (IOException e) {e.printStackTrace();}}}public Connection getConnection(){return connection;}/*** 创建表* @param tableName* @param columeFamily*/public void createTable(String tableName,String columeFamily){try {TableName tableName1 = TableName.valueOf(tableName);Admin admin = connection.getAdmin();if (admin.tableExists(tableName1)) {admin.disableTable(tableName1);admin.deleteTable(tableName1);}HTableDescriptor hTableDescriptor = new HTableDescriptor(tableName1);HColumnDescriptor hColumnDescriptor = new HColumnDescriptor(columeFamily);hTableDescriptor.addFamily(hColumnDescriptor);admin.createTable(hTableDescriptor);} catch (IOException e) {e.printStackTrace();}}/*** 生成put* @param rowKey* @param columeFamily* @param dataColumeAndValue* @return*/public Put createPut(String rowKey, String columeFamily, Map dataColumeAndValue){Put put = new Put(rowKey.getBytes());Set> entries = dataColumeAndValue.entrySet();for (Map.Entry entry : entries) {put.addColumn(columeFamily.getBytes(),entry.getKey().getBytes(),entry.getValue().getBytes());}return put;}public Table getTable(String tableName){Table table = null;try {table = connection.getTable(TableName.valueOf(tableName));} catch (IOException e) {e.printStackTrace();}return table;}public  List getRest(String tableName,Class clazz) throws Exception {Table table = null;Scan scanner = null;List restList = null;try {restList = new ArrayList<>();table = getTable(tableName);scanner = new Scan();try (ResultScanner rs = table.getScanner(scanner)) {for (Result r : rs) {//反射形成对象//获取字节码对象类的所有字段属性Field[] fields = clazz.getDeclaredFields();T t = clazz.newInstance();NavigableMap familyMap = r.getFamilyMap("info".getBytes());for (Map.Entry entry : familyMap.entrySet()) {String colName = Bytes.toString(entry.getKey());String colValue = Bytes.toString(entry.getValue());for (Field field : fields) {String fieldName = field.getName();field.setAccessible(true);if (fieldName.equalsIgnoreCase(colName)) {String fieldType = field.getType().toString();if (fieldType.equalsIgnoreCase("class java.lang.String")) {field.set(t, colValue);} else if (fieldType.equalsIgnoreCase("class java.lang.Integer")){field.set(t, Integer.parseInt(colValue));} else if (fieldType.equalsIgnoreCase("class java.lang.Long")){field.set(t, Long.parseLong(colValue));} else if (fieldType.equalsIgnoreCase("class java.lang.Double")) {field.set(t, Double.parseDouble(colValue));} else if (fieldType.equalsIgnoreCase("class java.util.Date")) {SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");field.set(t, sdf.format(new Date(Long.parseLong(colValue + "000"))));} else if (fieldType.equalsIgnoreCase("java.lang.Boolean")) {field.set(t, Boolean.parseBoolean(colValue));} else {field.set(t, null);}}}}restList.add(t);}}} catch (Exception e) {e.printStackTrace();}return restList;}public static void main(String[] args) throws IOException {HbaseUtil hbaseUtil = new HbaseUtil();try {List rest = hbaseUtil.getRest("hai_kou_virtual_parkp", VirtualPojo.class);for (VirtualPojo virtualPojo : rest) {System.out.println(virtualPojo);}} catch (Exception e) {e.printStackTrace();}}/*** 根据表名开始时间和结束时间查询轨迹** @param tableName* @param orderId* @param startTimestampe* @param endTimestampe* @return*/public  List getRest(String tableName, String orderId,String startTimestampe, String endTimestampe, Class clazz) throws Exception {Table table = null;Scan scanner = null;List restList = null;try {restList = new ArrayList<>();table = getTable(tableName);String startRowKey = orderId + "_" + startTimestampe;String endRowKey = orderId + "_" + endTimestampe;scanner = new Scan();scanner.setStartRow(startRowKey.getBytes());scanner.setStopRow(endRowKey.getBytes());try (ResultScanner rs = table.getScanner(scanner)) {for (Result r : rs) {//反射形成对象//获取字节码对象类的所有字段属性Field[] fields = clazz.getDeclaredFields();T t = clazz.newInstance();NavigableMap familyMap = r.getFamilyMap(Constants.DEFAULT_FAMILY.getBytes());for (Map.Entry entry : familyMap.entrySet()) {String colName = Bytes.toString(entry.getKey());String colValue = Bytes.toString(entry.getValue());for (Field field : fields) {String fieldName = field.getName();field.setAccessible(true);if (fieldName.equalsIgnoreCase(colName)) {String fieldType = field.getType().toString();if (fieldType.equalsIgnoreCase("class java.lang.String")) {field.set(t, colValue);} else if (fieldType.equalsIgnoreCase("class java.lang.Integer")){field.set(t, Integer.parseInt(colValue));} else if (fieldType.equalsIgnoreCase("class java.lang.Long")){field.set(t, Long.parseLong(colValue));} else if (fieldType.equalsIgnoreCase("class java.lang.Double")) {field.set(t, Double.parseDouble(colValue));} else if (fieldType.equalsIgnoreCase("class java.util.Date")) {SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");field.set(t, sdf.format(new Date(Long.parseLong(colValue + "000"))));} else if (fieldType.equalsIgnoreCase("java.lang.Boolean")) {field.set(t, Boolean.parseBoolean(colValue));} else {field.set(t, null);}}}}restList.add(t);}}} catch (Exception e) {e.printStackTrace();}return restList;}
}

SparkAnalyHbase

object SparkAnalyHbase {private val core: H3Core = H3Core.newInstance()def uberh3(lat: Double, lng: Double, res: Int): String = {core.geoToH3Address(lat, lng, res)}def main(args: Array[String]): Unit = {val conf = new SparkConf()conf.setAppName(this.getClass.getSimpleName).setMaster("local")//读取hbase的时候要有网络传输所以要序列化.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer").registerKryoClasses(Array(classOf[ImmutableBytesWritable], classOf[Result]))val sparlSql: SparkSession = SparkSession.builder().config(conf).getOrCreate()val sparkContext: SparkContext = sparlSql.sparkContext//开始处理hbase,是根据hfile来读取分析val scanner = new Scanval hbaseConfig: Configuration = HBaseConfiguration.create//ORDER_ID String,CITY_ID String,STARTING_LNG String,STARTING_LAT Stringscanner.addFamily("info".getBytes)scanner.addColumn("info".getBytes, "ORDER_ID".getBytes)scanner.addColumn("info".getBytes, "CITY_ID".getBytes)scanner.addColumn("info".getBytes, "STARTING_LNG".getBytes)scanner.addColumn("info".getBytes, "STARTING_LAT".getBytes)import org.apache.hadoop.hbase.protobuf.ProtobufUtilval proto = ProtobufUtil.toScan(scanner)val scanToString = Base64.encodeBytes(proto.toByteArray)import org.apache.hadoop.hbase.mapreduce.TableInputFormat//设置hbase表hbaseConfig.set(TableInputFormat.INPUT_TABLE, "hai_kou_order_topic")hbaseConfig.set(TableInputFormat.SCAN, scanToString)hbaseConfig.set("hbase.zookeeper.quorum", "hadoop102")val hbaseRDD: RDD[(ImmutableBytesWritable, Result)] = sparkContext.newAPIHadoopRDD(hbaseConfig, classOf[TableInputFormat],classOf[ImmutableBytesWritable], classOf[Result])//得到的结果装换成RDDval orderRDD: RDD[Row] = hbaseRDD.mapPartitions(iterator => {val newItems = new ListBuffer[Row]()var row: Row = null;while (iterator.hasNext) {val result = iterator.next()._2row = Row.fromSeq(Seq(new String(result.getValue("info".getBytes, "ORDER_ID".getBytes)),new String(result.getValue("info".getBytes, "CITY_ID".getBytes)),new String(result.getValue("info".getBytes, ("STARTING_LNG".getBytes))),new String(result.getValue("info".getBytes, "STARTING_LAT".getBytes))))newItems.append(row)}Iterator(newItems)}).flatMap(item => item)//field里面指定的类型要和Row里面的类型保持一致val fields = Array(new StructField("ORDER_ID", StringType),new StructField("CITY_ID", StringType),new StructField("STARTING_LNG", StringType),new StructField("STARTING_LAT", StringType))val structType = new StructType(fields)val orderDataFrame: DataFrame = sparlSql.createDataFrame(orderRDD, structType)//这里获取到了数据//    orderDataFrame.show()//这里使用自定义函数sparlSql.udf.register("locationToH3", uberh3 _)orderDataFrame.createTempView("order")//这里根据地理坐标得到Uberh3val selectOrderSql ="""|select ORDER_ID,CITY_ID,STARTING_LNG,STARTING_LAT|,locationToH3(STARTING_LAT,STARTING_LNG,12) uberh3address| from order|""".stripMarginval orderInfoAndUberH3Address: DataFrame = sparlSql.sql(selectOrderSql)orderInfoAndUberH3Address.createTempView("orderInfoAndUberH3Address")//选出Uberh3索引里面大于10的作为虚拟车站import org.apache.spark.sql.functions._import sparlSql.implicits._val groupUberh3: Dataset[Row] = orderInfoAndUberH3Address.groupBy("uberh3address").count().filter("count>3")groupUberh3.createTempView("groupuberh3")//这里使用得到的uberh3得到订单的坐标val virtualPark: Dataset[Row] = sparlSql.sql(s"""|select|ORDER_ID,|CITY_ID,|STARTING_LNG,|STARTING_LAT,|row_number() over(partition by orderInfoAndUberH3Address.uberh3address order by STARTING_LNG,STARTING_LAT asc) rn|from orderInfoAndUberH3Address join groupuberh3 on orderInfoAndUberH3Address.uberh3address = groupuberh3.uberh3address|""".stripMargin).filter("rn<=1")//这里就是得到的虚拟车站的信息val resultShema = new StructType(Array(StructField("ORDER_ID", StringType, true),StructField("STARTING_LNG", StringType, true),StructField("STARTING_LAT", StringType, true)))val resultDataFrame: DataFrame = virtualPark.select("ORDER_ID", "STARTING_LNG", "STARTING_LAT")//保存数据val hbConf = HBaseConfiguration.createhbConf.set("hbase.zookeeper.quorum", "hadoop102")hbConf.set("hbase.zookeeper.property.clientPort", "2181")//这里是要输出到的表hbConf.set(TableOutputFormat.OUTPUT_TABLE, "hai_kou_virtual_parkp")val job = new Job(hbConf);//ImmutableBytesWritable理解为hbase表中的rowkey(ImmutableBytesWritable,Result)job.setOutputKeyClass(classOf[ImmutableBytesWritable])job.setOutputValueClass(classOf[Result])job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])val target: RDD[(ImmutableBytesWritable, Put)] = resultDataFrame.rdd.mapPartitions(iterator => {val newItems = new ListBuffer[(ImmutableBytesWritable, Put)]()while (iterator.hasNext) {val next: Row = iterator.next()val put = new Put(next.getString(0).getBytes)//1.设置订单IDput.addColumn("info".getBytes,"ORDER_ID".getBytes,next.getAs[String]("ORDER_ID").getBytes)//2.设置经度put.addColumn("info".getBytes, "STARTING_LNG".getBytes,next.getAs[String]("STARTING_LNG").getBytes)//3.设置维度put.addColumn("info".getBytes, "STARTING_LAT".getBytes,next.getAs[String]("STARTING_LAT").getBytes)//6.添加到集合中newItems.append((new ImmutableBytesWritable, put))}Iterator(newItems)}).flatMap(item => item)//    这里得到自己写的hbase工具类创建表val hba = new HbaseUtilhba.createTable( "hai_kou_virtual_parkp","info")target.saveAsNewAPIHadoopDataset(job.getConfiguration)sparkContext.stop()sparlSql.stop()sparlSql.close()}
}

SQL Databases

Spark 同样支持与传统的关系型数据库进行数据读写。但是 Spark 程序默认是没有提供数据库驱动的,所以在使用前需要将对应的数据库驱动上传到安装目录下的 jars 目录中。下面示例使用的是 Mysql 数据库,使用前需要将对应的 mysql-connector-java-x.x.x.jar 上传到 jars 目录下。

读取数据

读取全表数据示例如下,这里的 help_keyword 是 mysql 内置的字典表,只有 help_keyword_id 和 name 两个字段。

spark.read
.format("jdbc")
.option("driver", "com.mysql.jdbc.Driver")            //驱动
.option("url", "jdbc:mysql://127.0.0.1:3306/mysql")   //数据库地址
.option("dbtable", "help_keyword")                    //表名
.option("user", "root").option("password","root").load().show(10)

从查询结果读取数据:

val pushDownQuery = """(SELECT * FROM help_keyword WHERE help_keyword_id <20) AS help_keywords"""
spark.read.format("jdbc")
.option("url", "jdbc:mysql://127.0.0.1:3306/mysql")
.option("driver", "com.mysql.jdbc.Driver")
.option("user", "root").option("password", "root")
.option("dbtable", pushDownQuery)
.load().show()//输出
+---------------+-----------+
|help_keyword_id|       name|
+---------------+-----------+
|              0|         <>|
|              1|     ACTION|
|              2|        ADD|
|              3|AES_DECRYPT|
|              4|AES_ENCRYPT|
|              5|      AFTER|
|              6|    AGAINST|
|              7|  AGGREGATE|
|              8|  ALGORITHM|
|              9|        ALL|
|             10|      ALTER|
|             11|    ANALYSE|
|             12|    ANALYZE|
|             13|        AND|
|             14|    ARCHIVE|
|             15|       AREA|
|             16|         AS|
|             17|   ASBINARY|
|             18|        ASC|
|             19|     ASTEXT|
+---------------+-----------+

也可以使用如下的写法进行数据的过滤:

val props = new java.util.Properties
props.setProperty("driver", "com.mysql.jdbc.Driver")
props.setProperty("user", "root")
props.setProperty("password", "root")
val predicates = Array("help_keyword_id < 10  OR name = 'WHEN'")   //指定数据过滤条件
spark.read.jdbc("jdbc:mysql://127.0.0.1:3306/mysql", "help_keyword", predicates, props).show() //输出:
+---------------+-----------+
|help_keyword_id|       name|
+---------------+-----------+
|              0|         <>|
|              1|     ACTION|
|              2|        ADD|
|              3|AES_DECRYPT|
|              4|AES_ENCRYPT|
|              5|      AFTER|
|              6|    AGAINST|
|              7|  AGGREGATE|
|              8|  ALGORITHM|
|              9|        ALL|
|            604|       WHEN|
+---------------+-----------+

可以使用 numPartitions 指定读取数据的并行度:

option("numPartitions", 10)

在这里,除了可以指定分区外,还可以设置上界和下界,任何小于下界的值都会被分配在第一个分区中,任何大于上界的值都会被分配在最后一个分区中。

val colName = "help_keyword_id"   //用于判断上下界的列
val lowerBound = 300L    //下界
val upperBound = 500L    //上界
val numPartitions = 10   //分区综述
val jdbcDf = spark.read.jdbc("jdbc:mysql://127.0.0.1:3306/mysql","help_keyword",colName,lowerBound,upperBound,numPartitions,props)

想要验证分区内容,可以使用 mapPartitionsWithIndex 这个算子,代码如下:

jdbcDf.rdd.mapPartitionsWithIndex((index, iterator) => {val buffer = new ListBuffer[String]while (iterator.hasNext) {buffer.append(index + "分区:" + iterator.next())}buffer.toIterator
}).foreach(println)

执行结果如下:help_keyword 这张表只有 600 条左右的数据,本来数据应该均匀分布在 10 个分区,但是 0 分区里面却有 319 条数据,这是因为设置了下限,所有小于 300 的数据都会被限制在第一个分区,即 0 分区。同理所有大于 500 的数据被分配在 9 分区,即最后一个分区。

 写入数据

val df = spark.read.format("json").load("/usr/file/json/emp.json")
df.write
.format("jdbc")
.option("url", "jdbc:mysql://127.0.0.1:3306/mysql")
.option("user", "root").option("password", "root")
.option("dbtable", "emp")
.save()

数据读写高级特性

并行读

多个 Executors 不能同时读取同一个文件,但它们可以同时读取不同的文件。这意味着当您从一个包含多个文件的文件夹中读取数据时,这些文件中的每一个都将成为 DataFrame 中的一个分区,并由可用的 Executors 并行读取。

并行写

写入的文件或数据的数量取决于写入数据时 DataFrame 拥有的分区数量。默认情况下,每个数据分区写一个文件。

分区写入

分区和分桶这两个概念和 Hive 中分区表和分桶表是一致的。都是将数据按照一定规则进行拆分存储。需要注意的是 partitionBy 指定的分区和 RDD 中分区不是一个概念:这里的分区表现为输出目录的子目录,数据分别存储在对应的子目录中。

val df = spark.read.format("json").load("/usr/file/json/emp.json")
df.write.mode("overwrite").partitionBy("deptno").save("/tmp/spark/partitions")

输出结果如下:可以看到输出被按照部门编号分为三个子目录,子目录中才是对应的输出文件。

分桶写入

分桶写入就是将数据按照指定的列和桶数进行散列,目前分桶写入只支持保存为表,实际上这就是 Hive 的分桶表。

val numberBuckets = 10
val columnToBucketBy = "empno"
df.write.format("parquet").mode("overwrite")
.bucketBy(numberBuckets, columnToBucketBy).saveAsTable("bucketedFiles")

文件大小管理

如果写入产生小文件数量过多,这时会产生大量的元数据开销。Spark 和 HDFS 一样,都不能很好的处理这个问题,这被称为“small file problem”。同时数据文件也不能过大,否则在查询时会有不必要的性能开销,因此要把文件大小控制在一个合理的范围内。

在上文我们已经介绍过可以通过分区数量来控制生成文件的数量,从而间接控制文件大小。Spark 2.2 引入了一种新的方法,以更自动化的方式控制文件大小,这就是 maxRecordsPerFile 参数,它允许你通过控制写入文件的记录数来控制文件大小。

 // Spark 将确保文件最多包含 5000 条记录
df.write.option(“maxRecordsPerFile”, 5000)

可选配置附录

CSV读写可选配置

读\写操作配置项可选值默认值描述
Bothseq任意字符,(逗号)分隔符
Bothheadertrue, falsefalse文件中的第一行是否为列的名称。
Readescape任意字符\转义字符
ReadinferSchematrue, falsefalse是否自动推断列类型
ReadignoreLeadingWhiteSpacetrue, falsefalse是否跳过值前面的空格
BothignoreTrailingWhiteSpacetrue, falsefalse是否跳过值后面的空格
BothnullValue任意字符“”声明文件中哪个字符表示空值
BothnanValue任意字符NaN声明哪个值表示 NaN 或者缺省值
BothpositiveInf任意字符Inf正无穷
BothnegativeInf任意字符-Inf负无穷
Bothcompression or codecNone,
uncompressed,
bzip2, deflate,
gzip, lz4, or
snappy
none文件压缩格式
BothdateFormat任何能转换为 Java 的
SimpleDataFormat 的字符串
yyyy-MM-dd日期格式
BothtimestampFormat任何能转换为 Java 的
SimpleDataFormat 的字符串
yyyy-MMdd’T’HH:mm:ss.SSSZZ时间戳格式
ReadmaxColumns任意整数20480声明文件中的最大列数
ReadmaxCharsPerColumn任意整数1000000声明一个列中的最大字符数。
ReadescapeQuotestrue, falsetrue是否应该转义行中的引号。
ReadmaxMalformedLogPerPartition任意整数10声明每个分区中最多允许多少条格式错误的数据,超过这个值后格式错误的数据将不会被读取
WritequoteAlltrue, falsefalse指定是否应该将所有值都括在引号中,而不只是转义具有引号字符的值。
ReadmultiLinetrue, falsefalse是否允许每条完整记录跨域多行

JSON读写可选配置 

读\写操作配置项可选值默认值
Bothcompression or codecNone,
uncompressed,
bzip2, deflate,
gzip, lz4, or
snappy
none
BothdateFormat任何能转换为 Java 的 SimpleDataFormat 的字符串yyyy-MM-dd
BothtimestampFormat任何能转换为 Java 的 SimpleDataFormat 的字符串yyyy-MMdd’T’HH:mm:ss.SSSZZ
ReadprimitiveAsStringtrue, falsefalse
ReadallowCommentstrue, falsefalse
ReadallowUnquotedFieldNamestrue, falsefalse
ReadallowSingleQuotestrue, falsetrue
ReadallowNumericLeadingZerostrue, falsefalse
ReadallowBackslashEscapingAnyCharactertrue, falsefalse
ReadcolumnNameOfCorruptRecordtrue, falseValue of spark.sql.column&NameOf
ReadmultiLinetrue, falsefalse

数据库读写可选配置 

属性名称含义
url数据库地址
dbtable表名称
driver数据库驱动
partitionColumn,
lowerBound, upperBoun
分区总数,上界,下界
numPartitions可用于表读写并行性的最大分区数。如果要写的分区数量超过这个限制,那么可以调用 coalesce(numpartition) 重置分区数。
fetchsize每次往返要获取多少行数据。此选项仅适用于读取数据。
batchsize每次往返插入多少行数据,这个选项只适用于写入数据。默认值是 1000。
isolationLevel事务隔离级别:可以是 NONE,READ_COMMITTED, READ_UNCOMMITTED,REPEATABLE_READ 或 SERIALIZABLE,即标准事务隔离级别。
默认值是 READ_UNCOMMITTED。这个选项只适用于数据读取。
createTableOptions写入数据时自定义创建表的相关配置
createTableColumnTypes写入数据时自定义创建列的列类型

SparkSql内置函数

简单聚合

数据准备

{"id": "1","name": "zhangsan","courceid": 1}
{"id": "2","name": "lishi","courceid": 2}
{"id": "3","name": "wanwu","courceid": 1}

读取文件

object ScalaHanshu {def main(args: Array[String]): Unit = {val c = new SparkConf()c.setAppName("ScalaHanshu").setMaster("local[2]")//使用函数之前要引入隐私装换import org.apache.spark.sql.functions._val session: SparkSession = SparkSession.builder().config(c).getOrCreate()session.read.json("data/person.json").show()session.close()}
}

count

session.read.json("data/person.json").select(count("name")).show()

countDistinct 

session.read.json("data/person.json").select(countDistinct("courceid")).show()

approx_count_distinct

通常在使用大型数据集时,你可能关注的只是近似值而不是准确值,这时可以使用 approx_count_distinct 函数,并可以使用第二个参数指定最大允许误差。

session.read.json("data/person.json").select(approx_count_distinct ("courceid",0.1)).show()

first & last

获取 DataFrame 中指定列的第一个值或者最后一个值。

session.read.json("data/person.json").select(first("courceid"),last("courceid")).show()

min & max

获取 DataFrame 中指定列的最小值或者最大值。

empDF.select(min("sal"),max("sal")).show()

sum & sumDistinct

求和以及求指定列所有不相同的值的和。

empDF.select(sum("sal")).show()
empDF.select(sumDistinct("sal")).show()

avg

内置的求平均数的函数。

empDF.select(avg("sal")).show()

数学函数

Spark SQL 中还支持多种数学聚合函数,用于通常的数学计算,以下是一些常用的例子:

// 1.计算总体方差、均方差、总体标准差、样本标准差
empDF.select(var_pop("sal"), var_samp("sal"), stddev_pop("sal"), stddev_samp("sal")).show()// 2.计算偏度和峰度
empDF.select(skewness("sal"), kurtosis("sal")).show()// 3. 计算两列的皮尔逊相关系数、样本协方差、总体协方差。
empDF.select(corr("empno", "sal"), covar_samp("empno", "sal"),covar_pop("empno", "sal")).show()

聚合数据到集合

empDF.agg(collect_set("job"), collect_list("ename")).show()

分组聚合

简单分组

empDF.groupBy("deptno", "job").count().show()
//等价 SQL
spark.sql("SELECT deptno, job, count(*) FROM emp GROUP BY deptno, job").show()

分组聚合

empDF.groupBy("deptno").agg(count("ename").alias("人数"), sum("sal").alias("总工资")).show()
// 等价语法
empDF.groupBy("deptno").agg("ename"->"count","sal"->"sum").show()
// 等价 SQL
spark.sql("SELECT deptno, count(ename) ,sum(sal) FROM emp GROUP BY deptno").show()

自定义函数

UDF(单行输入一个输出)

object ScalaHanshu {//定义一个函数def myudf(name:String):Int={name.length}def main(args: Array[String]): Unit = {val c = new SparkConf()c.setAppName("ScalaHanshu").setMaster("local[2]")import org.apache.spark.sql.functions._val session: SparkSession = SparkSession.builder().config(c).getOrCreate()session.read.json("data/person.json").createTempView("person")//注册函数//第一种方式函数已经定义出来了,后面就是函数名加一个空格然后一个下划线session.udf.register("myudf",myudf _)//第二种就是直接定义一个匿名内部类session.udf.register("myudf2",(name:String)=>name.length)session.sql("select name,myudf(name),myudf2(name) from person").show()session.close()}
}

UDAF(多个输入一个输出)

聚合函数

上面是一个好理解的图,我们自己定义一个求总数的聚合函数

object MyUDAF extends UserDefinedAggregateFunction {//这里是输入数据,定义它输入数据的类型override def inputSchema: StructType = {StructType(Array(StructField("", StringType)))}//这里是中间缓存区的类型override def bufferSchema: StructType = {StructType(Array(StructField("", LongType)))}//这里是输出数据的类型override def dataType: DataType = DoubleType//  此函数是否始终在相同输入上返回相同的输出,通常为 trueoverride def deterministic: Boolean = true//缓冲区的初始化操作override def initialize(buffer: MutableAggregationBuffer): Unit = {buffer(0)=0L}//同一个分区的reduce操作,buffer是缓存区的数据,input表示迭代过来的值override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {if (!input.isNullAt(0)) {//由于我们定义的是count的作用,所以每来一条是数据那么就加一buffer(0) = buffer.getLong(0) + 1L}}//不同分区的数据进行聚合操作override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {buffer1(0)=buffer1.getLong(0) + buffer2.getLong(0)}//计算最终的输出值,这里就是中间的结果怎么结算得到最后输出的结果override def evaluate(buffer: Row): Any = {buffer.getLong(0).toDouble}
}

 使用

object ScalaHanshu {//定义一个函数def myudf(name:String):Int={name.length}def main(args: Array[String]): Unit = {val c = new SparkConf()c.setAppName("ScalaHanshu").setMaster("local[2]")import org.apache.spark.sql.functions._val session: SparkSession = SparkSession.builder().config(c).getOrCreate()session.read.json("data/person.json").createTempView("person")session.udf.register("mycount", MyUDAF)session.sql("select mycount(courceid) as result from person group by courceid").show()session.close()}
}

SparkSql原理

案例实战(每日TOP2热点搜索词统计)

数据格式

日期 用户 搜索词 城市 平台 版本

2015-10-01 leo barbecue beijing android 1.0
2015-10-01 leo barbecue beijing android 1.0
2015-10-01 tom barbecue beijing android 11.0
2015-10-01 jack barbecue beijing android 1.0
2015-10-01 marry barbecue beijing android 1.0
2015-10-01 tom seafood beijing android 1.2
2015-10-01 leo seafood beijing android 1.2
2015-10-01 jack seafood beijing android 1.2
2015-10-01 jack seafood beijing android 1.2
2015-10-01 marytoy beijing android 1.5
2015-10-01 leo toy beijing android 1.5
2015-10-01 leo toy beijing android 1.5
2015-10-01 jack water beijing android 2.0
2015-10-01 whitebarbecue nanjing iphone 2.0
2015-10-02 white seafood beijing android 1.0
2015-10-02 leo seafood beijing android 1.0
2015-10-02 may seafood beijing android 1.0
2015-10-02 tom seafood beijing android 1.0
2015-10-02 jack seafood beijing android 1.0
2015-10-02 jack seafood beijing android 1.0
2015-10-02 tom water beijing android 1.2
2015-10-02 1e0 beijing android 1.2
2015-10-02 jack water beijing android 1.2
2015-10-02 jack water beijing android 1.2
2015-10-02 leo barbecue beijing android 1.5
2015-10-02 marry barbecue beijing android 1.5
2015-10-02 marry barbecue beijing android 1.5
2015-10-02 jack toy beijin android 2.0
2015-10-02 white tour nanjing iphone 2.0

需求

  • 筛选出符合条件(城市,平台,版本)的数据
  • 统计每天的uv排名前3的搜索词
  • 按照每天的top2搜索词搜索总次数,倒叙排序

java实现代码

public class JavaUV {public static void main(String[] args) {SparkConf sparkConf = new SparkConf();sparkConf.setMaster("local").setAppName("sparkConf");JavaSparkContext javaSparkContext = new JavaSparkContext(sparkConf);SQLContext sqlContext = new SQLContext(javaSparkContext);//第一步得到数据JavaRDD uvInitRDD = javaSparkContext.textFile("data/uvdata.txt");//这里定义查询条件,比方这里用城市,手机型号,版本作为条件HashMap> queryParams = new HashMap<>();queryParams.put("city", Arrays.asList("beijing"));queryParams.put("phone", Arrays.asList("android"));queryParams.put("version", Arrays.asList("1.0", "1.2", "1.5"));//由于查询的数据数量不太大,但是每一个分区的数据都要过滤,那我们就把他做成广播变量Broadcast>> queryParamBroadcast = javaSparkContext.broadcast(queryParams);//第二步对于查询的条件对于数据进行过滤JavaRDD filterLogData = uvInitRDD.filter(new Function() {@Overridepublic Boolean call(String logData) throws Exception {String[] logItem = logData.split(" ");//对于不符合的数据直接过滤if (logItem.length != 6) {return false;}//在分区里面得到广播变量HashMap> queryParamsb = queryParamBroadcast.value();//得到所有的条件数据List city = queryParamsb.get("city");List phone = queryParamsb.get("phone");List version = queryParamsb.get("version");String logCity = logItem[3];String logPhone = logItem[4];String logVersion = logItem[5];if (!city.contains(logCity)) {return false;}if (!phone.contains(logPhone)) {return false;}if (!version.contains(logVersion)) {return false;}return true;}});//第三步对于过滤的数据,由于统计的是每天热点uv所以要对词的搜索,所以我想的是(日期_词,用户)//对于日期_词的key进行分组那么我们就得到了每天每一个词访问的用户,我们还要对用户进行去重操作JavaPairRDD dataKeyWordUser = filterLogData.mapToPair(new PairFunction() {@Overridepublic Tuple2 call(String filterLogDataItem) throws Exception {String[] logDataItem = filterLogDataItem.split(" ");String logdata = logDataItem[0];String logUserName = logDataItem[1];String logdataKeyWord = logDataItem[2];return new Tuple2(logdata + "_" +logdataKeyWord , logUserName);}});//第四步,根据Key分组,把RDD里面所有相同的key的数据shuffle到一起JavaPairRDD> dataKeyWordUsers = dataKeyWordUser.groupByKey();//第五步,把分组以后的数据求出对应的UVJavaPairRDD dataKeyWordUV = dataKeyWordUsers.mapToPair(new PairFunction>, String, Integer>() {@Overridepublic Tuple2 call(Tuple2> initData) throws Exception {String dataKeyWord = initData._1;Iterable strings = initData._2;HashSet res = new HashSet<>();for (String string : strings) {res.add(string);}int size = res.size();return new Tuple2(dataKeyWord, size);}});//第六步统计每天的UV的前3名,我想用sparksql对于每天的数据开窗函数以后求每天的前3的数据JavaRDD dataKeyWordUVRow = dataKeyWordUV.map(new Function, Row>() {@Overridepublic Row call(Tuple2 v1) throws Exception {String dataKeyWord = v1._1;String[] initDataKeyWord = dataKeyWord.split("_");String data = initDataKeyWord[0];String keyWord = initDataKeyWord[1];Integer uv = v1._2;return RowFactory.create(data, keyWord, Integer.valueOf(uv));}});//第七步根据上面的Row构建对于的元数据信息List dataKeyWordStruct = Arrays.asList(DataTypes.createStructField("data", DataTypes.StringType, true),DataTypes.createStructField("keyword", DataTypes.StringType, true),DataTypes.createStructField("uv", DataTypes.IntegerType, true));StructType structType = DataTypes.createStructType(dataKeyWordStruct);//第八步,得到对应的表Dataset dataFrame = sqlContext.createDataFrame(dataKeyWordUVRow, structType);dataFrame.registerTempTable("datakeyuv");//这里得到了每天top3的uv热词//+----------+--------+---+----+//|      data| keyword| uv|rank|//+----------+--------+---+----+//|2015-10-02| seafood|  5|   1|//|2015-10-02|barbecue|  2|   2|//|2015-10-01|barbecue|  4|   1|//|2015-10-01| seafood|  3|   2|//+----------+--------+---+----+Dataset rowDataset = sqlContext.sql("select * from" +"(" +"select " +" data," +" keyword," +" uv," +" row_number() over (PARTITION BY data ORDER BY uv DESC) as rank" +" from datakeyuv" +") where rank<=2");//第九步,对于每天的top3的uv求总数,我们用到(日期,uv)分组进行计算rowDataset.javaRDD().mapToPair(new PairFunction() {@Overridepublic Tuple2 call(Row row) throws Exception {return new Tuple2(row.getString(0),row.getInt(2));}}).reduceByKey(new Function2() {@Overridepublic Integer call(Integer num1, Integer num2) throws Exception {return num1+num2;}}).foreach(new VoidFunction>() {@Overridepublic void call(Tuple2 result) throws Exception {//foreach是在每一个分区执行的//最后得到的数据是//(2015-10-01,7)//(2015-10-02,7)System.out.println(result);}});javaSparkContext.close();}
}

scala版本实现

object ScalaUv {def main(args: Array[String]): Unit = {val conf = new SparkConf()conf.setAppName("ScalaUv").setMaster("local")val sparkSql: SparkSession = SparkSession.builder().config(conf).getOrCreate()val sparkContext: SparkContext = sparkSql.sparkContext//这个是用来查询数据的参数val queryParams: mutable.Map[String, List[String]] = scala.collection.mutable.Map[String, List[String]]()queryParams("city") = List("beijing")queryParams("phone") = List("android")queryParams("version") = List("1.0", "1.2", "1.5")val broadValues: Broadcast[mutable.Map[String, List[String]]] = sparkContext.broadcast(queryParams)//得到过滤后的数据val filterLogData: RDD[String] = sparkContext.textFile("data/uvdata.txt").filter(logData => {var flag: Boolean = trueval logDataItem: Array[String] = logData.split(" ")if (logDataItem.length != 6) {false} else {val city: String = logDataItem(3)val phone: String = logDataItem(4)val version: String = logDataItem(5)//对于查询数据不满足条件的删除val queryParam: mutable.Map[String, List[String]] = broadValues.valueif (!queryParam("city").contains(city)) {flag = false}if (!queryParam("phone").contains(phone)) {flag = false}if (!queryParam("version").contains(version)) {flag = false}flag}})//得到(日期_搜索词,用户名)val dataKeyWordUser: RDD[(String, String)] = filterLogData.map(filterLogDataItem => {val logdata: Array[String] = filterLogDataItem.split(" ")val data: String = logdata(0)val userName: String = logdata(1)val keyWord: String = logdata(2)(data + "_" + keyWord, userName)})val dataKeyWordGroup: RDD[(String, Iterable[String])] = dataKeyWordUser.groupByKey()//对于每一天搜索相同的搜索词的用户进行求和对于用户去重val dataKeyWorkUV: RDD[(String, Int)] = dataKeyWordGroup.map(dataKeyWordGroupItem => {val dataKeyWord: String = dataKeyWordGroupItem._1val users: Iterable[String] = dataKeyWordGroupItem._2val distinctUser = scala.collection.mutable.Set[String]()for (user <- users) {distinctUser.add(user)}(dataKeyWord, distinctUser.size)})//映射成Rowval dataKeyWordRow: RDD[Row] = dataKeyWorkUV.map(dataKeyWorkUVItem => {val dateKeyWord: String = dataKeyWorkUVItem._1val dateKeyWordStrings: Array[String] = dateKeyWord.split("_")val date: String = dateKeyWordStrings(0)val keyWord: String = dateKeyWordStrings(1)Row(date, keyWord, dataKeyWorkUVItem._2)})//创建一个元数据对象val dataKeyWordStructType: StructType = StructType(Array(StructField("date", DataTypes.StringType, true),StructField("keyword", DataTypes.StringType, true),StructField("uv", DataTypes.IntegerType, true)))//得到对应的DataFrameval dataKeyWordframe: DataFrame = sparkSql.createDataFrame(dataKeyWordRow, dataKeyWordStructType)//这个创建映射表的作用就是,sparksql最后到RDD找数据的关键dataKeyWordframe.createTempView("dataKeyWord")val rankDataFrame: DataFrame = sparkSql.sql("""|select * from|(| select| date,| keyword,| uv,| row_number() over (PARTITION BY date ORDER BY uv DESC) as rank| from dataKeyWord|) where rank<=2|""".stripMargin)//得到的数据
//    +----------+--------+---+----+
//    |      date| keyword| uv|rank|
//    +----------+--------+---+----+
//    |2015-10-02| seafood|  5|   1|
//    |2015-10-02|barbecue|  2|   2|
//    |2015-10-01|barbecue|  4|   1|
//    |2015-10-01| seafood|  3|   2|
//    +----------+--------+---+----+//得到最后的结果rankDataFrame.rdd.map(row=>{(row.getAs[String]("date"),row.getAs[Int]("uv"))}).reduceByKey(_+_).foreach(println)
//    (2015-10-01,7)
//    (2015-10-02,7)sparkContext.stop()sparkSql.stop()}
}

纯sql实现

object ScalaSparkSqlTop {def main(args: Array[String]): Unit = {val conf = new SparkConf()conf.setAppName("ScalaSparkSqlTop").setMaster("local")val sparkSession: SparkSession = SparkSession.builder().config(conf).getOrCreate()val sparkContext: SparkContext = sparkSession.sparkContextval queryParams: mutable.Map[String, List[String]] = scala.collection.mutable.Map[String, List[String]]()queryParams("city") = List("beijing")queryParams("phone") = List("android")queryParams("version") = List("1.0", "1.2", "1.5")val broadValues: Broadcast[mutable.Map[String, List[String]]] = sparkContext.broadcast(queryParams)//得到过滤后的数据val filterLogData: RDD[String] = sparkContext.textFile("data/uvdata.txt").filter(logData => {var flag: Boolean = trueval logDataItem: Array[String] = logData.split(" ")if (logDataItem.length != 6) {false} else {val city: String = logDataItem(3)val phone: String = logDataItem(4)val version: String = logDataItem(5)//对于查询数据不满足条件的删除val queryParam: mutable.Map[String, List[String]] = broadValues.valueif (!queryParam("city").contains(city)) {flag = false}if (!queryParam("phone").contains(phone)) {flag = false}if (!queryParam("version").contains(version)) {flag = false}flag}})//得到ROWval logDataRDDRow: RDD[Row] = filterLogData.map(filterLogDataItem => {val logdata: Array[String] = filterLogDataItem.split(" ")val date: String = logdata(0)val userName: String = logdata(1)val keyWord: String = logdata(2)val city: String = logdata(3)val phone: String = logdata(4)val version: String = logdata(5)Row(date, userName, keyWord, city, phone, version)})//创建对应的元数据信息val logDataStructType=StructType(Array(StructField("date",DataTypes.StringType,true),StructField("userName",DataTypes.StringType,true),StructField("keyWord",DataTypes.StringType,true),StructField("city",DataTypes.StringType,true),StructField("phone",DataTypes.StringType,true),StructField("version",DataTypes.StringType,true),))val logDataDataFrame: DataFrame = sparkSession.createDataFrame(logDataRDDRow, logDataStructType)logDataDataFrame.createTempView("logData")//对表进行查询sparkSession.sql("""|select date,sum(uv) from|(|	select * from|	(|		select date,KeyWord,uv,row_number() over (partition by date order by uv desc) rank from|		(|			select date,KeyWord,count(distinct userName) as uv|			from logData|			group by date,KeyWord|		)|	) where rank<=2|) group by date|""".stripMargin).show()
//    得到的结果
//    +----------+-------+
//    |      date|sum(uv)|
//    +----------+-------+
//    |2015-10-02|      7|
//    |2015-10-01|      7|
//    +----------+-------+sparkContext.stop()sparkSession.stop()}
}


本文来自互联网用户投稿,文章观点仅代表作者本人,不代表本站立场,不承担相关法律责任。如若转载,请注明出处。 如若内容造成侵权/违法违规/事实不符,请点击【内容举报】进行投诉反馈!

相关文章

立即
投稿

微信公众账号

微信扫一扫加关注

返回
顶部