Storm 实验:广告实时统计

一、实验描述

  实验数据通过java代码生成,每条数据的格式如下:
     时间戳 省份 城市 用户ID 广告ID
  实验主要完成三个需求,即
(1) 实现实时的动态黑名单机制: 把每天对某个广告点击超过100次的用户拉黑,黑名单用户ID存入Mysql
(2) 实时统计每天各省市各广告的点击总流量,并将其存入Mysql
(3) 最近1分钟广告总点击量,每10s计算一次,并通过html展示

二、实验代码

  • 数据模拟生成
package cn.edu.neu.advertise;import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;import java.util.*;/*** @author 32098*/
public class KafkaDataMocker {public static List<String> mockData() throws InterruptedException {List<String> list = new ArrayList<>();List<String> provinceList = Arrays.asList("江西", "辽宁", "浙江", "广东", "湖南", "湖北", "吉林", "黑龙江", "福建");List<String> cityList = Arrays.asList("南昌","沈阳","杭州","广州","长沙","武汉","长春","哈尔滨","厦门");int len = provinceList.size();Random r = new Random();for(int i=0; i<r.nextInt(100); i++){Thread.sleep(10);int idx = r.nextInt(len);String province = provinceList.get(idx);String city = cityList.get(idx);String uid = "U" + r.nextInt(10);String aid = "Ad_" + r.nextInt(20);String record = System.currentTimeMillis() + " " + province + " " + city + " " + uid + " " + aid;list.add(record);}return list;}public static void main(String[] args) throws InterruptedException {HashMap<String, Object> pros = new HashMap<>(3);pros.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "master:9092");pros.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");pros.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");KafkaProducer<String, String> producer = new KafkaProducer<>(pros);int ite = 0;while (true){ite = ite + 1;System.out.println("################"+ ite +"################");List<String> records = mockData();records.forEach(elem -> {System.out.println(elem);ProducerRecord<String, String> record = new ProducerRecord<>("advertise-user", null, elem);producer.send(record);});Thread.sleep(1000);}}
}/*
启动Kafka服务器:kafka-server-start.sh config/server.properties
启动Kafka消费者(console):kafka-console-consumer.sh --bootstrap-server master:9092 --topic storm-topic --from-beginning*/
  • MysqlUtil
package cn.edu.neu.advertise;import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;/*** @author 32098*/
public class MysqlUtil {public static Connection getConnection(){String url = "jdbc:mysql://master:3306/user_advertise?useUnicode=true&characterEncoding=utf-8";String user = "root";String password = "Hive@2020";try {return DriverManager.getConnection(url, user, password);} catch (SQLException throwables) {throwables.printStackTrace();}return null;}
}
  • 实时的动态黑名单机制
package cn.edu.neu.advertise;import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;import java.util.Map;/*** @author 32098* 实现实时的动态黑名单机制: 把每天对某个广告点击超过100次的用户拉黑,黑名单用户ID存入Mysql*/
public class BlackListBoltA extends BaseRichBolt {private OutputCollector collector;@Overridepublic void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {this.collector = collector;}@Overridepublic void execute(Tuple input) {try {String value = input.getStringByField("value");// 必须ack,否则会重复消费kafka中的消息collector.ack(input);System.out.println("Received from kafka: "+ value);String[] strs = value.split(" ");collector.emit(new Values(strs[0], strs[3], strs[4]));}catch (Exception e){e.printStackTrace();collector.fail(input);}}@Overridepublic void declareOutputFields(OutputFieldsDeclarer declarer) {declarer.declare(new Fields("time_stamp", "uid", "aid"));}
}
package cn.edu.neu.advertise;import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Tuple;import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Map;/*** @author 32098* 实现实时的动态黑名单机制: 把每天对某个广告点击超过100次的用户拉黑,黑名单用户ID存入Mysql*/
public class BlackListBoltB extends BaseRichBolt {private OutputCollector collector;@Overridepublic void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {this.collector = outputCollector;}@Overridepublic void execute(Tuple tuple) {String ts = tuple.getStringByField("time_stamp");String uid = tuple.getStringByField("uid");String aid = tuple.getStringByField("aid");String value = "1";collector.ack(tuple);try {Connection conn = MysqlUtil.getConnection();assert conn != null;PreparedStatement ps = conn.prepareStatement("select uid from black_list where uid=?");ps.setString(1, uid);ResultSet rs = ps.executeQuery();if(!rs.next()){String day = new SimpleDateFormat("yyyy-MM-dd").format(new Date(Long.parseLong(ts.trim())));ps = conn.prepareStatement("select * from user_advertise where day=? and uid=? and aid=?");ps.setString(1, day);ps.setString(2, uid);ps.setString(3, aid);rs = ps.executeQuery();if(rs.next()){PreparedStatement psA = conn.prepareStatement("update user_advertise set count = count + ? where day=? and uid=? and aid=?");// psA.setInt(1, 1);psA.setString(1, value);psA.setString(2, day);psA.setString(3, uid);psA.setString(4, aid);psA.executeUpdate();psA.close();}else{PreparedStatement psB = conn.prepareStatement("insert into user_advertise(day,uid,aid,count) values (?,?,?,?)");psB.setString(1, day);psB.setString(2, uid);psB.setString(3, aid);psB.setString(4, value);psB.executeUpdate();psB.close();}ps = conn.prepareStatement("select * from user_advertise where day=? and uid=? and aid=? and count>60");ps.setString(1, day);ps.setString(2, uid);ps.setString(3, aid);rs = ps.executeQuery();if(rs.next()){PreparedStatement psC = conn.prepareStatement("insert into black_list(uid) value(?) on duplicate key update uid=?");psC.setString(1, uid);psC.setString(2, uid);psC.executeUpdate();psC.close();}ps.close();}conn.close();} catch (SQLException throwables) {throwables.printStackTrace();}}@Overridepublic void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {}
}
package cn.edu.neu.advertise;import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.AlreadyAliveException;
import org.apache.storm.generated.AuthorizationException;
import org.apache.storm.generated.InvalidTopologyException;
import org.apache.storm.kafka.spout.KafkaSpout;
import org.apache.storm.kafka.spout.KafkaSpoutConfig;
import org.apache.storm.kafka.spout.KafkaSpoutRetryExponentialBackoff;
import org.apache.storm.kafka.spout.KafkaSpoutRetryService;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;/*** @author 32098*/
public class BlackListApp {private static final String BOOTSTRAP_SERVERS = "master:9092";private static final String TOPIC_NAME = "advertise-user";public static void main(String[] args) {final TopologyBuilder builder = new TopologyBuilder();builder.setSpout("kafka_spout", new KafkaSpout<>(getKafkaSpoutConfig()), 1);builder.setBolt("boltA", new BlackListBoltA(), 2).setNumTasks(2).shuffleGrouping("kafka_spout");builder.setBolt("boltB", new BlackListBoltB(), 2).setNumTasks(2).fieldsGrouping("boltA", new Fields("time_stamp", "uid", "aid"));// 如果外部传参cluster则代表线上环境启动,否则代表本地启动if (args.length > 0 && "cluster".equals(args[0])) {try {StormSubmitter.submitTopology("Cluster-BlackListApp", new Config(), builder.createTopology());} catch (AlreadyAliveException | InvalidTopologyException | AuthorizationException e) {e.printStackTrace();}} else {LocalCluster cluster = new LocalCluster();cluster.submitTopology("Local-BlackListApp",new Config(), builder.createTopology());}}private static KafkaSpoutConfig<String, String> getKafkaSpoutConfig() {return KafkaSpoutConfig.builder(BlackListApp.BOOTSTRAP_SERVERS, BlackListApp.TOPIC_NAME)// 除了分组ID,以下配置都是可选的。分组ID必须指定,否则会抛出InvalidGroupIdException异常.setProp(ConsumerConfig.GROUP_ID_CONFIG, "kafkaSpoutTestGroup")// 定义重试策略.setRetry(getRetryService())// 定时提交偏移量的时间间隔,默认是15s.setOffsetCommitPeriodMs(10_000).build();}/*** 定义重试策略* @return KafkaSpoutRetryService*/private static KafkaSpoutRetryService getRetryService() {return new KafkaSpoutRetryExponentialBackoff(KafkaSpoutRetryExponentialBackoff.TimeInterval.microSeconds(500),KafkaSpoutRetryExponentialBackoff.TimeInterval.milliSeconds(2), Integer.MAX_VALUE, KafkaSpoutRetryExponentialBackoff.TimeInterval.seconds(10));}
}
  • 实时统计每天各省市各广告的点击总流量
package cn.edu.neu.advertise;import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;import java.util.Map;/*** @author 32098* 实时统计每天各省市各广告的点击总流量,并将其存入Mysql*/
public class ClickCountBoltA extends BaseRichBolt {private OutputCollector collector;@Overridepublic void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {this.collector = collector;}@Overridepublic void execute(Tuple input) {try {String value = input.getStringByField("value");// 必须ack,否则会重复消费kafka中的消息collector.ack(input);System.out.println("Received from kafka: "+ value);String[] strs = value.split(" ");collector.emit(new Values(strs[0], strs[1], strs[2], strs[4]));}catch (Exception e){e.printStackTrace();collector.fail(input);}}@Overridepublic void declareOutputFields(OutputFieldsDeclarer declarer) {declarer.declare(new Fields("time_stamp", "province", "city", "aid"));}
}
package cn.edu.neu.advertise;import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Tuple;import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Map;/*** @author 32098*/
public class ClickCountBoltB extends BaseRichBolt {private OutputCollector collector;@Overridepublic void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {this.collector = outputCollector;}@Overridepublic void execute(Tuple tuple) {String ts = tuple.getStringByField("time_stamp");String province = tuple.getStringByField("province");String city = tuple.getStringByField("city");String aid = tuple.getStringByField("aid");String value = "1";collector.ack(tuple);String day = new SimpleDateFormat("yyyy-MM-dd").format(new Date(Long.parseLong(ts.trim())));try {Connection conn = MysqlUtil.getConnection();assert conn != null;PreparedStatement ps = conn.prepareStatement("insert into province_city_advertise(day,province,city,aid,count) values (?,?,?,?,?) on duplicate key update count=count+?");ps.setString(1, day);ps.setString(2, province);ps.setString(3, city);ps.setString(4, aid);ps.setString(5, value);ps.setString(6, value);ps.executeUpdate();ps.close();conn.close();} catch (SQLException throwables) {throwables.printStackTrace();}}@Overridepublic void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {}
}
package cn.edu.neu.advertise;import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.AlreadyAliveException;
import org.apache.storm.generated.AuthorizationException;
import org.apache.storm.generated.InvalidTopologyException;
import org.apache.storm.kafka.spout.KafkaSpout;
import org.apache.storm.kafka.spout.KafkaSpoutConfig;
import org.apache.storm.kafka.spout.KafkaSpoutRetryExponentialBackoff;
import org.apache.storm.kafka.spout.KafkaSpoutRetryService;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;/*** @author 32098*/
public class ClickCountApp {private static final String BOOTSTRAP_SERVERS = "master:9092";private static final String TOPIC_NAME = "advertise-user";public static void main(String[] args) {final TopologyBuilder builder = new TopologyBuilder();builder.setSpout("kafka_spout", new KafkaSpout<>(getKafkaSpoutConfig()), 1);builder.setBolt("boltA", new ClickCountBoltA()).shuffleGrouping("kafka_spout");builder.setBolt("boltB", new ClickCountBoltB()).fieldsGrouping("boltA", new Fields("time_stamp", "province", "city", "aid"));// 如果外部传参cluster则代表线上环境启动,否则代表本地启动if (args.length > 0 && "cluster".equals(args[0])) {try {StormSubmitter.submitTopology("Cluster-ClickCountApp", new Config(), builder.createTopology());} catch (AlreadyAliveException | InvalidTopologyException | AuthorizationException e) {e.printStackTrace();}} else {LocalCluster cluster = new LocalCluster();cluster.submitTopology("Local-ClickCountApp",new Config(), builder.createTopology());}}private static KafkaSpoutConfig<String, String> getKafkaSpoutConfig() {return KafkaSpoutConfig.builder(ClickCountApp.BOOTSTRAP_SERVERS, ClickCountApp.TOPIC_NAME).setProp(ConsumerConfig.GROUP_ID_CONFIG, "kafkaSpoutTestGroup").setRetry(getRetryService()).setOffsetCommitPeriodMs(10_000).build();}/*** 定义重试策略* @return KafkaSpoutRetryService*/private static KafkaSpoutRetryService getRetryService() {return new KafkaSpoutRetryExponentialBackoff(KafkaSpoutRetryExponentialBackoff.TimeInterval.microSeconds(500),KafkaSpoutRetryExponentialBackoff.TimeInterval.milliSeconds(2), Integer.MAX_VALUE, KafkaSpoutRetryExponentialBackoff.TimeInterval.seconds(10));}
}
  • 最近1分钟广告总点击量,每10s计算一次,并通过html展示
package cn.edu.neu.advertise;import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;import java.util.Map;/*** @author 32098* 最近1分钟广告总点击量,每10s计算一次*/
public class ClickNearlyMinuteBoltA extends BaseRichBolt {private OutputCollector collector;@Overridepublic void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {this.collector = collector;}@Overridepublic void execute(Tuple input) {try {String value = input.getStringByField("value");// 必须ack,否则会重复消费kafka中的消息collector.ack(input);System.out.println("Received from kafka: "+ value);String[] strs = value.split(" ");collector.emit(new Values(strs[0], strs[4]));}catch (Exception e){e.printStackTrace();collector.fail(input);}}@Overridepublic void declareOutputFields(OutputFieldsDeclarer declarer) {declarer.declare(new Fields("time_stamp", "aid"));}
}
package cn.edu.neu.advertise;import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.topology.base.BaseWindowedBolt;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.utils.Utils;
import org.apache.storm.windowing.TupleWindow;import java.awt.*;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.io.PrintWriter;
import java.sql.Array;
import java.text.SimpleDateFormat;
import java.util.*;/*** @author 32098* 最近1分钟广告总点击量,每10s计算一次*/
public class ClickNearlyMinuteBoltB extends BaseWindowedBolt {private OutputCollector collector;private String projectRoot;@Overridepublic void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {this.collector = outputCollector;this.projectRoot = System.getProperty("user.dir");}@Overridepublic void execute(TupleWindow tupleWindow) {Map<String, Integer> hmsClick = new TreeMap<>();for (Tuple tuple: tupleWindow.get()){String ts = tuple.getStringByField("time_stamp");String aid = tuple.getStringByField("aid");long timeStamp = Long.parseLong(ts.trim());String time = new SimpleDateFormat("HH:mm:ss").format(new Date(timeStamp));String[] hms = time.split(":");int s = (Integer.parseInt(hms[2])/10+1)*10;int m = Integer.parseInt(hms[1]);int h = Integer.parseInt(hms[0]);if(s == 60){m = m + 1;s = 0;if(m == 60){h = h + 1;if(h == 24){h = 0;}}}String hStr, mStr, sStr;if(h < 10){hStr = "0" + h;}else{hStr = String.valueOf(h);}if(m < 10){mStr = "0" + m;}else{mStr = String.valueOf(m);}if(s == 0){sStr = "00";}else{sStr = String.valueOf(s);}String hms_ = hStr+":"+mStr+":"+sStr;if(hmsClick.containsKey(hms_)){hmsClick.put(hms_, hmsClick.get(hms_)+1);}else{hmsClick.put(hms_, 1);}}String file = projectRoot + "/src/main/java/cn/edu/neu/advertise/advertise_click_nearly_minute.json";try {PrintWriter out = new PrintWriter(new FileWriter(new File(file), false));StringBuffer jsonStr = new StringBuffer("[");hmsClick.forEach((xtime, yclick) -> {String jsonElem = "{\"xtime\":\""+xtime+"\",\"yclick\":\""+yclick+"\"},";System.out.println(jsonElem);jsonStr.append(jsonElem);});jsonStr.deleteCharAt(jsonStr.length()-1);jsonStr.append("]");out.println(jsonStr.toString());out.flush();} catch (IOException e) {e.printStackTrace();}}@Overridepublic void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {}
}

  继承 BaseWindowedBolt 以实现滑动窗口。

package cn.edu.neu.advertise;import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.AlreadyAliveException;
import org.apache.storm.generated.AuthorizationException;
import org.apache.storm.generated.InvalidTopologyException;
import org.apache.storm.kafka.spout.KafkaSpout;
import org.apache.storm.kafka.spout.KafkaSpoutConfig;
import org.apache.storm.kafka.spout.KafkaSpoutRetryExponentialBackoff;
import org.apache.storm.kafka.spout.KafkaSpoutRetryService;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;
import org.apache.storm.topology.base.BaseWindowedBolt.Duration;import java.util.concurrent.TimeUnit;/*** @author 32098*/
public class ClickNearlyMinuteApp {private static final String BOOTSTRAP_SERVERS = "master:9092";private static final String TOPIC_NAME = "advertise-user";public static void main(String[] args) {final TopologyBuilder builder = new TopologyBuilder();builder.setSpout("kafka_spout", new KafkaSpout<>(getKafkaSpoutConfig()), 1);builder.setBolt("boltA", new ClickNearlyMinuteBoltA()).shuffleGrouping("kafka_spout");// windowbuilder.setBolt("boltB", new ClickNearlyMinuteBoltB().withWindow(new Duration(60, TimeUnit.SECONDS), new Duration(10, TimeUnit.SECONDS)),1).setNumTasks(1).fieldsGrouping("boltA", new Fields("time_stamp", "aid"));Config config = new Config();config.put(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS, 70000);// 如果外部传参cluster则代表线上环境启动,否则代表本地启动if (args.length > 0 && "cluster".equals(args[0])) {try {StormSubmitter.submitTopology("Cluster-ClickNearlyMinuteApp", config, builder.createTopology());} catch (AlreadyAliveException | InvalidTopologyException | AuthorizationException e) {e.printStackTrace();}} else {LocalCluster cluster = new LocalCluster();cluster.submitTopology("Local-ClickNearlyMinuteApp",config, builder.createTopology());}}private static KafkaSpoutConfig<String, String> getKafkaSpoutConfig() {return KafkaSpoutConfig.builder(ClickNearlyMinuteApp.BOOTSTRAP_SERVERS, ClickNearlyMinuteApp.TOPIC_NAME).setProp(ConsumerConfig.GROUP_ID_CONFIG, "kafkaSpoutTestGroup").setRetry(getRetryService()).setOffsetCommitPeriodMs(10_000).build();}/*** 定义重试策略* @return KafkaSpoutRetryService*/private static KafkaSpoutRetryService getRetryService() {return new KafkaSpoutRetryExponentialBackoff(KafkaSpoutRetryExponentialBackoff.TimeInterval.microSeconds(500),KafkaSpoutRetryExponentialBackoff.TimeInterval.milliSeconds(2), Integer.MAX_VALUE, KafkaSpoutRetryExponentialBackoff.TimeInterval.seconds(10));}
}

  java.lang.IllegalArgumentException: Window duration (length + sliding interval) value 70000 is more than topology.message.timeout.secs value 30000 (非法参数异常,滑动窗口大小加滑动间隔的值大于timeout的默认值)
  解决方法:config.put(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS, 70000);


<html lang="en">
<head><meta charset="UTF-8"><title>最近1分钟广告总点击量,每10s计算一次title>

<script src="https://cdn.staticfile.org/echarts/4.3.0/echarts.min.js">script><script type="text/javascript" src="https://cdn.staticfile.org/jquery/1.10.2/jquery.min.js">script>
head>
<body><div id="display" style="height: 450px; width:800px">div>
<script>var myChart = echarts.init(document.getElementById("display"));setInterval(function () {$.getJSON("advertise_click_nearly_minute.json",function(data){var x = [];var y = [];$.each(data,function (i,obj) {x.push(obj.xtime)y.push(obj.yclick)});var option = {xAxis:{type:"category",data:x},yAxis:{type:"value",},series:[{data:y,type:"line"}]};myChart.setOption(option)})},5000)
script>body>
html>
[
{"xtime":"22:28:20","yclick":"92"},
{"xtime":"22:28:30","yclick":"110"},
{"xtime":"22:28:40","yclick":"105"},
{"xtime":"22:28:50","yclick":"99"},
{"xtime":"22:29:00","yclick":"88"},
{"xtime":"22:29:10","yclick":"60"}
]

三、实验结果

  • 需求(1)
    在这里插入图片描述
    在这里插入图片描述
  • 需求(2)
    在这里插入图片描述
  • 需求(3)
    在这里插入图片描述
    在这里插入图片描述
    在这里插入图片描述

在这里插入图片描述

基于Storm的最近一分钟广告总点击量(每10秒统计一次)视频


本文来自互联网用户投稿,文章观点仅代表作者本人,不代表本站立场,不承担相关法律责任。如若转载,请注明出处。 如若内容造成侵权/违法违规/事实不符,请点击【内容举报】进行投诉反馈!

相关文章

立即
投稿

微信公众账号

微信扫一扫加关注

返回
顶部