快速用源码了解Flink的watermark 及Idle

快速用源码了解Flink的watermark及Idle

先回答两个问题
WaterMark(以下用wm 表示)的定义是什么,启到了什么作用?
根据wm的策略,产生 eventTimeStamp的wm 或systemTimeStamp 的wm。主要用来表征,数据流现在处理的数据到了哪儿。wm特征是一直递增,为流处理的触发启到一个参照物的作用。

wm 是从哪里产生的,如果数据源部分分区停了怎么办?
下游任务面对多个数据源分区是怎么获得wm。
wm 如果说是eventTime 产生的,那么数据源就是其产生的源头。数据源分区停了,分两种情况,第一种,该数据分区一开始就没有数据,从kafkaSource中可以看到
第二种在下游任务,在讲解Idle中体现。

//这里是 FlinkKafkaConsumerBase ,用来获取kafka 分区信息,如果所处的分区没有或没有数据,就将数据源表示为Idle。
if (subscribedPartitionsToStartOffsets.isEmpty()) {sourceContext.markAsTemporarilyIdle();}

Idle 的好处在于,
1.如果数据源的Task,需要处理多个数据分区
若,其中某个分区设置了Idle,就不在理会该分区的wm,而是从其它分区 轮询的找最小的wm。

//在CombinedWatermarkStatus 类中,明显可以看到,
//PartialWatermark 是分区的wm,
//如果PartialWatermark 没有设置Idle,则可以参与整体wm的选取。
//如果PartialWatermark 的分区没有数据,且没有设置Idle。会导致wm不再更新,因为是轮询找分区中最小的wmpublic boolean updateCombinedWatermark() {long minimumOverAllOutputs = Long.MAX_VALUE;boolean hasOutputs = false;boolean allIdle = true;for (PartialWatermark partialWatermark : partialWatermarks) {if (!partialWatermark.isIdle()) {minimumOverAllOutputs =Math.min(minimumOverAllOutputs, partialWatermark.getWatermark());allIdle = false;}hasOutputs = true;}

2.第二个好处,在于如果下游任务在接受到Idle 时,就不在处理该InputChannel里的wm。因为 下游任务会有多个上游数据源,每个InputChannel都独立的保存着每个源头的发来的wm,而下游任务整体的wm是轮询每个InputChannel 取最小的wm。

//这是StatusWatermarkValue类处理wm。
//可以看出,这里要比较两个wm,一个时inpuchannel的wm,一个整体的wm lastOutputWatermark。public void inputWatermark(Watermark watermark, int channelIndex, DataOutput<?> output)throws Exception {// ignore the input watermark if its input channel, or all input channels are idle (i.e.// overall the valve is idle).//如果整体没有Idle,且 分区也没有Idleif (lastOutputStreamStatus.isActive()&& channelStatuses[channelIndex].streamStatus.isActive()) {long watermarkMillis = watermark.getTimestamp();// if the input watermark's value is less than the last received watermark for its input// channel, ignore it also.//如果wm 大于channel的wmif (watermarkMillis > channelStatuses[channelIndex].watermark) {channelStatuses[channelIndex].watermark = watermarkMillis;// previously unaligned input channels are now aligned if its watermark has caught// up//如果wm 大于整体的wmif (!channelStatuses[channelIndex].isWatermarkAligned&& watermarkMillis >= lastOutputWatermark) {// 记住这里的标志位,这里就意味着当前wm 可以参与整体wm的选取channelStatuses[channelIndex].isWatermarkAligned = true;}// now, attempt to find a new min watermark across all aligned channels //这里用来获取整体wm。findAndOutputNewMinWatermarkAcrossAlignedChannels(output);}}}private void findAndOutputNewMinWatermarkAcrossAlignedChannels(DataOutput<?> output)throws Exception {long newMinWatermark = Long.MAX_VALUE;boolean hasAlignedChannels = false;// determine new overall watermark by considering only watermark-aligned channels across all// channelsfor (InputChannelStatus channelStatus : channelStatuses) {// 之前的标志位起了作用。if (channelStatus.isWatermarkAligned) {hasAlignedChannels = true;newMinWatermark = Math.min(channelStatus.watermark, newMinWatermark);}}// we acknowledge and output the new overall watermark if it really is aggregated// from some remaining aligned channel, and is also larger than the last output watermarkif (hasAlignedChannels && newMinWatermark > lastOutputWatermark) {lastOutputWatermark = newMinWatermark;output.emitWatermark(new Watermark(lastOutputWatermark));}}

小小的总结下Idle,如果没有Idle,那么该分区wm 会一直停留在原地,按照,整体wm的选取原则,会取分区wm中最小的一个,那么整体wm也会停留在原地,无法更新。而如果一个分区设置了Idle 意味着,整体wm 的选取会跳过该分区wm,就不会导致整体wm无法更新的情况。
根据整体wm启到了触发参照物的作用,如果整体wm 不更新了,也就意味无法触发,这是很严重的。

接着要回答一个问题,如果数据源的分区,在产出数据后,接着一段时间后不再产生新的数据了,数据源会设置Idle么?这个处理方式最好是,在数据源后面加上.withIdleness(…)
加上后,数据源就会根据timeout的时间限制来检测是否要将数据源设为Idle。

如果,你对源码感兴趣请关注微信公众号 a解code 。


本文来自互联网用户投稿,文章观点仅代表作者本人,不代表本站立场,不承担相关法律责任。如若转载,请注明出处。 如若内容造成侵权/违法违规/事实不符,请点击【内容举报】进行投诉反馈!

相关文章

立即
投稿

微信公众账号

微信扫一扫加关注

返回
顶部