flink 时间窗需要设置断链

tag:简单记录,回头整理

flink带有时间窗的任务设置并发度从1改为大于1的值,并从savepoint恢复任务会出现 并发度大于maxParallism(1)的报错,是因为当时间窗的算子与其前后算子并发度相同时,其会生成一个链,而时间窗的并发度为1时,其最大并发度也会变成1,这样整个链条中的最大并发度也随着为1,savepoint保存maxParallism也为1,当将时间窗或与其生成同一链条的算子并发度调整时,总是会超过maxParallism导致报错,所以当算子中有时间窗算子时,必须给时间窗设置断链,即:

 // 增加时间窗DataStream> dataBaseStream =normalRowStream.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(windowTime))).process(new ProcessAllWindowFunction, TimeWindow>() {@Overridepublic void process(Context context, Iterable iterable,Collector> collector) {List arrayList = new ArrayList<>();iterable.forEach(arrayList::add);if (!arrayList.isEmpty()) {collector.collect(arrayList);}}}).uid("TumblingProcessingTimeWindows" + dbType + i).name("TumblingProcessing窗口")// 算子设置断链.disableChaining();


本文来自互联网用户投稿,文章观点仅代表作者本人,不代表本站立场,不承担相关法律责任。如若转载,请注明出处。 如若内容造成侵权/违法违规/事实不符,请点击【内容举报】进行投诉反馈!

相关文章

立即
投稿

微信公众账号

微信扫一扫加关注

返回
顶部