Real-time WordCount with Spark Streaming: using DStream, and accumulating word frequencies with updateStateByKey(func)

I. Hands-on practice

1. Real-time WordCount with Spark Streaming
Architecture diagram: (figure omitted)
Explanation: messages are sent from nc listening on hadoop1:9999; the consumer side (the Spark Streaming program) receives them and performs the word-count computation.

2. Install and start the producer
First, install the nc tool with YUM on a Linux machine (IP: 192.168.10.101):
yum install -y nc

Start a server listening on port 9999:
nc -lk 9999

3. Write the Spark Streaming program
First, write the pom.xml file:


<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>cn.toto.spark</groupId>
    <artifactId>bigdata</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>1.7</maven.compiler.source>
        <maven.compiler.target>1.7</maven.compiler.target>
        <encoding>UTF-8</encoding>
        <scala.version>2.10.6</scala.version>
        <spark.version>1.6.2</spark.version>
        <hadoop.version>2.6.4</hadoop.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
        </dependency>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-compiler</artifactId>
            <version>${scala.version}</version>
        </dependency>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-reflect</artifactId>
            <version>${scala.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.10</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.38</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.10</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-mllib_2.10</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-hive_2.10</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.10</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-flume_2.10</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka_2.10</artifactId>
            <version>${spark.version}</version>
        </dependency>
    </dependencies>

    <build>
        <sourceDirectory>src/main/scala</sourceDirectory>
        <testSourceDirectory>src/test/scala</testSourceDirectory>
        <plugins>
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.2.2</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                        <configuration>
                            <args>
                                <arg>-make:transitive</arg>
                                <arg>-dependencyfile</arg>
                                <arg>${project.build.directory}/.scala_dependencies</arg>
                            </args>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-surefire-plugin</artifactId>
                <version>2.18.1</version>
                <configuration>
                    <useFile>false</useFile>
                    <disableXmlReport>true</disableXmlReport>
                    <includes>
                        <include>**/*Test.*</include>
                        <include>**/*Suite.*</include>
                    </includes>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>2.4.3</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>cn.toto.spark.JdbcRDDDemo</mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
LoggerLevels.scala (helper that sets the streaming log level):

package cn.toto.spark.streams

import org.apache.log4j.{Level, Logger}
import org.apache.spark.Logging

object LoggerLevels extends Logging {

  def setStreamingLogLevels() {
    val log4jInitialized = Logger.getRootLogger.getAllAppenders.hasMoreElements
    if (!log4jInitialized) {
      logInfo("Setting log level to [WARN] for streaming example." +
        " To override add a custom log4j.properties to the classpath.")
      Logger.getRootLogger.setLevel(Level.WARN)
    }
  }
}

NetworkWordCount.scala:

package cn.toto.spark

import cn.toto.spark.streams.LoggerLevels
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
  * Created by toto on 2017/7/13.
  */
object NetworkWordCount {

  def main(args: Array[String]) {
    // set the log level
    LoggerLevels.setStreamingLogLevels()
    // create the SparkConf and run in local mode
    // note: local[2] starts two threads
    val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
    // set the DStream batch interval to 5 seconds
    val ssc = new StreamingContext(conf, Seconds(5))
    // read data from the network
    val lines = ssc.socketTextStream("hadoop1", 9999)
    // split the received lines into words on spaces
    val words = lines.flatMap(_.split(" "))
    // pair each word with the number 1
    val pairs = words.map(word => (word, 1))
    // group by word and count how often each word appears
    val wordCounts = pairs.reduceByKey(_ + _)
    // print the result to the console
    wordCounts.print()
    // start the computation
    ssc.start()
    // wait for it to stop
    ssc.awaitTermination()
  }
}

4. Run the Spark Streaming program: because it uses local mode ("local[2]"), it can be run directly on the local machine.
Note: a degree of parallelism must be specified. Running locally with setMaster("local[2]") is equivalent to starting two threads: one for the receiver and one for the computation. When running on a cluster, the application must be given more than one available core.
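For reference, below is a minimal sketch (not from the original article) of how the same job might be written for cluster submission: setMaster is omitted so that the master URL and core count are supplied externally, for example by spark-submit. The object name is hypothetical; the pipeline mirrors NetworkWordCount above.

package cn.toto.spark

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Illustrative sketch: no setMaster call, so the master URL and core count
// come from the submission command rather than being hard-coded to local[2].
object NetworkWordCountOnCluster {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("NetworkWordCountOnCluster")
    val ssc = new StreamingContext(conf, Seconds(5))
    // The socket receiver permanently occupies one core; the application
    // needs at least one more core for the batch computation.
    val lines = ssc.socketTextStream("hadoop1", 9999)
    lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).print()
    ssc.start()
    ssc.awaitTermination()
  }
}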

5. Enter words at the Linux command line (through nc)

6. View the result in the IDEA console

II. Using DStream

package cn.toto.spark

import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
  * Created by toto on 2017/7/13.
  */
object StreamingWordCount {

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("StreamingWordCount").setMaster("local[2]")
    // create the StreamingContext and set the batch interval
    val ssc = new StreamingContext(conf, Seconds(5))
    // create a DStream from the socket port
    val lines: ReceiverInputDStream[String] = ssc.socketTextStream("hadoop1", 9999)
    val words: DStream[String] = lines.flatMap(_.split(" "))
    val wordAndOne: DStream[(String, Int)] = words.map((_, 1))
    val result: DStream[(String, Int)] = wordAndOne.reduceByKey(_ + _)
    // print the result
    result.print()
    // start the program
    ssc.start()
    // wait for termination
    ssc.awaitTermination()
  }
}

Run result: (screenshot omitted)
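Besides print(), a DStream exposes each batch's underlying RDD through foreachRDD. The following is a minimal illustrative sketch, not part of the original example (the object name is hypothetical), showing how each batch's word counts could be handled directly, here simply collected and printed on the driver.

package cn.toto.spark

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Sketch: consume each batch's RDD with foreachRDD instead of print().
object StreamingWordCountForeach {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("StreamingWordCountForeach").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(5))
    val counts = ssc.socketTextStream("hadoop1", 9999)
      .flatMap(_.split(" "))
      .map((_, 1))
      .reduceByKey(_ + _)
    // foreachRDD runs on the driver once per batch and hands over that batch's RDD,
    // which could be saved or written to a database; here it is collected and printed.
    counts.foreachRDD { rdd =>
      rdd.collect().foreach { case (word, count) => println(s"$word -> $count") }
    }
    ssc.start()
    ssc.awaitTermination()
  }
}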


In the examples above, each batch is computed on its own: the result of the second batch is not accumulated on top of the first. The example below implements accumulation.

In the WordCount examples above, the words typed on the Linux side are counted correctly within each batch, but the counts are not accumulated across batches. To accumulate them, use updateStateByKey(func) to update state between batches.

package cn.toto.spark

import cn.toto.spark.streams.LoggerLevels
import org.apache.spark.{HashPartitioner, SparkConf}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object NetworkUpdateStateWordCount {

  /**
    * String      : the word
    * Seq[Int]    : the word's counts in the current batch
    * Option[Int] : the historical (accumulated) result
    */
  val updateFunc = (iter: Iterator[(String, Seq[Int], Option[Int])]) => {
    //iter.flatMap(it => Some(it._2.sum + it._3.getOrElse(0)).map(x => (it._1, x)))
    iter.flatMap { case (x, y, z) => Some(y.sum + z.getOrElse(0)).map(m => (x, m)) }
  }

  def main(args: Array[String]) {
    LoggerLevels.setStreamingLogLevels()
    val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkUpdateStateWordCount")
    val ssc = new StreamingContext(conf, Seconds(5))
    // checkpoint: write the state to shared storage
    ssc.checkpoint("E://workspace//netresult")
    val lines = ssc.socketTextStream("hadoop1", 9999)
    // reduceByKey does not accumulate results across batches
    //val result = lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
    // updateStateByKey accumulates the results, but requires a custom update function: updateFunc
    val results = lines.flatMap(_.split(" ")).map((_, 1))
      .updateStateByKey(updateFunc, new HashPartitioner(ssc.sparkContext.defaultParallelism), true)
    results.print()
    ssc.start()
    ssc.awaitTermination()
  }
}

Enter some content in nc: (screenshot omitted)

The run result is as follows: (screenshot omitted)
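For comparison, updateStateByKey also has a simpler overload that takes a per-key update function of type (Seq[V], Option[S]) => Option[S], with no iterator or partitioner. Below is a minimal sketch, not from the original article, of the same cumulative count using that form; the object name and checkpoint path are illustrative.

package cn.toto.spark

import cn.toto.spark.streams.LoggerLevels
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Sketch: the same cumulative word count using the simpler per-key overload
// of updateStateByKey.
object NetworkUpdateStateWordCountSimple {
  def main(args: Array[String]): Unit = {
    LoggerLevels.setStreamingLogLevels()
    val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkUpdateStateWordCountSimple")
    val ssc = new StreamingContext(conf, Seconds(5))
    // checkpointing is still required for stateful operations (path is illustrative)
    ssc.checkpoint("E://workspace//netresult-simple")

    // newValues   : counts of this word in the current batch
    // runningCount: the accumulated count from earlier batches
    val updateFunc = (newValues: Seq[Int], runningCount: Option[Int]) =>
      Some(newValues.sum + runningCount.getOrElse(0))

    val results = ssc.socketTextStream("hadoop1", 9999)
      .flatMap(_.split(" "))
      .map((_, 1))
      .updateStateByKey[Int](updateFunc)

    results.print()
    ssc.start()
    ssc.awaitTermination()
  }
}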

