flink 自定义 窗口_【Flink 精选】阐述 Watermark 机制,剖析 Watermark 的产生和传递流程...

af7356daa06ed76f60e8115f8b2842a2.png

本文阐述 Flink 的事件时间和 Watermark 机制,剖析 Watermark 产生和传递的流程。


1 Event time 和 Watermark 的关系

1.1 Event time 和 Processing time介绍

Event time 事件时间和Processing time 处理时间主要区别是产生时间不同,前者是事件的实际发生时间,后者是机器的系统处理时间,如下图所示。

5c18c407759d3ce67e0dad69e1274c74.png

① Event time 事件时间事件在其设备上发生的时间

Event time 是事件在进入 Flink 之前已经嵌入到记录的时间,其大小取决于事件本身,与网络延时、系统时区等因素无关。

② Processing time 处理时间:作业正在执行相应操作机器系统时间

Processing time 提供了最佳的性能和最低的延迟,但是不能提供确定性,即计算结果是不确定的。 例如,时间窗口为5min的求和统计,应用程序在 9:00 开始运行,则第一个时间窗口处理 [9:00, 9:05) 的事件,下一个窗口处理 [9:05, 9:10) 的事件,依此类推。通信延迟、作业故障重启等问题,可能导致窗口的计算结果是不一样的。如下图所示,假设事件(事件时间, 数值) 遇到上述问题,场景一:事件 B 有网络延迟落在[9:10, 9:15),场景二:作业故障重启导致事件 A 和事件 B落在[9:10, 9:15)。

c1b10213f1a3c91b85ae5353bf6a9e95.png

1.2 Event time 和 Watermark

问题:Flink 支持事件时间,如何测量事件时间的进度?例如,5min 的事件时间窗口,当事件时间超过 5min 时,需要通知 Flink 触发窗口计算。解答:Watermark 机制

Watermark 本质是时间戳,与业务数据一样无差别地传递下去,目的是衡量事件时间的进度(通知 Flink 触发事件时间相关的操作,例如窗口)。

说明: Watermark(T) 表示目前系统的时间事件是 T,即系统后续没有 T'
/*** 1.Watermark 是一个时间戳, 它表示小于该时间戳的事件都已经到达了。* 2.Watermark 一般情况在源位置产生(也可以在流图中的其它节点产生), 通过流图节点传播。* 3.Watermark 也是 StreamElement, 和普通数据一起在算子之间传递。* 4.Watermark 可以触发窗口计算, 时间戳为 Long.MAX_VALUE 表示算子后续没有任何数据。*/
public final class Watermark extends StreamElement {// 省略.../*** The timestamp of the watermark in milliseconds.*/private final long timestamp;/*** Creates a new watermark with the given timestamp in milliseconds.*/public Watermark(long timestamp) {this.timestamp = timestamp;}/*** Returns the timestamp associated with this {@link Watermark} in milliseconds.*/public long getTimestamp() {return timestamp;}// 省略...
}

如下图所示,事件 Event 是按照事件时间 EventTime 顺序上报的。

19d8c1e1729e5aea3aab3677e76b20d1.png

如下图所示,事件 Event 是不按照事件时间 EventTime 乱序上报的。

aeb473b1f55260e42f9b559a069d6265.png

2 Watermark 的产生

2.1 Watermark 类型

说明:flink-1.12 支持 WatermarkStrategy 和 WatermarkGenerator

flink 采用 WatermarkStrategy 设置自定义 Watermark 类型,WatermarkGenerator 是 Watermark 的基类。flink 实现了 Punctuated Watermarks 从事件获取事件的时间戳、Periodic Watermarks 周期获取事件的时间戳。

/*** The {@code WatermarkGenerator} generates watermarks either based on events or* periodically (in a fixed interval).** 

Note: This WatermarkGenerator subsumes the previous distinction between the* {@code AssignerWithPunctuatedWatermarks} and the {@code AssignerWithPeriodicWatermarks}.*/ @Public public interface WatermarkGenerator {/*** 从事件获取事件的时间戳*/void onEvent(T event, long eventTimestamp, WatermarkOutput output);/*** 周期获取事件的时间戳*/void onPeriodicEmit(WatermarkOutput output); }

使用 WatermarkStrategy 的样例,如下代码。

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStream input = env.fromElements("data");// 使用 WatermarkStrategy 设置 Watermark 类型input.assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofMillis(10)));

2.2 Watermark 的产生

Watermark 是算子 TimestampsAndWatermarksOperator 产生的,WatermarkStrategy 相当于 UDFFunction(封装于TimestampsAndWatermarksOperator 内部)。processElement 方法实现事件产生 Watermark,processWatermark 方法阻断上游传过来的 Watermark,onProcessingTime 方法实现周期产生 Watermark。

public class TimestampsAndWatermarksOperatorextends AbstractStreamOperatorimplements OneInputStreamOperator, ProcessingTimeCallback {
// 省略...@Overridepublic void processElement(final StreamRecord element) throws Exception {final T event = element.getValue();final long previousTimestamp = element.hasTimestamp() ? element.getTimestamp() : Long.MIN_VALUE;final long newTimestamp = timestampAssigner.extractTimestamp(event, previousTimestamp);element.setTimestamp(newTimestamp);output.collect(element);// 事件产生 WatermarkwatermarkGenerator.onEvent(event, newTimestamp, wmOutput);}// 阻断上游传过来的 watermark@Overridepublic void processWatermark(org.apache.flink.streaming.api.watermark.Watermark mark) throws Exception {// if we receive a Long.MAX_VALUE watermark we forward it since it is used// to signal the end of input and to not block watermark progress downstreamif (mark.getTimestamp() == Long.MAX_VALUE) {wmOutput.emitWatermark(Watermark.MAX_WATERMARK);}}@Overridepublic void onProcessingTime(long timestamp) throws Exception {// 采用定时器, 周期产生 WatermarkwatermarkGenerator.onPeriodicEmit(wmOutput);final long now = getProcessingTimeService().getCurrentProcessingTime();// 更新定时器getProcessingTimeService().registerTimer(now + watermarkInterval, this);}
// 省略...
}

(1)Watermark 周期产生

public class TimePeriodicWatermarkGenerator implements WatermarkGenerator {private final long maxTimeLag = 5000; // 5 seconds@Overridepublic void onEvent(MyEvent event, long eventTimestamp, WatermarkOutput output) {// don't need to do anything because we work on processing time}@Overridepublic void onPeriodicEmit(WatermarkOutput output) {output.emitWatermark(new Watermark(System.currentTimeMillis() - maxTimeLag));}
}
结合算子 TimestampsAndWatermarksOperator 和 TimePeriodicWatermarkGenerator,分析 Watermark 的产生流程。如下图所示,横轴表示 processing time,圆形表示事件,圆形中的时间 t 表示事件时间,圆形落在横轴表示事件在算子中的处理,其中 Watermark 的产生周期为 60s 和允许延迟时间为 10s。以第一个周期 [0,60) 为例,获取事件中的最大事件时间 max,向下游发送 watermark(最大事件时间 - 允许延迟时间 - 1)

39eca6e715e1fa8ffc6e5e794931ac47.png

(2)Watermark 事件产生

public class PunctuatedAssigner implements WatermarkGenerator {@Overridepublic void onEvent(MyEvent event, long eventTimestamp, WatermarkOutput output) {if (event.hasWatermarkMarker()) {output.emitWatermark(new Watermark(event.getWatermarkTimestamp()));}}@Overridepublic void onPeriodicEmit(WatermarkOutput output) {// don't need to do anything because we emit in reaction to events above}
}

3 Watermark 的传递

Watermark 的传递方式是广播,即广播方式发送到下游。Watermark 与业务数据一样,无差别地传递下去。

8bb0efe2b3c857f12bc3b4657a25a36b.png
例子:多并发的场景下,Watermark 是 source task 产生,经过 keyby 分组后触发窗口计算。 说明:① Watermark 要单调递增。② 如果算子有多个上游(广播)即输入多个 Watermark(T),则该算子取最小 Watermark 即 min(Watermark(T1), Watermark(T2))

78c600e2f66c408c265b0752f2b1d1d6.png

从 WindowOperator 源码分析窗口是如何传递 Watermark。 首先分析 WindowOperator 类图,可知 WindowOperator 间接继承AbstractStreamOperator,而 AbstractStreamOperator 实现了接口 Input 的 processWatermark 方法、接口 TwoInputStreamOperator 的 processWatermark1 方法 和 processWatermark2 方法。

799d875ccf6dca526d54a5fe094fc5f7.png

接着分析一下 AbstractStreamOperator 实现的 processWatermark 、processWatermark1 和 processWatermark2。

// 省略 ....public void processWatermark(Watermark mark) throws Exception {if (timeServiceManager != null) {timeServiceManager.advanceWatermark(mark);}// 发送 watermarkoutput.emitWatermark(mark);}/*** 2个上游的watermark* 计算最小watermark, 并设置为当前算子的watermark*/public void processWatermark1(Watermark mark) throws Exception {input1Watermark = mark.getTimestamp();long newMin = Math.min(input1Watermark, input2Watermark);if (newMin > combinedWatermark) {combinedWatermark = newMin;processWatermark(new Watermark(combinedWatermark));}}/*** 2个上游的watermark* 计算最小watermark, 并设置为当前算子的watermark*/public void processWatermark2(Watermark mark) throws Exception {input2Watermark = mark.getTimestamp();long newMin = Math.min(input1Watermark, input2Watermark);if (newMin > combinedWatermark) {combinedWatermark = newMin;processWatermark(new Watermark(combinedWatermark));}}
// 省略 ....


本文来自互联网用户投稿,文章观点仅代表作者本人,不代表本站立场,不承担相关法律责任。如若转载,请注明出处。 如若内容造成侵权/违法违规/事实不符,请点击【内容举报】进行投诉反馈!

相关文章

立即
投稿

微信公众账号

微信扫一扫加关注

返回
顶部