SparkStreaming(二)入门案例

2、入门案例

2.1、计算单词的数量

Java版本jdk.1.8以下:

public class WordCountOnline {public static void main(String[] args) throws InterruptedException {SparkConf conf = new SparkConf();/** 1、配置应用名称以及配置两个线程(注意要大于等于两个线程)*/conf.setAppName("WordCountOnline").setMaster("local[2]");/**2、 创建SparkStreamingContext* 	可以基于SparkConf参数,也可以基于持久化的SparkStreamingContext进行状态恢复。* 	典型的场景是Driver崩溃后由于SparkStreaming具有连续不断的24小时不间断的运行,所以需要再Driver* 	重现启动后从上次运行的状态恢复过来,此时的状态需要基于曾经的CheckPoint。*/JavaStreamingContext jssc = new JavaStreamingContext(conf,Durations.seconds(1));/** 3、创建SparkStreaming输入数据来源* 	a.数据输入来源可以基于File,HDFS,Flume,Kafka,Socket等。* 	b.在这里我们指定数据来源于网络Socket端口,SparkStreaming连接上该端口,并在运行的时候一直监听该端口的数据,* 		并且后续根据业务需要不断的有数据产生。* 	c.如果经常在每隔5秒没有数据就不断启动空的job其实是对资源的浪费,因为没有接受到数据,仍然提交了job。* 		实际的做法是提交job会判断是否有数据,如果没有的话就不再提交job。*/JavaReceiverInputDStream lines = jssc.socketTextStream("local", 9999);/** 4、我们就像对RDD编程一样,基于DStream进行编程,原因是DStream是RDD产生的模板,在SparkStreaming发生计算之前,其实质* 	是把每个Batch的DStream的操作翻译成为了RDD操作*///4.1、faltMap操作:将遍历每一行,并且将每一行分割单词返回String的IteratorJavaDStream words = lines.flatMap(new FlatMapFunction() {private static final long serialVersionUID = 1L;@Overridepublic Iterable call(String line) throws Exception {return Arrays.asList(line.split(","));}});//4.2、mapToPair操作:将每个单词计数标记为1JavaPairDStream pairs = words.mapToPair(new PairFunction() {@Overridepublic Tuple2 call(String word) throws Exception {return new Tuple2(word, 1);}});//4.3、reduceByKey操作:将每个相同单词的计数标记1相加JavaPairDStream word_count = pairs.reduceByKey(new Function2() {@Overridepublic Integer call(Integer v1, Integer v2) throws Exception {return v1 + v2;}});/** 4.4、print操作:此处的print方法并不会触发job执行,因为目前代码还处于SparkStreaming框架的控制之下,* 	具体是否触发时取决于设置的Duration时间的间隔。*/word_count.print();/** 5、开始计算:SparkStreaming引擎开始执行,也就是Driver开始运行,Driver启动时位于一条线程中,* 	当然内部当然还有消息循环体,接收应用程序本身或者Executor发送过来的消息。*/jssc.start();//6、等待程序执行结束jssc.awaitTermination();}
}

Java版本jdk1.8:可以使用lambda表达式简化代码:

public class WordCount {public static void main(String[] args) throws InterruptedException {//1、创建一个带有两个执行线程的本地StreamingContext,并且设置流数据每批的间隔为1秒/*** appName参数是应用程序在集群UI上显示的名称。* master是Spark,Mesos或YARN集群URL,或者是在本地模式下运行的特殊"local[*]"字符串。* 实际上,当在集群上运行时,不希望在程序中对master进行硬编码,而是使用spark-submit启动应用程序并在那里接收它。* 但是,对于本地测试和单元测试,您可以传递"local[*]"以在进程中运行Spark Streaming。* 请注意,这会在内部创建一个JavaSparkContext(所有Spark功能的起点),可以作为ssc.sparkContext访问。*/SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount");JavaStreamingContext jssc = new JavaStreamingContext(conf,Durations.seconds(1));/*** 定义上下文后,需要执行以下操作:* 	1.通过创建输入DStreams来定义输入源* 	2.通过将转换和输出操作应用于DStream来定义流式计算。* 	3.开始接收数据并使用streamingContext.start()处理它。* 	4.等待使用streamingContext.awaitTermination()停止处理(手动或由于任何错误)。* 	5.可以使用streamingContext.stop()手动停止处理。* 要记住的要点:*  1.一旦启动了上下文,就不能设置或添加新的流式计算。*  2.上下文停止后,无法重新启动。*  3.在JVM中只能同时激活一个StreamingContext。*  4.StreamingContext上的stop()也会停止SparkContext。要仅停止StreamingContext,请将名为stopSparkContext的stop()的可选参数设置为false。*  5.只要在创建下一个StreamingContext之前停止前一个StreamingContext(不停止SparkContext),就可以重复使用SparkContext来创建多个StreamingContexts。*///2、使用此context,我们可以创建一个DStream,它表示来自特定主机名(例如localhost)和端口(例如9999)TCP源的流数据。JavaReceiverInputDStream lines = jssc.socketTextStream("localhost", 9999);//3、将每行文本以空格符切分成一个个单词JavaDStream words = lines.flatMap(x -> Arrays.asList(x.split(" ")).iterator());//4、计算每批单词的数量JavaPairDStream pairs = words.mapToPair(s -> new Tuple2<>(s, 1));JavaPairDStream wordCounts = pairs.reduceByKey((i1,i2) -> i1 + i2);wordCounts.print();//5、开始计算jssc.start();//6、等待计算终止jssc.awaitTermination();}
}

2.2、流式筛选并打印出包含”error”的行

public class WordFilter {public static void main(String[] args) throws InterruptedException {//创建一个Java版本的Spark Context SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("WordFilter");//从SparkConf创建StreamingContext并指定1秒钟的批处理大小JavaStreamingContext jssc = new JavaStreamingContext(conf,Durations.seconds(1));//以端口7777作为输入来源创建DStreamJavaReceiverInputDStream lines = jssc.socketTextStream("localhost", 7777);//从DStream中筛选出包含字符串"error"的行JavaDStream errorLines = lines.filter(new Function(){@Overridepublic Boolean call(String line) throws Exception {return line.contains("error");}});//打印出有"error"的行errorLines.print();//启动流计算环境StreamingContext并等待它"完成"jssc.start();//等待作业完成jssc.awaitTermination();}
}

 

 

 

 

 

 

 

 


本文来自互联网用户投稿,文章观点仅代表作者本人,不代表本站立场,不承担相关法律责任。如若转载,请注明出处。 如若内容造成侵权/违法违规/事实不符,请点击【内容举报】进行投诉反馈!

相关文章

立即
投稿

微信公众账号

微信扫一扫加关注

返回
顶部