获取系统URL访问的前三名(通过Scala方式实现/通过Spark方式实现),Spark将URL访问日志进行分类并通过自定义Partitioner的方式将文件写入到不同分区上

1、创建Maven项目

创建的过程参考:http://blog.csdn.net/tototuzuoquan/article/details/74571374

2、准备日志文件

url.log的内容类似:
这里写图片描述

20160321101954  http://java.toto.cn/java/course/javaeeadvanced.shtml
20160321101954  http://java.toto.cn/java/course/javaee.shtml
20160321101954  http://java.toto.cn/java/course/android.shtml
20160321101954  http://java.toto.cn/java/video.shtml
20160321101954  http://java.toto.cn/java/teacher.shtml
20160321101954  http://java.toto.cn/java/course/android.shtml
20160321101954  http://php.toto.cn/php/teacher.shtml
20160321101954  http://net.toto.cn/net/teacher.shtml
20160321101954  http://java.toto.cn/java/course/hadoop.shtml
20160321101954  http://java.toto.cn/java/course/base.shtml
20160321101954  http://net.toto.cn/net/course.shtml
20160321101954  http://php.toto.cn/php/teacher.shtml
20160321101954  http://net.toto.cn/net/video.shtml
20160321101954  http://java.toto.cn/java/course/base.shtml
20160321101954  http://net.toto.cn/net/teacher.shtml
20160321101954  http://java.toto.cn/java/video.shtml
20160321101954  http://java.toto.cn/java/video.shtml

3、编写UrlCount1,代码如下:

通过scala的方式获取日志文件中每类次主机名出现的前3名

package cn.toto.sparkimport java.net.URLimport org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}/*** 获取到每类host出现的次数的前三名,下面通过sacle的方式实现* Created by toto on 2017/7/8.*/
object UrlCount1 {def main(args: Array[String]): Unit = {//使用local就是启动一个线程,local[2]表示启动2个线程,Local[*]表示根据机器来自动分配val conf = new SparkConf().setAppName("UrlCount1").setMaster("local[2]")val sc = new SparkContext(conf)val lines:RDD[String] = sc.textFile(args(0))//splitval urlAndOne = lines.map(line =>{val fields = line.split("\t")val url = fields(1)//封装成url,次数(url,1)})//聚合,计算某个url出现了多少次,所以要聚合一下,这里做了Cache,问题是当数据量很大的时候,可能出现内存溢出val summedUrl = urlAndOne.reduceByKey(_+_).cache()println(summedUrl)//返回的是[(host,url,次数)]这样的元组//groupBy(_._1)    表示按照host进行分组val grouped = summedUrl.map(t => {val host = new URL(t._1).getHost//主机名,url,次数(host,t._1,t._2)}).groupBy(_._1)println(grouped)//_                  :表示上面的集合//toList             :表示它转化为集合//sortBy             :这里是scala的集合//_._3               :表示按照次数进行排序//.reverse.take(3)   :表示取前3名val result = grouped.mapValues(_.toList.sortBy(_._3).reverse.take(3))println(result.collect().toBuffer)sc.stop()}
}

运行参数配置:
这里写图片描述
运行结果:
这里写图片描述


4、通过Spark的方式计算URL出现的前3名

代码如下:

package cn.toto.sparkimport org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}/*** 使用SparkRDD的方式取出每个子Host的出现的次数的前3名,并循环打印出来。* Created by toto on 2017/7/8.*/
object UrlCount2 {/*** 使用了Spark的RDD缓存机制,这样再进行排序时不会出现内存溢出* @param args*/def main(args: Array[String]): Unit = {//后续这些Url就从数据库中获取到val urls = Array("http://java.toto.cn","http://php.toto.cn","http://net.toto.cn")val conf = new SparkConf().setAppName("UrlCount1").setMaster("local[2]")val sc = new SparkContext(conf)val lines:RDD[String] = sc.textFile(args(0))//splitval urlAndOne = lines.map(line => {val fields = line.split("\t")val url = fields(1)//(url,次数)(url,1)})//聚合val summedUrl = urlAndOne.reduceByKey(_+_)//循环过滤for(u <- urls) {//过滤(值过滤出urls这些的内容)val insRdd = summedUrl.filter(t => {val url = t._1url.startsWith(u)})val result = insRdd.sortBy(_._2, false).take(3)println(result.toBuffer)}sc.stop()}
}

运行参数配置:
这里写图片描述
运行结果:
这里写图片描述


5、将url进行筛选,分类,并通过自定义分区将数据存储到不同的文件中

package cn.toto.sparkimport java.net.URLimport org.apache.spark.rdd.RDD
import org.apache.spark.{Partitioner, SparkConf, SparkContext}import scala.collection.mutable/*** 自定义Partitioner,按照不同的子主机名存储到不同的分区文件中* Created by toto on 2017/7/8.*/
object UrlCount3 {/*** 如果把每个学员单独产生的内容都写入到磁盘文件中* @param args*/def main(args: Array[String]): Unit = {val conf = new SparkConf().setAppName("UrlCount1").setMaster("local[2]")val sc = new SparkContext(conf)val lines : RDD[String] = sc.textFile("E:\\workspace\\url.log")//splitval urlAndOne = lines.map(line => {val fields = line.split("\t")val url = fields(1)(url,1)})//聚合val summedUrl = urlAndOne.reduceByKey(_+_).cache()val rdd1 = summedUrl.map(t => {val host = new URL(t._1).getHost//(host,(url,出现次数))(host,(t._1,t._2))})val urls = rdd1.map(_._1).distinct().collect()val partitioner = new HostPartitioner(urls)//安装自定义的分区器重新分区val partitionedRdd = rdd1.partitionBy(partitioner)val result = partitionedRdd.mapPartitions(it => {it.toList.sortBy(_._2._2).reverse.take(3).iterator})result.saveAsTextFile("E:\\workspace\\out")sc.stop()}
}class HostPartitioner(urls: Array[String]) extends Partitioner {val rules = new mutable.HashMap[String,Int]()var index = 0for(url <- urls){rules.put(url,index)index += 1}override def getPartition(key: Any): Int = {val url = key.toString//如果取到了值就返回url,否则返回0rules.getOrElse(url,0)}//分区数量override def numPartitions: Int = urls.length
}

最终的输出内容是:
这里写图片描述

这里写图片描述

这里写图片描述

这里写图片描述


本文来自互联网用户投稿,文章观点仅代表作者本人,不代表本站立场,不承担相关法律责任。如若转载,请注明出处。 如若内容造成侵权/违法违规/事实不符,请点击【内容举报】进行投诉反馈!

相关文章

立即
投稿

微信公众账号

微信扫一扫加关注

返回
顶部