Early firing of Flink windows

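Tumbling Windows: windows that do not overlap in time. They are divided by a fixed time span or by a fixed number of events, giving tumbling time windows and tumbling count windows respectively.
Sliding Windows: windows that overlap in time. Some applications need uninterrupted coverage of the timeline and a smoothly updated window aggregate.

         For example, computing every 30 s the total number of items users bought in the last minute is a sliding time window; computing, after every 10 customer purchase clicks, the total of the items bought by the last 100 customers is a sliding count window.
Session Windows: a window is considered complete once no data has arrived for a configured gap.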
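To make the difference between tumbling and sliding windows concrete, here is a minimal sketch in plain Java of how a timestamp can be mapped to its window(s). The class `WindowAssignSketch` and its methods are hypothetical helpers written for this illustration, not Flink APIs, and the sketch assumes non-negative timestamps in milliseconds and no window offset.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper for illustration only; not part of Flink.
class WindowAssignSketch {

    /** Start of the single tumbling window of length `size` that contains `timestamp`. */
    static long tumblingStart(long timestamp, long size) {
        return timestamp - (timestamp % size);
    }

    /** Starts of every sliding window (length `size`, step `slide`) that contains `timestamp`. */
    static List<Long> slidingStarts(long timestamp, long size, long slide) {
        List<Long> starts = new ArrayList<>();
        // Latest window start that is not after the timestamp.
        long lastStart = timestamp - (timestamp % slide);
        // Walk backwards while the window [start, start + size) still covers the timestamp.
        for (long start = lastStart; start > timestamp - size; start -= slide) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        long t = 95_000L; // 95 s
        System.out.println(tumblingStart(t, 60_000L));          // one 60 s tumbling window
        System.out.println(slidingStarts(t, 60_000L, 30_000L)); // 60 s windows sliding by 30 s
    }
}
```

With a 60 s window sliding every 30 s, the element at 95 s belongs to two windows (starting at 90 s and at 60 s), whereas the tumbling assignment places it in exactly one window starting at 60 s.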

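By default, every window fires its computation over all of its elements only when time, under the configured time semantics, reaches the window end time. In some scenarios, however, the business needs the current aggregate before the window has ended, for example reading the running sum for the current hour every five minutes. For this case Flink provides two triggers that can be attached to the windows above: ContinuousEventTimeTrigger and ContinuousProcessingTimeTrigger.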
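The "running sum for the current hour, every five minutes" idea can be simulated without Flink. The sketch below is a simplification under stated assumptions: `EarlyFiringSketch` is a hypothetical class, events are given as parallel arrays of timestamps and values, and firing starts at the first interval boundary after the window start (the real triggers register their first timer only once an element has arrived).

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical simulation of early firing; not the Flink implementation.
class EarlyFiringSketch {

    /**
     * For one window [windowStart, windowStart + windowSize), returns a
     * {fireTime, sumSoFar} pair for every `interval` tick up to the window end.
     * The window contents are kept between firings, so each sum is cumulative.
     */
    static List<long[]> partialSums(long[] times, long[] values,
                                    long windowStart, long windowSize, long interval) {
        List<long[]> out = new ArrayList<>();
        long windowEnd = windowStart + windowSize;
        for (long fire = windowStart + interval; fire <= windowEnd; fire += interval) {
            long sum = 0;
            for (int i = 0; i < times.length; i++) {
                // Count only elements of this window that arrived before the tick.
                if (times[i] >= windowStart && times[i] < fire) {
                    sum += values[i];
                }
            }
            out.add(new long[] {fire, sum});
        }
        return out;
    }

    public static void main(String[] args) {
        long minute = 60_000L;
        long[] times = {2 * minute, 7 * minute, 58 * minute};
        long[] values = {10, 5, 1};
        for (long[] r : partialSums(times, values, 0L, 60 * minute, 5 * minute)) {
            System.out.println("t=" + (r[0] / minute) + "min sum=" + r[1]);
        }
    }
}
```

For the three sample events, the one-hour window produces twelve results: 10 at the 5-minute tick, 15 at the 10-minute tick, and 16 at the final tick at the window end.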

Below is a simple example that uses ContinuousProcessingTimeTrigger:

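import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Date;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.ContinuousProcessingTimeTrigger;
import org.apache.flink.util.Collector;

public class ContinueTriggerDemo {

    public static void main(String[] args) throws Exception {
        String hostName = "localhost";
        int port = 8001;

        // set up the execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // read the input data from the given socket
        DataStream<String> text = env.socketTextStream(hostName, port);

        text.flatMap(new LineSplitter()) // split each line into words
                .keyBy(0) // partition the stream by word
                .window(TumblingProcessingTimeWindows.of(Time.seconds(120))) // 120 s tumbling window
                .trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(20))) // emit the window's current result every 20 s
                .sum(1) // sum the counts
                .map(new Mapdemo()) // add a timestamp to each result
                .print();

        env.execute("Java WordCount from SocketTextStream Example");
    }

    /**
     * Implements the string tokenizer that splits sentences into words as a
     * user-defined FlatMapFunction. The function takes a line (String) and
     * splits it into multiple pairs in the form of "(word,1)" (Tuple2).
     */
    public static final class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {

        @Override
        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
            // normalize and split the line
            String[] tokens = value.toLowerCase().split("\\W+");

            // emit the pairs
            for (String token : tokens) {
                if (token.length() > 0) {
                    out.collect(new Tuple2<>(token, 1));
                }
            }
        }
    }

    /** Turns (word, count) into (word, current wall-clock time, count). */
    public static final class Mapdemo
            implements MapFunction<Tuple2<String, Integer>, Tuple3<String, String, Integer>> {

        @Override
        public Tuple3<String, String, Integer> map(Tuple2<String, Integer> value) throws Exception {
            DateFormat format2 = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
            String s = format2.format(new Date());
            return new Tuple3<>(value.f0, s, value.f1);
        }
    }
}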

Open the local port with nc -lk 8001, then start the Flink program.
Enter the input:

           aa aa bb

Observe the program's output log:

5> (aa,2018-07-30 16:08:20,2)
5> (bb,2018-07-30 16:08:20,1)
5> (aa,2018-07-30 16:08:40,2)
5> (bb,2018-07-30 16:08:40,1)
5> (aa,2018-07-30 16:09:00,2)
5> (bb,2018-07-30 16:09:00,1)
5> (aa,2018-07-30 16:09:20,2)
5> (bb,2018-07-30 16:09:20,1)
5> (aa,2018-07-30 16:09:40,2)
5> (bb,2018-07-30 16:09:40,1)

After the input above, enter one more line:

    aa

The log now reports:

5> (aa,2018-07-30 16:10:00,3)
5> (bb,2018-07-30 16:10:00,1)

As the log shows, Flink readily provides a window of length 120 s that sends its current aggregate downstream every 20 s.
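The log timestamps fall on :00, :20 and :40 because the trigger rounds the arrival time of the first element down to a multiple of the interval and schedules the first timer one interval later, as the onElement method in the source listed in the appendix does. A minimal sketch of that arithmetic follows; `FireTimeSketch` is a hypothetical class, and the 16:08:05 arrival time is an assumption chosen to match the log, which only records the firing times.

```java
// Hypothetical illustration of the timer alignment; not Flink code.
class FireTimeSketch {

    /** First timer after an element processed at `now`: the next multiple of `interval`. */
    static long firstFireTime(long now, long interval) {
        long start = now - (now % interval);
        return start + interval;
    }

    /** The first `count` firing times, each one `interval` after the previous. */
    static long[] fireTimes(long firstElementTime, long interval, int count) {
        long[] result = new long[count];
        long next = firstFireTime(firstElementTime, interval);
        for (int i = 0; i < count; i++) {
            result[i] = next;
            next += interval;
        }
        return result;
    }

    public static void main(String[] args) {
        // Milliseconds since midnight for 16:08:05 (assumed arrival time).
        long arrival = (16 * 3600 + 8 * 60 + 5) * 1000L;
        for (long t : fireTimes(arrival, 20_000L, 3)) {
            long s = t / 1000;
            System.out.printf("%02d:%02d:%02d%n", s / 3600, (s % 3600) / 60, s % 60);
        }
    }
}
```

Under that assumption the first three timers land at 16:08:20, 16:08:40 and 16:09:00, which is the cadence seen in the log.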

Appendix: source code

Source path: flink\flink-streaming-java\src\main\java\org\apache\flink\streaming\api\windowing\triggers\ContinuousProcessingTimeTrigger.java

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.streaming.api.windowing.triggers;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.state.ReducingState;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.Window;

/**
 * A {@link Trigger} that continuously fires based on a given time interval as measured by
 * the clock of the machine on which the job is running.
 *
 * @param <W> The type of {@link Window Windows} on which this trigger can operate.
 */
@PublicEvolving
public class ContinuousProcessingTimeTrigger<W extends Window> extends Trigger<Object, W> {
    private static final long serialVersionUID = 1L;

    private final long interval;

    /** When merging we take the lowest of all fire timestamps as the new fire timestamp. */
    private final ReducingStateDescriptor<Long> stateDesc =
            new ReducingStateDescriptor<>("fire-time", new Min(), LongSerializer.INSTANCE);

    private ContinuousProcessingTimeTrigger(long interval) {
        this.interval = interval;
    }

    @Override
    public TriggerResult onElement(Object element, long timestamp, W window, TriggerContext ctx) throws Exception {
        ReducingState<Long> fireTimestamp = ctx.getPartitionedState(stateDesc);

        timestamp = ctx.getCurrentProcessingTime();

        if (fireTimestamp.get() == null) {
            long start = timestamp - (timestamp % interval);
            long nextFireTimestamp = start + interval;

            ctx.registerProcessingTimeTimer(nextFireTimestamp);

            fireTimestamp.add(nextFireTimestamp);
            return TriggerResult.CONTINUE;
        }
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onEventTime(long time, W window, TriggerContext ctx) throws Exception {
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) throws Exception {
        ReducingState<Long> fireTimestamp = ctx.getPartitionedState(stateDesc);

        if (fireTimestamp.get().equals(time)) {
            fireTimestamp.clear();
            fireTimestamp.add(time + interval);
            ctx.registerProcessingTimeTimer(time + interval);
            return TriggerResult.FIRE;
        }
        return TriggerResult.CONTINUE;
    }

    @Override
    public void clear(W window, TriggerContext ctx) throws Exception {
        ReducingState<Long> fireTimestamp = ctx.getPartitionedState(stateDesc);
        long timestamp = fireTimestamp.get();
        ctx.deleteProcessingTimeTimer(timestamp);
        fireTimestamp.clear();
    }

    @Override
    public boolean canMerge() {
        return true;
    }

    @Override
    public void onMerge(W window, OnMergeContext ctx) {
        ctx.mergePartitionedState(stateDesc);
    }

    @VisibleForTesting
    public long getInterval() {
        return interval;
    }

    @Override
    public String toString() {
        return "ContinuousProcessingTimeTrigger(" + interval + ")";
    }

    /**
     * Creates a trigger that continuously fires based on the given interval.
     *
     * @param interval The time interval at which to fire.
     * @param <W> The type of {@link Window Windows} on which this trigger can operate.
     */
    public static <W extends Window> ContinuousProcessingTimeTrigger<W> of(Time interval) {
        return new ContinuousProcessingTimeTrigger<>(interval.toMilliseconds());
    }

    private static class Min implements ReduceFunction<Long> {
        private static final long serialVersionUID = 1L;

        @Override
        public Long reduce(Long value1, Long value2) throws Exception {
            return Math.min(value1, value2);
        }
    }
}

Source path: flink\flink-streaming-java\src\main\java\org\apache\flink\streaming\api\windowing\triggers\ContinuousEventTimeTrigger.java

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.streaming.api.windowing.triggers;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.state.ReducingState;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.Window;

/**
 * A {@link Trigger} that continuously fires based on a given time interval. This fires based
 * on {@link org.apache.flink.streaming.api.watermark.Watermark Watermarks}.
 *
 * @see org.apache.flink.streaming.api.watermark.Watermark
 *
 * @param <W> The type of {@link Window Windows} on which this trigger can operate.
 */
@PublicEvolving
public class ContinuousEventTimeTrigger<W extends Window> extends Trigger<Object, W> {
    private static final long serialVersionUID = 1L;

    private final long interval;

    /** When merging we take the lowest of all fire timestamps as the new fire timestamp. */
    private final ReducingStateDescriptor<Long> stateDesc =
            new ReducingStateDescriptor<>("fire-time", new Min(), LongSerializer.INSTANCE);

    private ContinuousEventTimeTrigger(long interval) {
        this.interval = interval;
    }

    @Override
    public TriggerResult onElement(Object element, long timestamp, W window, TriggerContext ctx) throws Exception {

        if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {
            // if the watermark is already past the window fire immediately
            return TriggerResult.FIRE;
        } else {
            ctx.registerEventTimeTimer(window.maxTimestamp());
        }

        ReducingState<Long> fireTimestamp = ctx.getPartitionedState(stateDesc);
        if (fireTimestamp.get() == null) {
            long start = timestamp - (timestamp % interval);
            long nextFireTimestamp = start + interval;
            ctx.registerEventTimeTimer(nextFireTimestamp);
            fireTimestamp.add(nextFireTimestamp);
        }

        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onEventTime(long time, W window, TriggerContext ctx) throws Exception {

        if (time == window.maxTimestamp()){
            return TriggerResult.FIRE;
        }

        ReducingState<Long> fireTimestampState = ctx.getPartitionedState(stateDesc);

        Long fireTimestamp = fireTimestampState.get();

        if (fireTimestamp != null && fireTimestamp == time) {
            fireTimestampState.clear();
            fireTimestampState.add(time + interval);
            ctx.registerEventTimeTimer(time + interval);
            return TriggerResult.FIRE;
        }

        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) throws Exception {
        return TriggerResult.CONTINUE;
    }

    @Override
    public void clear(W window, TriggerContext ctx) throws Exception {
        ReducingState<Long> fireTimestamp = ctx.getPartitionedState(stateDesc);
        Long timestamp = fireTimestamp.get();
        if (timestamp != null) {
            ctx.deleteEventTimeTimer(timestamp);
            fireTimestamp.clear();
        }
    }

    @Override
    public boolean canMerge() {
        return true;
    }

    @Override
    public void onMerge(W window, OnMergeContext ctx) throws Exception {
        ctx.mergePartitionedState(stateDesc);
        Long nextFireTimestamp = ctx.getPartitionedState(stateDesc).get();
        if (nextFireTimestamp != null) {
            ctx.registerEventTimeTimer(nextFireTimestamp);
        }
    }

    @Override
    public String toString() {
        return "ContinuousEventTimeTrigger(" + interval + ")";
    }

    @VisibleForTesting
    public long getInterval() {
        return interval;
    }

    /**
     * Creates a trigger that continuously fires based on the given interval.
     *
     * @param interval The time interval at which to fire.
     * @param <W> The type of {@link Window Windows} on which this trigger can operate.
     */
    public static <W extends Window> ContinuousEventTimeTrigger<W> of(Time interval) {
        return new ContinuousEventTimeTrigger<>(interval.toMilliseconds());
    }

    private static class Min implements ReduceFunction<Long> {
        private static final long serialVersionUID = 1L;

        @Override
        public Long reduce(Long value1, Long value2) throws Exception {
            return Math.min(value1, value2);
        }
    }
}

 

Reposted from: https://www.cnblogs.com/felixzh/p/9698093.html

