一文搞懂 Flink Timer

什么是 Timer

顾名思义就是 Flink 内部的定时器,与 key 和 timestamp 相关,相同的 key 和 timestamp 只有一个与之对应的 timer。timer 本质上是通过 ScheduledThreadPoolExecutor.schedule 来实现的

Flink synchronizes invocations of onTimer() and processElement(). Hence, users do not have to worry about concurrent modification of state.

虽然这是官方文档中的一句话,但正确性有待商议
ps: 这是源码中的一句话

Methods of {@code StreamOperator} are guaranteed not to be called concurrently. Also, if using the timer service, timer callbacks are also guaranteed not to be called concurrently with methods on {@code StreamOperator}.

真实的事件样例,告诉我 只要 key 不同是不会并发修改的,如果一直都是完全相同的 key ,比如我的 key 一直都是 1,完全是会并发修改的。key 的重复性越高,并发修改的可能性就越大,除非 rocksdb 自身保证同一个每个 key ( rocksdb key ) 的事务。
当然啦,前提是 timer 方法与 processElement 方法操作同一个 state 对象, 比如都操作 ListState

Timer 的使用

public class KeyedProcessFunctionImp extends KeyedProcessFunction<String, Tuple2<String, Object>, Tuple2<String, String>> {@Overridepublic void open(Configuration parameters) throws Exception {}@Overridepublic void close() throws Exception {}@Overridepublic void processElement(Tuple2<String, Object> stringObjectTuple2, Context context, Collector<Tuple2<String, String>> collector) throws Exception {System.out.println("注册一个 timer");long currentProcessingTime = context.timerService().currentProcessingTime() / 1000 * 1000 + 60 * 1000;context.timerService().registerProcessingTimeTimer(currentProcessingTime);}@Override//TODO timer 与 process 同时发生public void onTimer(long timestamp, OnTimerContext ctx, Collector<Tuple2<String, String>> out) throws Exception {System.out.println("我是一个 timer");}}

Timer的存储

Timer 会存储到 key state backend 中,并且会做 checkpoint ,失败会恢复。

Timer的源码分析

context.timerService().registerProcessingTimeTimer(currentProcessingTime); 会直接调用 InternalTimerServiceImpl.registerProcessingTimeTimer 方法

public void registerProcessingTimeTimer(N namespace, long time) {InternalTimer<K, N> oldHead = processingTimeTimersQueue.peek();if (processingTimeTimersQueue.add(new TimerHeapInternalTimer<>(time, (K) keyContext.getCurrentKey(), namespace))) {long nextTriggerTime = oldHead != null ? oldHead.getTimestamp() : Long.MAX_VALUE;// check if we need to re-schedule our timer to earlierif (time < nextTriggerTime) {if (nextTimer != null) {nextTimer.cancel(false);}//registerProcessingTimeTimer 定时调用 onProcessingTimer 调用,// 最终调用 triggerTarget.onProcessingTimer,比如 windowOperator.onProcessingTimer// ScheduledThreadPoolExecutor.schedulenextTimer = processingTimeService.registerTimer(time, this);}}}

processingTimeTimersQueue 可以保证相同的 key 和 time 对应的 timer 只会注册一次。我们以 rocksdb 为例看细节

@Override// 按照时间戳的顺序添加的,时间戳越大优先级越低public boolean add(@Nonnull E toAdd) {//会依据条件将 rocksdb 中 store 的 timer 存储到 orderedCache 中checkRefillCacheFromStore();final byte[] toAddBytes = serializeElement(toAdd);// 默认 128// orderedCache 通过 treeSet 进行操作的final boolean cacheFull = orderedCache.isFull();if ((!cacheFull && allElementsInCache) ||LEXICOGRAPHIC_BYTE_COMPARATOR.compare(toAddBytes, orderedCache.peekLast()) < 0) {if (cacheFull) {// we drop the element with lowest priority from the cacheorderedCache.pollLast();// the dropped element is now only in the storeallElementsInCache = false;}if (orderedCache.add(toAddBytes)) {// write-through syncaddToRocksDB(toAddBytes);if (toAddBytes == orderedCache.peekFirst()) {peekCache = null;return true;}}} else {// we only added to the storeaddToRocksDB(toAddBytes);allElementsInCache = false;}return false;}

orderedCache 是通过 treeSet 来实现的,所以 time + key + namespace (非window 是固定不变的) 为 treeMap 的 key 。新来的 timer 除了添加到 orderedCache 外还会添加到 rocksdb。
添加完成之后,就正式开始注册 定时任务了。当定时任务开始执行时,调用

@Override// registerProcessingTimeTimer 定时调用 onProcessingTime// time 设定的那个 timestamppublic void onProcessingTime(long time) throws Exception {// null out the timer in case the Triggerable calls registerProcessingTimeTimer()// inside the callback.nextTimer = null;InternalTimer<K, N> timer;// 小于这个 time 的所有 timer 都会被触发while ((timer = processingTimeTimersQueue.peek()) != null && timer.getTimestamp() <= time) {processingTimeTimersQueue.poll();keyContext.setCurrentKey(timer.getKey());// windowOperator onProcessingTime// 自己定义的 timertriggerTarget.onProcessingTime(timer);}if (timer != null && nextTimer == null) {//再次创建 timernextTimer = processingTimeService.registerTimer(timer.getTimestamp(), this);}}

至此的话,自己写的 Timer 方法就被执行了。
当然还有一些更细节的东西,比如 timer restore ,timer snapshot ,startTimerService 等读者自己可以依需查看

Timer的其他情况

Window Operator 的 Timer 与此类似


本文来自互联网用户投稿,文章观点仅代表作者本人,不代表本站立场,不承担相关法律责任。如若转载,请注明出处。 如若内容造成侵权/违法违规/事实不符,请点击【内容举报】进行投诉反馈!

相关文章

立即
投稿

微信公众账号

微信扫一扫加关注

返回
顶部