Spark Streaming Join

Approaches to Joining Multiple Data Sources

Broadly, there are three approaches to joining multiple data sources:

  • Join at the data source: for example, the Android/iOS client fetches the user's basic profile and attaches it when reporting user-behavior events.

  • Join in the compute engine: for example, do the join with Spark Streaming or Flink.

  • Join at the result store: for example, use HBase/ES, with the join key as the Rowkey/_id and each source's fields written to its own column family, column, or field (a minimal sketch follows this list).
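A minimal sketch of the result-store idea, assuming a hypothetical HBase table user_wide_table with column families info and tweet: each pipeline writes only its own columns under the shared Rowkey (the join key), and a later Get/Scan of that Rowkey returns the "joined" record.

import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.{ConnectionFactory, Put}
import org.apache.hadoop.hbase.util.Bytes

object ResultSideJoinSketch {
  def main(args: Array[String]): Unit = {
    // Hypothetical table "user_wide_table" with column families "info" and "tweet"
    val conn  = ConnectionFactory.createConnection(HBaseConfiguration.create())
    val table = conn.getTable(TableName.valueOf("user_wide_table"))

    // The user-profile pipeline writes its fields under the join key "user_1" ...
    val profilePut = new Put(Bytes.toBytes("user_1"))
    profilePut.addColumn(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes("name_1"))
    table.put(profilePut)

    // ... and the tweet pipeline writes its fields under the same Rowkey,
    // so reading "user_1" returns both sources' columns in one row.
    val tweetPut = new Put(Bytes.toBytes("user_1"))
    tweetPut.addColumn(Bytes.toBytes("tweet"), Bytes.toBytes("favoriteCount"), Bytes.toBytes("1"))
    table.put(tweetPut)

    table.close()
    conn.close()
  }
}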

Each approach has its own trade-offs, so choose carefully for your use case. This post covers doing the join in the compute engine with Spark Streaming.

Stream-Static Join

Joining a Stream with Fully Static Data

There are two ways to join a stream with fully static data: the RDD Join approach and the Broadcast Join (also called Map-Side Join) approach.

RDD Join Approach

Idea: join the stream's per-batch RDDs against the static RDD (RDD join RDD).

package com.bigData.spark

import com.alibaba.fastjson.{JSON, JSONException, JSONObject}
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Durations, StreamingContext}

/**
  * Author: Wang Pei
  * License: Copyright(c) Pei.Wang
  * Summary:
  *   Stream-Static Join
  *   spark 2.2.2
  */

case class UserInfo(userID: String, userName: String, userAddress: String)

object StreamStaicJoin {
  def main(args: Array[String]): Unit = {

    // Log level
    Logger.getLogger("org").setLevel(Level.WARN)

    // Kafka parameters
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "localhost:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> (true: java.lang.Boolean),
      "group.id" -> "testTopic3_consumer_v1")

    // Spark environment
    val sparkConf = new SparkConf().setAppName(this.getClass.getSimpleName.replace("$", "")).setMaster("local[3]")
    val ssc = new StreamingContext(sparkConf, Durations.seconds(10))

    /** 1) Static data: basic user info */
    val userInfo = ssc.sparkContext.parallelize(Array(
      UserInfo("user_1", "name_1", "address_1"),
      UserInfo("user_2", "name_2", "address_2"),
      UserInfo("user_3", "name_3", "address_3"),
      UserInfo("user_4", "name_4", "address_4"),
      UserInfo("user_5", "name_5", "address_5"))
    ).map(item => (item.userID, item))

    /** 2) Streaming data: tweets posted by users */
    /** Sample record:
      * eventTime: event time, retweetCount: number of retweets, language: language,
      * userID: user ID, favoriteCount: number of likes, id: event ID
      * {"eventTime": "2018-11-05 10:04:00", "retweetCount": 1, "language": "chinese", "userID": "user_1", "favoriteCount": 1, "id": 4909846540155641457}
      */
    val kafkaDStream = KafkaUtils.createDirectStream[String, String](
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](Set("testTopic3"), kafkaParams)
    ).map(item => parseJson(item.value()))
      .map(item => {
        val userID = item.getString("userID")
        val eventTime = item.getString("eventTime")
        val language = item.getString("language")
        val favoriteCount = item.getInteger("favoriteCount")
        val retweetCount = item.getInteger("retweetCount")
        (userID, (userID, eventTime, language, favoriteCount, retweetCount))
      })

    /** 3) Join the stream with static data (RDD Join) */
    kafkaDStream.foreachRDD(_.join(userInfo).foreach(println))

    ssc.start()
    ssc.awaitTermination()
  }

  /** JSON parsing */
  def parseJson(log: String): JSONObject = {
    var ret: JSONObject = null
    try {
      ret = JSON.parseObject(log)
    } catch {
      // Handle malformed JSON records
      case e: JSONException => println(log)
    }
    ret
  }
}

[Figure: stream_static_rdd_join.png — sample output of the RDD Join approach]
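Since the static RDD is reused in every batch, it can be worth caching it; and if the joined result should stay a DStream for further processing rather than being consumed inside foreachRDD, transform can be used instead. A minimal sketch, assuming the same userInfo RDD and kafkaDStream as above:

// Cache the static side once so it is not recomputed for every batch
userInfo.cache()

// transform applies an RDD-to-RDD function per batch and keeps the result as a DStream
val joinedDStream = kafkaDStream.transform(rdd => rdd.join(userInfo))
joinedDStream.print()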

Broadcast Join Approach

Idea: for each record in the RDD, look up the matching value in a broadcast variable.

package com.bigData.spark

import com.alibaba.fastjson.{JSON, JSONException, JSONObject}
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.log4j.{Level, Logger}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Durations, StreamingContext}

/**
  * Author: Wang Pei
  * License: Copyright(c) Pei.Wang
  * Summary:
  *   Stream-Static Join
  *   spark 2.2.2
  */

case class UserInfo(userID: String, userName: String, userAddress: String)

object StreamStaticJoin2 {
  def main(args: Array[String]): Unit = {

    // Log level
    Logger.getLogger("org").setLevel(Level.WARN)

    // Kafka parameters
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "localhost:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> (true: java.lang.Boolean),
      "group.id" -> "testTopic3_consumer_v1")

    // Spark environment
    val sparkConf = new SparkConf().setAppName(this.getClass.getSimpleName.replace("$", "")).setMaster("local[3]")
    val ssc = new StreamingContext(sparkConf, Durations.seconds(10))

    /** 1) Static data: basic user info, broadcast to all executors */
    val broadcastUserInfo = ssc.sparkContext.broadcast(Map(
      "user_1" -> UserInfo("user_1", "name_1", "address_1"),
      "user_2" -> UserInfo("user_2", "name_2", "address_2"),
      "user_3" -> UserInfo("user_3", "name_3", "address_3"),
      "user_4" -> UserInfo("user_4", "name_4", "address_4"),
      "user_5" -> UserInfo("user_5", "name_5", "address_5")))

    /** 2) Streaming data: tweets posted by users */
    /** Sample record:
      * eventTime: event time, retweetCount: number of retweets, language: language,
      * userID: user ID, favoriteCount: number of likes, id: event ID
      * {"eventTime": "2018-11-05 10:04:00", "retweetCount": 1, "language": "chinese", "userID": "user_1", "favoriteCount": 1, "id": 4909846540155641457}
      */
    val kafkaDStream = KafkaUtils.createDirectStream[String, String](
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](List("testTopic3"), kafkaParams)
    ).map(item => parseJson(item.value()))
      .map(item => {
        val userID = item.getString("userID")
        val eventTime = item.getString("eventTime")
        val language = item.getString("language")
        val favoriteCount = item.getInteger("favoriteCount")
        val retweetCount = item.getInteger("retweetCount")
        (userID, (userID, eventTime, language, favoriteCount, retweetCount))
      })

    /** 3) Join the stream with static data (Broadcast Join) */
    val result = kafkaDStream.mapPartitions(part => {
      val userInfo = broadcastUserInfo.value
      part.map(item => {
        (item._1, (item._2, userInfo.getOrElse(item._1, null)))
      })
    })
    result.foreachRDD(_.foreach(println))

    ssc.start()
    ssc.awaitTermination()
  }

  /** JSON parsing */
  def parseJson(log: String): JSONObject = {
    var ret: JSONObject = null
    try {
      ret = JSON.parseObject(log)
    } catch {
      // Handle malformed JSON records
      case e: JSONException => println(log)
    }
    ret
  }
}

[Figure: stream_static_rdd_join2.png — sample output of the Broadcast Join approach]
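getOrElse(item._1, null) above gives left-outer semantics: records with no matching user are kept with a null user info. If only matched records are wanted, the lookup can drop non-matching keys instead. A minimal sketch, assuming the same broadcastUserInfo and kafkaDStream as above:

// Inner-join semantics: keep only records whose userID exists in the broadcast map
val innerJoined = kafkaDStream.mapPartitions { part =>
  val userInfo = broadcastUserInfo.value
  part.flatMap { case (userID, tweet) =>
    userInfo.get(userID).map(info => (userID, (tweet, info)))
  }
}
innerJoined.foreachRDD(_.foreach(println))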

Joining a Stream with Semi-Static Data

Semi-static data is data kept in a store such as Redis, which may be updated over time.

Idea: open one Redis connection per RDD partition, then iterate over the records in the partition and look up the value in Redis by key.

package com.bigData.spark

import com.alibaba.fastjson.{JSON, JSONException, JSONObject}
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Durations, StreamingContext}
import redis.clients.jedis.Jedis

/**
  * Author: Wang Pei
  * License: Copyright(c) Pei.Wang
  * Summary:
  *   Stream-Static Join
  *   spark 2.2.2
  */

object StreamStaicJoin3 {
  def main(args: Array[String]): Unit = {

    // Log level
    Logger.getLogger("org").setLevel(Level.WARN)

    // Kafka parameters
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "localhost:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> (true: java.lang.Boolean),
      "group.id" -> "testTopic3_consumer_v1")

    // Spark environment
    val sparkConf = new SparkConf().setAppName(this.getClass.getSimpleName.replace("$", "")).setMaster("local[3]")
    val ssc = new StreamingContext(sparkConf, Durations.seconds(10))

    /** 1) Semi-static data: basic user info, stored in Redis */
    /** HMSET user_1 userID "user_1" name "name_1" address "address_1" */
    /** HMSET user_2 userID "user_2" name "name_2" address "address_2" */

    /** 2) Streaming data: tweets posted by users */
    /** Sample record:
      * eventTime: event time, retweetCount: number of retweets, language: language,
      * userID: user ID, favoriteCount: number of likes, id: event ID
      * {"eventTime": "2018-11-05 10:04:00", "retweetCount": 1, "language": "chinese", "userID": "user_1", "favoriteCount": 1, "id": 4909846540155641457}
      */
    val kafkaDStream = KafkaUtils.createDirectStream[String, String](
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](Set("testTopic3"), kafkaParams)
    ).map(item => parseJson(item.value()))
      .map(item => {
        val userID = item.getString("userID")
        val eventTime = item.getString("eventTime")
        val language = item.getString("language")
        val favoriteCount = item.getInteger("favoriteCount")
        val retweetCount = item.getInteger("retweetCount")
        (userID, (userID, eventTime, language, favoriteCount, retweetCount))
      })

    /** 3) Join the stream with semi-static data (per-partition Redis lookup) */
    val result = kafkaDStream.mapPartitions(part => {
      val redisCli = connToRedis("localhost", 6379, 3000, 10)
      part.map(item => {
        (item._1, (item._2, redisCli.hmget(item._1, "userID", "name", "address")))
      })
    })
    result.foreachRDD(_.foreach(println))

    ssc.start()
    ssc.awaitTermination()
  }

  /** JSON parsing */
  def parseJson(log: String): JSONObject = {
    var ret: JSONObject = null
    try {
      ret = JSON.parseObject(log)
    } catch {
      // Handle malformed JSON records
      case e: JSONException => println(log)
    }
    ret
  }

  /** Connect to Redis */
  def connToRedis(redisHost: String, redisPort: Int, timeout: Int, dbNum: Int): Jedis = {
    val redisCli = new Jedis(redisHost, redisPort, timeout)
    redisCli.connect()
    redisCli.select(dbNum)
    redisCli
  }
}

[Figure: stream_static_join3.png — sample output of the semi-static (Redis) join]
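One thing to note about the code above: a new Jedis connection is created for every partition of every batch and is never closed, so connections leak over time. A common alternative is a per-executor connection pool. A minimal sketch, assuming the same kafkaDStream as above and a hypothetical RedisConnection holder object (Redis database selection omitted for brevity):

import redis.clients.jedis.{JedisPool, JedisPoolConfig}

// One pool per executor JVM, created lazily on first use
object RedisConnection {
  lazy val pool = new JedisPool(new JedisPoolConfig(), "localhost", 6379, 3000)
}

val result = kafkaDStream.mapPartitions { part =>
  val redisCli = RedisConnection.pool.getResource
  try {
    // Materialize the partition so the connection can be returned
    // before downstream operators consume the iterator
    part.map(item => (item._1, (item._2, redisCli.hmget(item._1, "userID", "name", "address"))))
      .toList.iterator
  } finally {
    redisCli.close() // returns the connection to the pool
  }
}
result.foreachRDD(_.foreach(println))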

Stream-Stream Join

Joining one stream with another stream.

Idea: join one DStream with another DStream.

package com.bigData.spark

import com.alibaba.fastjson.{JSON, JSONException, JSONObject}
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.log4j.{Level, Logger}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Durations, StreamingContext}

/**
  * Author: Wang Pei
  * License: Copyright(c) Pei.Wang
  * Summary:
  *   Stream-Stream Join
  *   spark 2.2.2
  */

object StreamStreamJoin {
  def main(args: Array[String]): Unit = {

    // Log level
    Logger.getLogger("org").setLevel(Level.WARN)

    // Kafka parameters
    val kafkaParams1 = Map[String, Object](
      "bootstrap.servers" -> "localhost:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> (true: java.lang.Boolean),
      "group.id" -> "testTopic3_consumer_v1")

    val kafkaParams2 = Map[String, Object](
      "bootstrap.servers" -> "localhost:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> (true: java.lang.Boolean),
      "group.id" -> "testTopic4_consumer_v1")

    // Spark environment
    val sparkConf = new SparkConf().setAppName(this.getClass.getSimpleName.replace("$", "")).setMaster("local[3]")
    val ssc = new StreamingContext(sparkConf, Durations.seconds(10))

    /** 1) Streaming data: tweets posted by users (topic testTopic3) */
    /** Sample record:
      * eventTime: event time, retweetCount: number of retweets, language: language,
      * userID: user ID, favoriteCount: number of likes, id: event ID
      * {"eventTime": "2018-11-05 10:04:00", "retweetCount": 1, "language": "chinese", "userID": "user_1", "favoriteCount": 1, "id": 4909846540155641457}
      */
    val kafkaDStream1 = KafkaUtils.createDirectStream[String, String](
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](List("testTopic3"), kafkaParams1)
    ).map(item => parseJson(item.value()))
      .map(item => {
        val userID = item.getString("userID")
        val eventTime = item.getString("eventTime")
        val language = item.getString("language")
        val favoriteCount = item.getInteger("favoriteCount")
        val retweetCount = item.getInteger("retweetCount")
        (userID, (userID, eventTime, language, favoriteCount, retweetCount))
      })

    /** 2) Streaming data: tweets posted by users (topic testTopic4, same schema as above) */
    val kafkaDStream2 = KafkaUtils.createDirectStream[String, String](
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](List("testTopic4"), kafkaParams2)
    ).map(item => parseJson(item.value()))
      .map(item => {
        val userID = item.getString("userID")
        val eventTime = item.getString("eventTime")
        val language = item.getString("language")
        val favoriteCount = item.getInteger("favoriteCount")
        val retweetCount = item.getInteger("retweetCount")
        (userID, (userID, eventTime, language, favoriteCount, retweetCount))
      })

    /** 3) Stream-Stream Join */
    val joinedDStream = kafkaDStream1.leftOuterJoin(kafkaDStream2)
    joinedDStream.foreachRDD(_.foreach(println))

    ssc.start()
    ssc.awaitTermination()
  }

  /** JSON parsing */
  def parseJson(log: String): JSONObject = {
    var ret: JSONObject = null
    try {
      ret = JSON.parseObject(log)
    } catch {
      // Handle malformed JSON records
      case e: JSONException => println(log)
    }
    ret
  }
}

[Figure: stream_stream_join.png — sample output of the Stream-Stream Join]
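Note that the leftOuterJoin above only matches records that land in the same 10-second batch on both streams. If matches should be found across a wider time range, the streams can be windowed before joining. A minimal sketch, assuming the same kafkaDStream1 and kafkaDStream2 as above:

// Join the last 60 seconds of each stream, sliding every 10 seconds (the batch interval)
val windowedJoin = kafkaDStream1
  .window(Durations.seconds(60), Durations.seconds(10))
  .leftOuterJoin(kafkaDStream2.window(Durations.seconds(60), Durations.seconds(10)))

windowedJoin.foreachRDD(_.foreach(println))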

