Flink Experiment: Real-Time Ad Click Statistics
1. Experiment Description
The experiment data is generated by Java code; each record has the following format:
timestamp  province  city  userId  adId
The experiment implements three requirements:
(1) Count, in real time, each day's clicks on every ad per province and city, and store the results in MySQL;
(2) Implement a real-time dynamic blacklist: a user who clicks a given ad more than 60 times in one day is blacklisted (the blacklisted user IDs are stored in MySQL);
(3) Compute the total ad clicks over the last minute, recalculated every 10s, and display the result in an HTML page;
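For reference, after the mock source below is serialized with fastjson, one message on the Kafka topic flink_kafka looks roughly like this (the values are illustrative, and fastjson orders fields alphabetically by default):

    {"advertiseId":"Ad_7","clickTime":1621234567000,"clickUserCity":"杭州","clickUserId":"U3","clickUserProvince":"浙江"}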
2. Experiment Code
- Mock data generation
package cn.edu.neu.experiment;

import lombok.*;

/**
 * @author 32098
 */
@Data
@NoArgsConstructor
@AllArgsConstructor
@Getter
@Setter
public class AdvertiseClickBean {
    private String advertiseId;
    private Long clickTime;
    private String clickUserId;
    private String clickUserProvince;
    private String clickUserCity;
}
package cn.edu.neu.experiment;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;

import java.util.Arrays;
import java.util.List;
import java.util.Random;

/**
 * @author 32098
 */
public class AdvertiseClickMockDataSource extends RichParallelSourceFunction<AdvertiseClickBean> {
    private boolean keepMock;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        keepMock = true;
    }

    @Override
    public void run(SourceContext<AdvertiseClickBean> sourceContext) throws Exception {
        List<String> provinceList = Arrays.asList("江西", "辽宁", "浙江", "广东", "湖南", "湖北", "吉林", "黑龙江", "福建");
        List<String> cityList = Arrays.asList("南昌", "沈阳", "杭州", "广州", "长沙", "武汉", "长春", "哈尔滨", "厦门");
        int len = provinceList.size();
        Random r = new Random();
        while (keepMock) {
            for (int i = 0; i < r.nextInt(150); i++) {
                int idx = r.nextInt(len);
                String aid = "Ad_" + r.nextInt(20);
                // Simulate latency and out-of-order data by shifting the timestamp back 0-2s
                Long clickTime = System.currentTimeMillis() - r.nextInt(3) * 1000;
                String clickUserId = "U" + r.nextInt(10);
                String clickUserProvince = provinceList.get(idx);
                String clickUserCity = cityList.get(idx);
                sourceContext.collect(new AdvertiseClickBean(aid, clickTime, clickUserId, clickUserProvince, clickUserCity));
            }
            Thread.sleep(6000);
        }
    }

    @Override
    public void cancel() {
        keepMock = false;
    }
}
package cn.edu.neu.experiment;

import com.alibaba.fastjson.JSON;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;

import java.util.Properties;

/**
 * @author 32098
 */
public class KafkaAdvertiseDataProducer {
    public static void main(String[] args) throws Exception {
        // 1. env: create the streaming execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(3);

        // 2. source: add the custom source that generates mock ad-click data
        DataStreamSource<AdvertiseClickBean> advertiseClickDataStream = env.addSource(new AdvertiseClickMockDataSource());

        // 3. transformation: serialize each bean to a JSON string
        SingleOutputStreamOperator<String> advertiseClickDataJsonStream = advertiseClickDataStream.map(new MapFunction<AdvertiseClickBean, String>() {
            @Override
            public String map(AdvertiseClickBean advertiseClickBean) throws Exception {
                return JSON.toJSONString(advertiseClickBean);
            }
        });

        // 4. sink to Kafka
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "master:9092");
        FlinkKafkaProducer<String> kafkaSink = new FlinkKafkaProducer<>("flink_kafka", new SimpleStringSchema(), props);
        advertiseClickDataJsonStream.addSink(kafkaSink);

        // 5. execute
        env.execute();
    }
}
- Implementation of requirement (1)
package cn.edu.neu.experiment.province_city_ad_click_count;

import lombok.*;

/**
 * @author 32098
 */
@Data
@NoArgsConstructor
@AllArgsConstructor
@Getter
@Setter
public class AdvertiseClickData {
    private String clickTime;
    private String clickUserProvince;
    private String clickUserCity;
    private String advertiseId;
    private int clickCount;
}
package cn.edu.neu.experiment.province_city_ad_click_count;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;

/**
 * @author 32098
 */
public class MysqlSink extends RichSinkFunction<AdvertiseClickData> {
    private Connection conn = null;
    private PreparedStatement ps = null;

    @Override
    public void open(Configuration parameters) throws Exception {
        conn = DriverManager.getConnection("jdbc:mysql://master:3306/user_advertise", "root", "Hive@2020");
        // Upsert: insert a new row, or overwrite the count if the key already exists
        String sql = "insert into province_city_advertise(day,province,city,aid,count) values (?,?,?,?,?) on duplicate key update count=?";
        ps = conn.prepareStatement(sql);
    }

    @Override
    public void invoke(AdvertiseClickData value, Context context) throws Exception {
        ps.setString(1, value.getClickTime());
        ps.setString(2, value.getClickUserProvince());
        ps.setString(3, value.getClickUserCity());
        ps.setString(4, value.getAdvertiseId());
        ps.setInt(5, value.getClickCount());
        ps.setInt(6, value.getClickCount());
        ps.executeUpdate();
    }

    @Override
    public void close() throws Exception {
        // Close the statement before the connection
        if (ps != null) {
            ps.close();
        }
        if (conn != null) {
            conn.close();
        }
    }
}
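The upsert above only works if MySQL can detect a duplicate, so the table needs a unique key over (day, province, city, aid). The original post does not include the DDL; a plausible sketch, with assumed column types, is:

    -- Assumed schema for the requirement (1) sink; column types are guesses.
    CREATE TABLE province_city_advertise (
        day      VARCHAR(10) NOT NULL,  -- "yyyy-MM-dd"
        province VARCHAR(32) NOT NULL,
        city     VARCHAR(32) NOT NULL,
        aid      VARCHAR(32) NOT NULL,
        count    INT         NOT NULL,
        PRIMARY KEY (day, province, city, aid)
    );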
package cn.edu.neu.experiment.province_city_ad_click_count;

import cn.edu.neu.experiment.AdvertiseClickBean;
import com.alibaba.fastjson.JSON;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import java.text.SimpleDateFormat;
import java.time.Duration;
import java.util.Date;
import java.util.Properties;

import static org.apache.flink.table.api.Expressions.$;

/**
 * @author 32098
 */
public class KafkaAdvertiseDataConsumerA {
    public static void main(String[] args) throws Exception {
        Properties pros = new Properties();
        pros.setProperty("bootstrap.servers", "master:9092");
        pros.setProperty("group.id", "flink");
        pros.setProperty("auto.offset.reset", "latest");
        pros.setProperty("flink.partition-discovery.interval-millis", "5000");
        pros.setProperty("enable.auto.commit", "true");
        pros.setProperty("auto.commit.interval.ms", "2000");
        FlinkKafkaConsumer<String> kafkaSource = new FlinkKafkaConsumer<String>("flink_kafka", new SimpleStringSchema(), pros);
        kafkaSource.setStartFromLatest();

        // 1. env: create the streaming execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        // 2. source
        DataStreamSource<String> kafkaDataStream = env.addSource(kafkaSource);

        // 3. transformation
        // 3.1 parse JSON into Java objects
        SingleOutputStreamOperator<AdvertiseClickBean> advertiseClickDataStream = kafkaDataStream.map(new MapFunction<String, AdvertiseClickBean>() {
            @Override
            public AdvertiseClickBean map(String s) throws Exception {
                return JSON.parseObject(s, AdvertiseClickBean.class);
            }
        });

        // 3.2 assign watermarks (bounded out-of-orderness of 5s)
        DataStream<AdvertiseClickBean> adClickDataStream = advertiseClickDataStream.assignTimestampsAndWatermarks(
                WatermarkStrategy.<AdvertiseClickBean>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                        .withTimestampAssigner((adClickData, timestamp) -> adClickData.getClickTime())
        );

        // 3.3 map: format the event time as a day string and select the needed fields
        SingleOutputStreamOperator<AdvertiseClickData> dealtAdClickDs = adClickDataStream.map(new MapFunction<AdvertiseClickBean, AdvertiseClickData>() {
            @Override
            public AdvertiseClickData map(AdvertiseClickBean advertiseClickBean) throws Exception {
                String ymd = new SimpleDateFormat("yyyy-MM-dd").format(new Date(advertiseClickBean.getClickTime()));
                return new AdvertiseClickData(ymd, advertiseClickBean.getClickUserProvince(), advertiseClickBean.getClickUserCity(), advertiseClickBean.getAdvertiseId(), 1);
            }
        });

        // 3.4 create a temporary view
        tEnv.createTemporaryView("advertise_click_data", dealtAdClickDs,
                $("clickTime"), $("clickUserProvince"), $("clickUserCity"), $("advertiseId"), $("clickCount"));

        // 3.5 group and aggregate
        Table resultTable = tEnv.sqlQuery(
                "SELECT clickTime, clickUserProvince, clickUserCity, advertiseId, SUM(clickCount) as clickCount " +
                "FROM advertise_click_data " +
                "GROUP BY clickTime, clickUserProvince, clickUserCity, advertiseId");

        // 3.6 convert back to DataStreams; keep only insert/update records (f0 == true)
        DataStream<Tuple2<Boolean, AdvertiseClickData>> toConsoleDs = tEnv.toRetractStream(resultTable, AdvertiseClickData.class);
        DataStream<AdvertiseClickData> resultDs = tEnv.toRetractStream(resultTable, AdvertiseClickData.class)
                .filter(record -> record.f0)
                .map(record -> record.f1);

        // 4. sink
        resultDs.addSink(new MysqlSink());
        toConsoleDs.print();

        // 5. execute
        env.execute();
    }
}
- Implementation of requirement (2)
package cn.edu.neu.experiment.advertise_blacklist;

import lombok.*;

/**
 * @author 32098
 */
@Data
@NoArgsConstructor
@AllArgsConstructor
@Getter
@Setter
public class AdvertiseClickData {
    private String clickTime;
    private String clickUserId;
    private String advertiseId;
    private long clickCount;
}
package cn.edu.neu.experiment.advertise_blacklist;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;

/**
 * @author 32098
 */
public class MysqlSink extends RichSinkFunction<AdvertiseClickData> {
    private Connection conn = null;

    @Override
    public void open(Configuration parameters) throws Exception {
        conn = DriverManager.getConnection("jdbc:mysql://master:3306/user_advertise", "root", "Hive@2020");
    }

    @Override
    public void invoke(AdvertiseClickData value, Context context) throws Exception {
        // Skip users that are already on the blacklist
        PreparedStatement ps = conn.prepareStatement("select uid from black_list where uid=?");
        ps.setString(1, value.getClickUserId());
        ResultSet rs = ps.executeQuery();
        boolean blacklisted = rs.next();
        rs.close();
        ps.close();
        if (blacklisted) {
            return;
        }

        String day = value.getClickTime();

        // Update the (day, uid, aid) click count if the row exists, otherwise insert it
        ps = conn.prepareStatement("select * from user_advertise where day=? and uid=? and aid=?");
        ps.setString(1, day);
        ps.setString(2, value.getClickUserId());
        ps.setString(3, value.getAdvertiseId());
        rs = ps.executeQuery();
        boolean exists = rs.next();
        rs.close();
        ps.close();
        if (exists) {
            PreparedStatement psA = conn.prepareStatement("update user_advertise set count = ? where day=? and uid=? and aid=?");
            psA.setLong(1, value.getClickCount());
            psA.setString(2, day);
            psA.setString(3, value.getClickUserId());
            psA.setString(4, value.getAdvertiseId());
            psA.executeUpdate();
            psA.close();
        } else {
            PreparedStatement psB = conn.prepareStatement("insert into user_advertise(day,uid,aid,count) values (?,?,?,?)");
            psB.setString(1, day);
            psB.setString(2, value.getClickUserId());
            psB.setString(3, value.getAdvertiseId());
            psB.setLong(4, value.getClickCount());
            psB.executeUpdate();
            psB.close();
        }

        // Blacklist the user once today's clicks on this ad exceed 60
        ps = conn.prepareStatement("select * from user_advertise where day=? and uid=? and aid=? and count>60");
        ps.setString(1, day);
        ps.setString(2, value.getClickUserId());
        ps.setString(3, value.getAdvertiseId());
        rs = ps.executeQuery();
        if (rs.next()) {
            PreparedStatement psC = conn.prepareStatement("insert into black_list(uid) value(?) on duplicate key update uid=?");
            psC.setString(1, value.getClickUserId());
            psC.setString(2, value.getClickUserId());
            psC.executeUpdate();
            psC.close();
        }
        rs.close();
        ps.close();
    }

    @Override
    public void close() throws Exception {
        if (conn != null) {
            conn.close();
        }
    }
}
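Again, the DDL is not part of the original post; schemas consistent with the queries above might look like the following (column types are assumptions):

    -- Assumed schemas for the requirement (2) sink.
    CREATE TABLE user_advertise (
        day   VARCHAR(10) NOT NULL,  -- "yyyy-MM-dd"
        uid   VARCHAR(20) NOT NULL,
        aid   VARCHAR(20) NOT NULL,
        count BIGINT      NOT NULL,  -- clicks of uid on aid that day
        PRIMARY KEY (day, uid, aid)  -- lets the sink address one row per (day, uid, aid)
    );

    CREATE TABLE black_list (
        uid VARCHAR(20) NOT NULL,
        PRIMARY KEY (uid)            -- makes INSERT ... ON DUPLICATE KEY UPDATE idempotent
    );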
package cn.edu.neu.experiment.advertise_blacklist;

import cn.edu.neu.experiment.AdvertiseClickBean;
import com.alibaba.fastjson.JSON;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import java.text.SimpleDateFormat;
import java.time.Duration;
import java.util.Date;
import java.util.Properties;

import static org.apache.flink.table.api.Expressions.$;

/**
 * @author 32098
 */
public class KafkaAdvertiseDataConsumerB {
    public static void main(String[] args) throws Exception {
        Properties pros = new Properties();
        pros.setProperty("bootstrap.servers", "master:9092");
        pros.setProperty("group.id", "flink");
        pros.setProperty("auto.offset.reset", "latest");
        pros.setProperty("flink.partition-discovery.interval-millis", "5000");
        pros.setProperty("enable.auto.commit", "true");
        pros.setProperty("auto.commit.interval.ms", "2000");
        FlinkKafkaConsumer<String> kafkaSource = new FlinkKafkaConsumer<String>("flink_kafka", new SimpleStringSchema(), pros);
        kafkaSource.setStartFromLatest();

        // 1. env: create the streaming execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        // 2. source
        DataStreamSource<String> kafkaDataStream = env.addSource(kafkaSource);

        // 3. transformation
        // 3.1 parse JSON into Java objects
        SingleOutputStreamOperator<AdvertiseClickBean> advertiseClickDataStream = kafkaDataStream.map(new MapFunction<String, AdvertiseClickBean>() {
            @Override
            public AdvertiseClickBean map(String s) throws Exception {
                return JSON.parseObject(s, AdvertiseClickBean.class);
            }
        });

        // 3.2 assign watermarks (bounded out-of-orderness of 5s)
        DataStream<AdvertiseClickBean> adClickDataStream = advertiseClickDataStream.assignTimestampsAndWatermarks(
                WatermarkStrategy.<AdvertiseClickBean>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                        .withTimestampAssigner((adClickData, timestamp) -> adClickData.getClickTime())
        );

        // 3.3 map: format the event time as a day string and select the needed fields
        SingleOutputStreamOperator<AdvertiseClickData> dealtAdClickDs = adClickDataStream.map(new MapFunction<AdvertiseClickBean, AdvertiseClickData>() {
            @Override
            public AdvertiseClickData map(AdvertiseClickBean advertiseClickBean) throws Exception {
                String ymd = new SimpleDateFormat("yyyy-MM-dd").format(new Date(advertiseClickBean.getClickTime()));
                return new AdvertiseClickData(ymd, advertiseClickBean.getClickUserId(), advertiseClickBean.getAdvertiseId(), 1);
            }
        });

        // 3.4 create a temporary view
        tEnv.createTemporaryView("advertise_click_data", dealtAdClickDs,
                $("clickTime"), $("clickUserId"), $("advertiseId"), $("clickCount"));

        // 3.5 group and aggregate
        Table resultTable = tEnv.sqlQuery(
                "SELECT clickTime, clickUserId, advertiseId, SUM(clickCount) as clickCount " +
                "FROM advertise_click_data GROUP BY clickTime, clickUserId, advertiseId");

        // 3.6 keep only insert/update records (f0 == true)
        DataStream<AdvertiseClickData> resultDs = tEnv.toRetractStream(resultTable, AdvertiseClickData.class)
                .filter(record -> record.f0)
                .map(record -> record.f1);

        // 4. sink
        resultDs.addSink(new MysqlSink());

        // 5. execute
        env.execute();
    }
}
- Implementation of requirement (3)
package cn.edu.neu.experiment.advertise_click_count_nearly_minute;

import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.java.tuple.Tuple2;

/**
 * @author 32098
 */
public class ClickTimeAggregate implements AggregateFunction<Tuple2<String, Long>, Long, Long> {
    /**
     * Create the accumulator
     * @return initial accumulator value, 0
     */
    @Override
    public Long createAccumulator() {
        return 0L;
    }

    /**
     * Add one element's click count to the accumulator
     * @param in  input element
     * @param acc current accumulator value
     * @return updated accumulator value
     */
    @Override
    public Long add(Tuple2<String, Long> in, Long acc) {
        return in.f1 + acc;
    }

    /**
     * Get the final accumulator value
     * @param acc final accumulator value
     * @return final accumulator value
     */
    @Override
    public Long getResult(Long acc) {
        return acc;
    }

    /**
     * Merge the results of different subtasks
     */
    @Override
    public Long merge(Long accA, Long accB) {
        return accA + accB;
    }
}
package cn.edu.neu.experiment.advertise_click_count_nearly_minute;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

/**
 * Long: the aggregated click count produced by ClickTimeAggregate
 * Tuple2: Tuple2.of(key, click count)
 * String: key (the click time rounded to a 10s bucket, e.g. "12:03:40")
 * @author 32098
 */
public class AggregateDataCollect implements WindowFunction<Long, Tuple2<String, Long>, String, TimeWindow> {
    /**
     * @param s          key (rounded click time)
     * @param timeWindow time window
     * @param input      the single pre-aggregated click count for this window
     * @param collector  collector
     * @throws Exception Exception
     */
    @Override
    public void apply(String s, TimeWindow timeWindow, Iterable<Long> input, Collector<Tuple2<String, Long>> collector) throws Exception {
        long clickCount = input.iterator().next();
        collector.collect(Tuple2.of(s, clickCount));
    }
}
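These two classes work as a pair in KafkaAdvertiseDataConsumerC below: ClickTimeAggregate incrementally sums the per-record counts as elements arrive, and AggregateDataCollect re-attaches the window key to the single aggregated value. The combined call, as used in the consumer, is:

    adClickTimeKeyedDs
            .window(SlidingEventTimeWindows.of(Time.seconds(60), Time.seconds(10)))
            .aggregate(new ClickTimeAggregate(), new AggregateDataCollect());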
package cn.edu.neu.experiment.advertise_click_count_nearly_minute;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.util.Map;
import java.util.TreeMap;

/**
 * @author 32098
 */
public class WindowDataProcess extends ProcessWindowFunction<Tuple2<String, Long>, Tuple2<String, Long>, String, TimeWindow> {
    @Override
    public void process(String s, Context context, Iterable<Tuple2<String, Long>> inputs, Collector<Tuple2<String, Long>> collector) throws Exception {
        // Sum the click counts per key (all elements in this window share the key s)
        Map<String, Long> adAndClickTime = new TreeMap<>();
        for (Tuple2<String, Long> input : inputs) {
            String key = input.f0;
            if (adAndClickTime.containsKey(key)) {
                adAndClickTime.put(key, adAndClickTime.get(key) + input.f1);
            } else {
                adAndClickTime.put(key, input.f1);
            }
        }
        // Only prints the JSON fragments for debugging; nothing is emitted downstream
        adAndClickTime.forEach((xtime, yclick) -> {
            String jsonElem = "{\"xtime\":\"" + xtime + "\",\"yclick\":\"" + yclick + "\"},";
            System.out.println(jsonElem);
        });
    }
}
package cn.edu.neu.experiment.advertise_click_count_nearly_minute;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.io.PrintWriter;
import java.text.SimpleDateFormat;
import java.util.TreeMap;

/**
 * @author 32098
 */
public class JsonSink extends RichSinkFunction<Tuple2<String, Long>> {
    private TreeMap<String, Long> timeClick = null;
    private long lastInvokeTime = 0;
    private SimpleDateFormat dateFormat = null;

    @Override
    public void open(Configuration parameters) throws Exception {
        timeClick = new TreeMap<String, Long>();
        dateFormat = new SimpleDateFormat("ss");
        lastInvokeTime = System.currentTimeMillis();
    }

    @Override
    public void invoke(Tuple2<String, Long> value, Context context) throws Exception {
        // Buffer results that arrive in the same burst; flush once the seconds-of-minute
        // field has advanced by more than 1 since the last record (a heuristic that
        // separates one round of window firings from the next)
        long invokeTime = System.currentTimeMillis();
        if (Integer.parseInt(dateFormat.format(invokeTime)) - Integer.parseInt(dateFormat.format(lastInvokeTime)) > 1) {
            writeToJson();
        }
        timeClick.put(value.f0, value.f1);
        lastInvokeTime = System.currentTimeMillis();
//        if (timeClick.containsKey(value.f0)) {
//            return;
//        }
//        if (timeClick.size() == 6) {
//            timeClick.pollFirstEntry();
//        }
//        timeClick.put(value.f0, value.f1);
//        writeToJson();
    }

    @Override
    public void close() throws Exception {
        // adAndClickTime.clear();
    }

    public void writeToJson() {
        // Nothing buffered yet: skip, otherwise deleteCharAt below would corrupt "["
        if (timeClick.isEmpty()) {
            return;
        }
        String projectRoot = System.getProperty("user.dir");
        String file = projectRoot + "/src/main/java/cn/edu/neu/experiment/advertise_click_count_nearly_minute/advertise_click_count_nearly_minute.json";
        try {
            PrintWriter out = new PrintWriter(new FileWriter(new File(file), false));
            StringBuffer jsonStr = new StringBuffer("[");
            timeClick.forEach((time, click) -> {
                String json = "{\"xtime\":\"" + time + "\",\"yclick\":\"" + click + "\"},";
                jsonStr.append(json);
            });
            // Drop the trailing comma and close the array
            jsonStr.deleteCharAt(jsonStr.length() - 1);
            jsonStr.append("]");
            out.println(jsonStr.toString());
            out.flush();
            out.close();
            timeClick.clear();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
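For reference, a flushed advertise_click_count_nearly_minute.json produced by writeToJson() is a single JSON array; with illustrative values it looks like:

    [{"xtime":"12:03:10","yclick":"37"},{"xtime":"12:03:20","yclick":"42"},{"xtime":"12:03:30","yclick":"29"}]

Note that yclick is written as a quoted string; the page below pushes it straight into the chart, which works because ECharts tolerates numeric strings as series data.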
package cn.edu.neu.experiment.advertise_click_count_nearly_minute;

import cn.edu.neu.experiment.AdvertiseClickBean;
import com.alibaba.fastjson.JSON;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

import java.text.SimpleDateFormat;
import java.time.Duration;
import java.util.Date;
import java.util.Properties;

import static org.apache.flink.table.api.Expressions.$;

/**
 * @author 32098
 */
public class KafkaAdvertiseDataConsumerC {
    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    public static class TimeClickData {
        private Long clickTime;
        private String dealtTime;
        private Long click;
    }

    public static void main(String[] args) throws Exception {
        Properties pros = new Properties();
        pros.setProperty("bootstrap.servers", "master:9092");
        pros.setProperty("group.id", "flink");
        pros.setProperty("auto.offset.reset", "latest");
        pros.setProperty("flink.partition-discovery.interval-millis", "5000");
        pros.setProperty("enable.auto.commit", "true");
        pros.setProperty("auto.commit.interval.ms", "2000");
        FlinkKafkaConsumer<String> kafkaSource = new FlinkKafkaConsumer<String>("flink_kafka", new SimpleStringSchema(), pros);
        kafkaSource.setStartFromLatest();

        // 1. env: create the streaming execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        // 2. source
        DataStreamSource<String> kafkaDataStream = env.addSource(kafkaSource);

        // 3. transformation
        // 3.1 parse JSON into Java objects
        SingleOutputStreamOperator<AdvertiseClickBean> advertiseClickDataStream = kafkaDataStream.map(new MapFunction<String, AdvertiseClickBean>() {
            @Override
            public AdvertiseClickBean map(String s) throws Exception {
                return JSON.parseObject(s, AdvertiseClickBean.class);
            }
        });

        // 3.2 assign watermarks (bounded out-of-orderness of 5s)
        DataStream<AdvertiseClickBean> adClickDataStream = advertiseClickDataStream.assignTimestampsAndWatermarks(
                WatermarkStrategy.<AdvertiseClickBean>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                        .withTimestampAssigner((adClickData, timestamp) -> adClickData.getClickTime())
        );

        // 3.3 round each event time up to the next 10s boundary:
        /*
            9s  (1-10)  => 10s
            13s (11-20) => 20s
            24s (21-30) => 30s
            32s (31-40) => 40s
            48s (41-50) => 50s
            56s (51-60) => 60s (i.e. 0)
            (s / 10 (integer division) + 1) * 10 : (56/10+1)*10 = 60
        */
        KeyedStream<Tuple2<String, Long>, String> adClickTimeKeyedDs = adClickDataStream.map(new MapFunction<AdvertiseClickBean, Tuple2<String, Long>>() {
            @Override
            public Tuple2<String, Long> map(AdvertiseClickBean advertiseClickBean) throws Exception {
                long ts = advertiseClickBean.getClickTime();
                String time = new SimpleDateFormat("HH:mm:ss").format(new Date(ts));
                String[] hms = time.split(":");
                int s = (Integer.parseInt(hms[2]) / 10 + 1) * 10;
                int m = Integer.parseInt(hms[1]);
                int h = Integer.parseInt(hms[0]);
                // Carry over: 60s -> next minute, 60min -> next hour, 24h -> 0
                if (s == 60) {
                    m = m + 1;
                    s = 0;
                    if (m == 60) {
                        m = 0;
                        h = h + 1;
                        if (h == 24) {
                            h = 0;
                        }
                    }
                }
                // Zero-pad each component back to "HH:mm:ss"
                String hStr, mStr, sStr;
                if (h < 10) {
                    hStr = "0" + h;
                } else {
                    hStr = String.valueOf(h);
                }
                if (m < 10) {
                    mStr = "0" + m;
                } else {
                    mStr = String.valueOf(m);
                }
                if (s == 0) {
                    sStr = "00";
                } else {
                    sStr = String.valueOf(s);
                }
                String hmsNew = hStr + ":" + mStr + ":" + sStr;
                return Tuple2.of(hmsNew, 1L);
            }
        }).keyBy(e -> e.f0);

        // Four equivalent ways to count clicks per 60s window, sliding every 10s
        SingleOutputStreamOperator<Tuple2<String, Long>> resultA = adClickTimeKeyedDs
                .window(SlidingEventTimeWindows.of(Time.seconds(60), Time.seconds(10)))
                .sum(1);
        SingleOutputStreamOperator<Tuple2<String, Long>> resultB = adClickTimeKeyedDs
                .window(SlidingEventTimeWindows.of(Time.seconds(60), Time.seconds(10)))
                .reduce(new ReduceFunction<Tuple2<String, Long>>() {
                    @Override
                    public Tuple2<String, Long> reduce(Tuple2<String, Long> valueA, Tuple2<String, Long> valueB) throws Exception {
                        return Tuple2.of(valueA.f0, valueA.f1 + valueB.f1);
                    }
                });
        SingleOutputStreamOperator<Tuple2<String, Long>> resultC = adClickTimeKeyedDs
                .window(SlidingEventTimeWindows.of(Time.seconds(60), Time.seconds(10)))
                .aggregate(new ClickTimeAggregate(), new AggregateDataCollect());
        SingleOutputStreamOperator<Tuple2<String, Long>> resultD = adClickTimeKeyedDs
                .window(SlidingEventTimeWindows.of(Time.seconds(60), Time.seconds(10)))
                .process(new WindowDataProcess());

        // 4. sink
        resultC.addSink(new JsonSink());
        resultA.print();
        resultB.print();
        // resultC.print();
        resultD.print();

        // 3~ alternative transformation with the Table API (kept for reference)
//        SingleOutputStreamOperator<TimeClickData> adClickTimeDs = adClickDataStream.map(new MapFunction<AdvertiseClickBean, TimeClickData>() {
//            @Override
//            public TimeClickData map(AdvertiseClickBean advertiseClickBean) throws Exception {
//                long ts = advertiseClickBean.getClickTime();
//                String time = new SimpleDateFormat("HH:mm:ss").format(new Date(ts));
//                String[] hms = time.split(":");
//                int s = (Integer.parseInt(hms[2]) / 10 + 1) * 10;
//                int m = Integer.parseInt(hms[1]);
//                int h = Integer.parseInt(hms[0]);
//                if (s == 60) {
//                    m = m + 1;
//                    s = 0;
//                    if (m == 60) {
//                        m = 0;
//                        h = h + 1;
//                        if (h == 24) {
//                            h = 0;
//                        }
//                    }
//                }
//                String hStr, mStr, sStr;
//                if (h < 10) {
//                    hStr = "0" + h;
//                } else {
//                    hStr = String.valueOf(h);
//                }
//                if (m < 10) {
//                    mStr = "0" + m;
//                } else {
//                    mStr = String.valueOf(m);
//                }
//                if (s == 0) {
//                    sStr = "00";
//                } else {
//                    sStr = String.valueOf(s);
//                }
//                String hmsNew = hStr + ":" + mStr + ":" + sStr;
//                return new TimeClickData(ts, hmsNew, 1L);
//            }
//        });
//
//        tEnv.createTemporaryView("t_time_click", adClickTimeDs, $("clickTime").rowtime(), $("dealtTime"), $("click"));
//        Table tempTable = tEnv.sqlQuery("SELECT dealtTime, count(click) as total_click FROM t_time_click GROUP BY dealtTime, HOP(clickTime, interval '10' SECOND, interval '60' SECOND) ORDER BY dealtTime DESC LIMIT 24");
//        SingleOutputStreamOperator<Row> resultStream = tEnv.toRetractStream(tempTable, Row.class).filter(f -> f.f0).map(f -> f.f1);
//
//        // 4~ sink
//        resultStream.print();

        // 5. execute
        env.execute();
    }
}
<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>最近1分钟广告总点击量</title>
<!--    <script src="echarts.js"></script>-->
<!--    <script type="text/javascript" src="jquery-1.9.0.min.js"></script>-->
    <script src="https://cdn.staticfile.org/echarts/4.3.0/echarts.min.js"></script>
    <script type="text/javascript" src="https://cdn.staticfile.org/jquery/1.10.2/jquery.min.js"></script>
</head>
<body>
<div id="display" style="height: 450px; width:800px; position: relative; left: 24%"></div>
<script>
    var myChart = echarts.init(document.getElementById("display"));
    // Re-fetch the JSON file every 5s and redraw the line + bar chart
    setInterval(function () {
        $.getJSON("advertise_click_count_nearly_minute.json", function (data) {
            var x = [];
            var y = [];
            $.each(data, function (i, obj) {
                x.push(obj.xtime);
                y.push(obj.yclick);
            });
            var option = {
                xAxis: {
                    type: "category",
                    data: x
                },
                yAxis: {
                    type: "value"
                },
                series: [
                    {
                        data: y,
                        type: "line",
                        smooth: false,
                        color: "steelblue"
                    },
                    {
                        data: y,
                        type: "bar",
                        barWidth: 50,
                        color: "lightblue"
                    }
                ]
            };
            myChart.setOption(option);
        });
    }, 5000);
</script>
</body>
</html>
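A note on running the page: $.getJSON requests the JSON file relative to the HTML file, so the page has to sit next to advertise_click_count_nearly_minute.json (or the path must be adjusted), and most browsers block AJAX requests over file://, so the directory should be served over HTTP rather than opened directly from disk.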
3. Experiment Results
- Requirement (1) (result screenshot not preserved in this copy)
- Requirement (2) (result screenshots not preserved in this copy)
- Requirement (3) (result screenshots not preserved in this copy)
Video: dynamic view of the last-minute ad click totals, recomputed every 10s with Flink (video not preserved in this copy)
