算子案例
文章目录
- RDD转换算子案例
- 1. 数据准备
- 2.需求描述
- 3.代码实现
数据文件链接
https://gitee.com/sparky_z/spark-data-file
RDD转换算子案例
1. 数据准备
agent.log:时间戳,省份,城市,用户,广告,中间字段使用空格分隔。
2.需求描述
统计出每一个省份每个广告被点击数量排行的 Top3
3.代码实现
package rdd.operator.transformimport org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}object Spark24_RDD_Req {def main(args: Array[String]): Unit = {val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")val sc = new SparkContext(sparkConf)// TODO 案例实操// 1. 获取原始数据:时间戳,省份,城市,用户,广告val dataRDD = sc.textFile("datas/agent.log")// 2. 将原始数据进行结构的转换。方便统计// 时间戳,省份,城市,用户,广告// =>// ( ( 省份,广告 ), 1 )val mapRDD = dataRDD.map(line => {val datas = line.split(" ")(( datas(1), datas(4) ), 1)})// 3. 将转换结构后的数据,进行分组聚合// ( ( 省份,广告 ), 1 ) => ( ( 省份,广告 ), sum )val reduceRDD: RDD[((String, String), Int)] = mapRDD.reduceByKey(_+_)// 4. 将聚合的结果进行结构的转换// ( ( 省份,广告 ), sum ) => ( 省份, ( 广告, sum ) )val newMapRDD = reduceRDD.map{case ( (prv, ad), sum ) => {(prv, (ad, sum))}}// 5. 将转换结构后的数据根据省份进行分组// ( 省份, 【( 广告A, sumA ),( 广告B, sumB )】 )val groupRDD: RDD[(String, Iterable[(String, Int)])] = newMapRDD.groupByKey()// 6. 将分组后的数据组内排序(降序),取前3名val resultRDD = groupRDD.mapValues(iter => {iter.toList.sortBy(_._2)(Ordering.Int.reverse).take(3)})// 7. 采集数据打印在控制台resultRDD.collect().foreach(println)sc.stop()}
}

本文来自互联网用户投稿,文章观点仅代表作者本人,不代表本站立场,不承担相关法律责任。如若转载,请注明出处。 如若内容造成侵权/违法违规/事实不符,请点击【内容举报】进行投诉反馈!
