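Flink Learning Path: Computing the Metrics Behind the Double 11 Real-Time Dashboard
In real-time analytics, the real-time dashboard always seems to be the brightest star, and Tmall's Double 11 (Singles' Day) dashboard is known to almost everyone. Today let's look at how metrics like those on the Double 11 dashboard can be computed. Be sure to implement it yourself: along the way you will run into problems, and working through them is what makes your understanding solid.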
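After working through this article you should have a solid grasp of the following points. Discussion is welcome; let's learn and improve together.
1. Using checkpoints
2. Building a Kafka source connector, including the all-important watermark setup
3. Applying keyed window functions
4. Top-K statistics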
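Computing the Double 11 Real-Time Dashboard Metrics
Here we build the simplest possible example that mimics the Tmall real-time dashboard. The requirements are:
1. Compute in real time the cumulative sales amount for the current day up to the present moment
2. Find the top 3 categories by sales amount
3. Refresh the results once per second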

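Design
1. Order transactions are ingested into the Kafka topic ods_order_rt. The relevant fields are category, amount, and update_time, and update_time is used as each record's event time.
2. A keyed window (keyed by category) with an incremental aggregate function first computes each category's cumulative sales for the current day, emitting a result every second. Because the aggregate function folds each order into a running total as it arrives, no historical events need to be kept, which greatly reduces window state: each category holds a single accumulated value.
3. The results that step 2 emits each second are then processed together as one window, which outputs the cumulative sales amount and the top 3 categories. All of the metric computation happens in this window process function, and the results are saved to the MySQL table below. One neat trick: to treat each second's output from step 2 as a single group, a creation-time field formatted to the second is added to each category-sales record, and the second window is keyed by that field.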
CREATE TABLE IF NOT EXISTS `ads_order_gmv_rt` (
  `stats_time` VARCHAR(20) NOT NULL,
  `gmv` DOUBLE,
  `top1_category` VARCHAR(20),
  `top1_category_amount` DOUBLE,
  `top2_category` VARCHAR(20),
  `top2_category_amount` DOUBLE,
  `top3_category` VARCHAR(20),
  `top3_category_amount` DOUBLE,
  PRIMARY KEY (`stats_time`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
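Step 2 relies on TumblingEventTimeWindows.of(Time.days(1), Time.hours(-8)) so that each one-day window starts at midnight Beijing time (UTC+8) rather than midnight UTC. The JDK-only sketch below mirrors the window-start arithmetic in Flink's TimeWindow.getWindowStartWithOffset to show the effect of the offset; the class name DayWindowAlignment and the sample order time are illustrative assumptions, not part of the original job.

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZonedDateTime;

public class DayWindowAlignment {
    static final long HOUR_MS = 60L * 60 * 1000;
    static final long DAY_MS = 24 * HOUR_MS;

    // Start of the tumbling window that contains `timestamp`, shifted by `offset`
    // (same formula as Flink's TimeWindow.getWindowStartWithOffset)
    static long windowStart(long timestamp, long offset, long windowSize) {
        return timestamp - (timestamp - offset + windowSize) % windowSize;
    }

    public static void main(String[] args) {
        ZoneId beijing = ZoneId.of("Asia/Shanghai");
        // A hypothetical order placed at 00:30 on 11 November, Beijing time
        long orderTs = ZonedDateTime.of(2020, 11, 11, 0, 30, 0, 0, beijing).toInstant().toEpochMilli();

        long noOffset = windowStart(orderTs, 0, DAY_MS);              // day aligned to UTC midnight
        long beijingDay = windowStart(orderTs, -8 * HOUR_MS, DAY_MS); // day aligned to Beijing midnight

        System.out.println("offset 0:   " + Instant.ofEpochMilli(noOffset).atZone(beijing));
        System.out.println("offset -8h: " + Instant.ofEpochMilli(beijingDay).atZone(beijing));
    }
}
```

Without the offset, that order falls into a "day" running from 08:00 on 10 November to 08:00 on 11 November Beijing time; with the -8h offset the window starts at 00:00 on 11 November, which is what a "today so far" total needs.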
Implementation
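// TianMaoGmv.java
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.connector.jdbc.JdbcStatementBuilder;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.ContinuousEventTimeTrigger;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
import org.apache.flink.streaming.util.serialization.JSONKeyValueDeserializationSchema;
import org.apache.flink.util.Collector;

import javax.annotation.Nullable;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Types;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.Properties;
import java.util.Queue;

public class TianMaoGmv {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(60000); // checkpoint every 60 seconds
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); // use event time

        // Kafka properties
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");
        props.setProperty("group.id", "demo");

        // Build the order source connector
        FlinkKafkaConsumer011<ObjectNode> orderConsumer = new FlinkKafkaConsumer011<>(
                "ods_order_rt", // order source topic
                new JSONKeyValueDeserializationSchema(false),
                props);
        orderConsumer.assignTimestampsAndWatermarks(new OrderMessagePeriodicWatermarks());
        orderConsumer.setStartFromLatest();

        // Build the order stream
        DataStreamSource<ObjectNode> orderDataStreamSource = env.addSource(orderConsumer);

        // Step 2: cumulative sales per category for the current day, emitted every second of event time
        SingleOutputStreamOperator<CategorySummary> categoryAgg = orderDataStreamSource
                .map(new MapFunction<ObjectNode, Tuple2<String, Double>>() {
                    @Override
                    public Tuple2<String, Double> map(ObjectNode objectNode) throws Exception {
                        return new Tuple2<>(
                                objectNode.get("value").get("category").asText(),
                                objectNode.get("value").get("amount").asDouble());
                    }
                })
                .keyBy(0)
                // one-day windows shifted by -8h so that they start at midnight UTC+8
                .window(TumblingEventTimeWindows.of(Time.days(1), Time.hours(-8)))
                .trigger(ContinuousEventTimeTrigger.of(Time.seconds(1)))
                .aggregate(new AmountAgg(), new WindowResult());

        // Step 3: group each second's category results by createTime, then compute GMV and the top 3
        categoryAgg.keyBy(x -> x.getCreateTime())
                .window(TumblingProcessingTimeWindows.of(Time.seconds(1)))
                .process(new ProcessWindowFunction<CategorySummary, GmvResult, String, TimeWindow>() {
                    /**
                     * Final computation:
                     * the sum of all category totals is the day's sales amount so far;
                     * a priority queue (min-heap) finds the top 3 categories by sales.
                     */
                    @Override
                    public void process(String s, Context context, Iterable<CategorySummary> iterable,
                                        Collector<GmvResult> collector) throws Exception {
                        // Several trigger firings can land in the same second (e.g. when the watermark
                        // jumps ahead), producing more than one cumulative total per category.
                        // Keep only the largest total per category so nothing is counted twice
                        // (assumes amounts are non-negative, so totals only grow).
                        Map<String, CategorySummary> latest = new HashMap<>();
                        for (CategorySummary item : iterable) {
                            latest.merge(item.getCategory(), item,
                                    (a, b) -> a.getTotalAmount() >= b.getTotalAmount() ? a : b);
                        }

                        // Min-heap: the head is the smallest of the (at most) 3 largest totals seen so far
                        Queue<CategorySummary> queue = new PriorityQueue<>(3,
                                (x, y) -> Double.compare(x.getTotalAmount(), y.getTotalAmount()));
                        GmvResult gmvResult = new GmvResult();
                        SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
                        gmvResult.setStatsTime(simpleDateFormat.format(new Date(context.window().getStart())));
                        double totalAmount = 0D;
                        for (CategorySummary item : latest.values()) {
                            if (queue.size() < 3) {
                                queue.add(item);
                            } else if (item.getTotalAmount() > queue.peek().getTotalAmount()) {
                                // Larger than the heap head: evict the head and add the current item
                                queue.poll();
                                queue.add(item);
                            }
                            totalAmount += item.getTotalAmount();
                        }
                        gmvResult.setGmv(totalAmount);

                        // Polling yields ascending order; with n (<= 3) items the first poll is rank n.
                        // Ranks without a category (fewer than 3 categories) stay null.
                        for (int rank = queue.size(); rank >= 1; rank--) {
                            CategorySummary c = queue.poll();
                            Tuple2<String, Double> entry = new Tuple2<>(c.getCategory(), c.getTotalAmount());
                            if (rank == 3) {
                                gmvResult.setTop3Category(entry);
                            } else if (rank == 2) {
                                gmvResult.setTop2Category(entry);
                            } else {
                                gmvResult.setTop1Category(entry);
                            }
                        }
                        collector.collect(gmvResult);
                    }
                })
                .addSink(getMysqlSink("INSERT INTO ads_order_gmv_rt(stats_time,gmv,top1_category,top1_category_amount,"
                        + "top2_category,top2_category_amount,top3_category,top3_category_amount) "
                        + "VALUES(?,?,?,?,?,?,?,?)"));

        env.execute("flink kafka gmv sample");
    }

    private static SinkFunction<GmvResult> getMysqlSink(String sql) {
        return JdbcSink.sink(sql,
                new JdbcStatementBuilder<GmvResult>() {
                    @Override
                    public void accept(PreparedStatement preparedStatement, GmvResult gmvResult) throws SQLException {
                        preparedStatement.setString(1, gmvResult.getStatsTime());
                        preparedStatement.setDouble(2, gmvResult.getGmv());
                        setCategory(preparedStatement, 3, gmvResult.getTop1Category());
                        setCategory(preparedStatement, 5, gmvResult.getTop2Category());
                        setCategory(preparedStatement, 7, gmvResult.getTop3Category());
                    }
                },
                JdbcExecutionOptions.builder()
                        .withBatchSize(1000)
                        .withBatchIntervalMs(200)
                        .withMaxRetries(5)
                        .build(),
                new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                        .withUrl("jdbc:mysql://localhost:3306/traffic?useSSL=false")
                        .withDriverName("com.mysql.jdbc.Driver")
                        .withUsername("root")
                        .withPassword("")
                        .build());
    }

    // Writes a (category, amount) pair into two consecutive parameters, or SQL NULLs if that rank is empty
    private static void setCategory(PreparedStatement ps, int index, Tuple2<String, Double> category)
            throws SQLException {
        if (category == null) {
            ps.setNull(index, Types.VARCHAR);
            ps.setNull(index + 1, Types.DOUBLE);
        } else {
            ps.setString(index, category.f0);
            ps.setDouble(index + 1, category.f1);
        }
    }

    private static class OrderMessagePeriodicWatermarks implements AssignerWithPeriodicWatermarks<ObjectNode> {
        private long lastTs = Long.MIN_VALUE;

        @Nullable
        @Override
        public Watermark getCurrentWatermark() {
            // Watermark = largest update_time seen so far (no allowance for out-of-order events)
            return new Watermark(lastTs);
        }

        @Override
        public long extractTimestamp(ObjectNode jsonNodes, long l) {
            // update_time is expected to be an epoch timestamp in milliseconds
            long ts = jsonNodes.get("value").get("update_time").asLong();
            lastTs = Math.max(lastTs, ts); // never let the watermark move backwards
            return ts;
        }
    }

    // Incrementally sums the amounts of one category: the window keeps a single Double per key
    private static class AmountAgg implements AggregateFunction<Tuple2<String, Double>, Double, Double> {
        @Override
        public Double createAccumulator() {
            return 0D;
        }

        @Override
        public Double add(Tuple2<String, Double> value, Double acc) {
            return acc + value.f1;
        }

        @Override
        public Double getResult(Double acc) {
            return acc;
        }

        @Override
        public Double merge(Double a, Double b) {
            return a + b;
        }
    }

    private static class CategorySummary {
        private String category;
        private double totalAmount;
        private String createTime;

        public String getCategory() { return category; }
        public void setCategory(String category) { this.category = category; }
        public double getTotalAmount() { return totalAmount; }
        public void setTotalAmount(double totalAmount) { this.totalAmount = totalAmount; }
        public String getCreateTime() { return createTime; }
        public void setCreateTime(String createTime) { this.createTime = createTime; }
    }

    private static class GmvResult {
        private String statsTime;
        private Double gmv;
        private Tuple2<String, Double> top1Category;
        private Tuple2<String, Double> top2Category;
        private Tuple2<String, Double> top3Category;

        public String getStatsTime() { return statsTime; }
        public void setStatsTime(String statsTime) { this.statsTime = statsTime; }
        public Double getGmv() { return gmv; }
        public void setGmv(Double gmv) { this.gmv = gmv; }
        public Tuple2<String, Double> getTop1Category() { return top1Category; }
        public void setTop1Category(Tuple2<String, Double> top1Category) { this.top1Category = top1Category; }
        public Tuple2<String, Double> getTop2Category() { return top2Category; }
        public void setTop2Category(Tuple2<String, Double> top2Category) { this.top2Category = top2Category; }
        public Tuple2<String, Double> getTop3Category() { return top3Category; }
        public void setTop3Category(Tuple2<String, Double> top3Category) { this.top3Category = top3Category; }
    }

    private static class WindowResult implements WindowFunction<Double, CategorySummary, Tuple, TimeWindow> {
        SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

        @Override
        public void apply(Tuple tuple, TimeWindow timeWindow, Iterable<Double> iterable,
                          Collector<CategorySummary> collector) throws Exception {
            CategorySummary categorySummary = new CategorySummary();
            categorySummary.setCategory(((Tuple1<String>) tuple).f0);
            categorySummary.setTotalAmount(iterable.iterator().next()); // the pre-aggregated total
            // Current processing time formatted to the second: the key that groups one second's results
            categorySummary.setCreateTime(simpleDateFormat.format(new Date()));
            collector.collect(categorySummary);
        }
    }
}

// GmvProducerTest.java (test data producer)
import org.apache.kafka.clients.producer.Producer;
import java.util.Properties;

public class GmvProducerTest {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 5000);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        Producer // … (the rest of this listing is truncated in the source)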
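The top-3 logic in process() is the classic bounded min-heap technique: keep at most k items in a heap ordered by ascending amount and replace the head whenever a larger item arrives, so each item costs O(log k) and memory stays at k entries. Below is a JDK-only sketch of that technique; the Summary class is a hypothetical stand-in for CategorySummary, and the method also returns a shorter list when there are fewer than k categories instead of polling a null.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.PriorityQueue;

public class Top3Categories {
    // Hypothetical stand-in for CategorySummary: a category and its cumulative sales
    static final class Summary {
        final String category;
        final double totalAmount;

        Summary(String category, double totalAmount) {
            this.category = category;
            this.totalAmount = totalAmount;
        }
    }

    // Returns the k largest summaries by totalAmount, largest first (fewer if there are fewer inputs); k >= 1
    static List<Summary> topK(Iterable<Summary> items, int k) {
        // Min-heap ordered by amount: the head is the smallest of the current top k
        PriorityQueue<Summary> heap =
                new PriorityQueue<>(k, (a, b) -> Double.compare(a.totalAmount, b.totalAmount));
        for (Summary s : items) {
            if (heap.size() < k) {
                heap.add(s);
            } else if (s.totalAmount > heap.peek().totalAmount) {
                heap.poll(); // evict the current smallest
                heap.add(s);
            }
        }
        List<Summary> result = new ArrayList<>();
        while (!heap.isEmpty()) {
            result.add(heap.poll()); // polls in ascending order
        }
        Collections.reverse(result); // rank 1 first
        return result;
    }

    public static void main(String[] args) {
        List<Summary> input = Arrays.asList(
                new Summary("phone", 500), new Summary("book", 80),
                new Summary("food", 300), new Summary("toy", 120));
        for (Summary s : topK(input, 3)) {
            System.out.println(s.category + " " + s.totalAmount);
        }
    }
}
```

With the sample input, "book" is evicted when "toy" arrives, and the method prints phone, food and toy in that order.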
Pitfalls
1. The first version of categoryAgg.keyBy(x -> x.getCreateTime()).window(TumblingProcessingTimeWindows.of(Time.seconds(1))) used TumblingEventTimeWindows, and the window never fired. After switching to TumblingProcessingTimeWindows, the window fired normally.
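Answer: The records emitted by a Flink window operator do carry a timestamp, but it is the window's maxTimestamp(), i.e. the last millisecond of the one-day window, no matter how often ContinuousEventTimeTrigger fires. With TumblingEventTimeWindows, every categoryAgg record is therefore assigned to the final one-second window of the day, which can only fire once the watermark reaches the end of the day. Calling assignTimestampsAndWatermarks on categoryAgg to assign fresh timestamps and watermarks (for example, derived from createTime) makes the event-time version run normally.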
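2. The job is supposed to output once per second, so why are the stats_time values saved in the database not consecutive seconds?
Answer: Flink creates (assigns) windows based on the records it receives. If no new record arrives within a given second, no window is created for that second, so no row with that stats_time is written. In this pipeline the first stage only emits when new orders advance the watermark, so quiet seconds produce no categoryAgg records at all.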
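The cause of pitfall 1 can be checked with plain arithmetic. The JDK-only sketch below is an illustration, not Flink code: it assumes Flink's rule that a window emits its results stamped with window.maxTimestamp() (window end minus 1 ms) and reuses the window-start formula of TimeWindow.getWindowStartWithOffset; the class name EventTimeRewindowing and the sample order time are hypothetical.

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZonedDateTime;

public class EventTimeRewindowing {
    static final long SECOND_MS = 1000L;
    static final long HOUR_MS = 3600 * SECOND_MS;
    static final long DAY_MS = 24 * HOUR_MS;

    // Same arithmetic as Flink's TimeWindow.getWindowStartWithOffset
    static long windowStart(long timestamp, long offset, long windowSize) {
        return timestamp - (timestamp - offset + windowSize) % windowSize;
    }

    // Timestamp carried by records emitted from the UTC+8-aligned one-day window:
    // the window's maxTimestamp (end - 1 ms), however often the trigger fires
    static long dayWindowMaxTimestamp(long eventTs) {
        long start = windowStart(eventTs, -8 * HOUR_MS, DAY_MS);
        return start + DAY_MS - 1;
    }

    public static void main(String[] args) {
        ZoneId beijing = ZoneId.of("Asia/Shanghai");
        long orderTs = ZonedDateTime.of(2020, 11, 11, 10, 15, 30, 0, beijing).toInstant().toEpochMilli();

        long emittedTs = dayWindowMaxTimestamp(orderTs);
        long oneSecondWindowStart = windowStart(emittedTs, 0, SECOND_MS);

        // Every categoryAgg record lands in the last second of the day, so a 1-second
        // event-time window only fires once the watermark reaches the end of the day.
        System.out.println("record timestamp: " + Instant.ofEpochMilli(emittedTs).atZone(beijing));
        System.out.println("1s window start:  " + Instant.ofEpochMilli(oneSecondWindowStart).atZone(beijing));
    }
}
```

Whatever time of day the order arrives, the downstream one-second window is 23:59:59 of that day, which is why re-assigning timestamps on categoryAgg, or using processing-time windows as the final code does, is needed for per-second output.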


