Flink 三种数据流分流(推荐旁路分流)
前言
近来,因为flink版本的漏洞:CVE—2020—17519#Apache,官方进行了1.12大版本更新,并说到:我们强烈建议所有用户升级到Flink 1.12.1。然后我就发现了Remove deprecated DataStream#split。文中介绍的split方法已经被舍弃(移除),官方推荐使用Side Outputs(旁路分流)。但是我还是保留了该部分代码,一方面是flink的演变的变化,另外一方面写也写了(删了多不好[< _<])。
所以目前flink 1.12.1只支撑filter和SideOutPut 旁路分流了。
Lambda表达式分析和使用
数据源流源码
数据源的pojo用户信息对象数据在下文的博客中的“模拟数据对象代码”(UserImage)中已有说明,不再赘述:
Flink的常见算子和实例代码
1、filter
同一个数据流,遍历多次:
public class Flink_side_output_filter {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource<UserImage> item = env.addSource(new UserImageSource());SingleOutputStreamOperator<UserImage> lufei = item.filter((FilterFunction<UserImage>) value -> value.getGroupId() == 1);SingleOutputStreamOperator<UserImage> others = item.filter((FilterFunction<UserImage>) value -> value.getGroupId() != 1);lufei.printToErr();others.print();env.execute();}
}
缺点:分几次流就需要,遍历几次原始的数据流,浪费集群资源,影响效率。

2、Split 分流
flink 提供了split算子,只需要在 split 算子中定义 OutputSelector,然后重写其中的 select 方法,将不同类型的数据进行标记,最后对返回的 SplitStream 使用 select 方法将对应的数据选择出来。
public class Flink_side_output_split {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource<UserImage> item = env.addSource(new UserImageSource());SplitStream<UserImage> splitStream = item.split(new OutputSelector<UserImage>() {@Overridepublic Iterable<String> select(UserImage value) {// TODO Auto-generated method stubArrayList<String> list = new ArrayList<>();if (value.getGroupId() == 1) {list.add("lufei");}else {list.add("others");}return list;}});DataStream lufei = splitStream.select("lufei");DataStream others = splitStream.select("others");lufei.printToErr();others.print();env.execute();}
}

要注意,使用 split 算子切分过的流,是不能进行二次切分的,假如把上述切分出来的 zeroStream 和 oneStream 流再次调用 split 切分,控制台会抛出以下异常。
lufei.split((OutputSelector) value -> {ArrayList<String> list = new ArrayList<>();list.add("Black_Mustache");return list; }).printToErr();
Exception in thread “main” java.lang.IllegalStateException: Consecutive multiple splits are not supported. Splits are deprecated. Please use side-outputs.

3、SideOutPut 分流
SideOutPut 分流
SideOutPut 是 Flink 框架为我们提供的最新的也是最为推荐的分流方法,在使用 SideOutPut 时,需要按照以下步骤进行:
1、定义 OutputTag标记;
2、调用特定函数进行数据拆分:
- ProcessFunction
- KeyedProcessFunction
- CoProcessFunction
- KeyedCoProcessFunction
- ProcessWindowFunction
- ProcessAllWindowFunction
SideOutPut 方式拆分流是可以多次进行拆分的,无需担心会爆出异常。
注意·OutputTag是匿名对象。
public class Flink_side_output_stream {
public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource<UserImage> item = env.addSource(new UserImageSource());//创建分流标记OutputTag<UserImage> lufei = new OutputTag<UserImage>("lufei"){private static final long serialVersionUID = 1L;};OutputTag<UserImage> others = new OutputTag<UserImage>("others"){private static final long serialVersionUID = 1L;};//分流处理SingleOutputStreamOperator<UserImage> processStream = item.process(new ProcessFunction<UserImage, UserImage>() {private static final long serialVersionUID = 1L;@Overridepublic void processElement(UserImage value, ProcessFunction<UserImage, UserImage>.Context ctx,Collector<UserImage> out) throws Exception {// TODO Auto-generated method stubif (value.getGroupId() == 1) {ctx.output(lufei, value);}else {ctx.output(others, value);}}});//输出DataStream lufeiStream = processStream.getSideOutput(lufei);DataStream othersStream = processStream.getSideOutput(others);lufeiStream.printToErr();othersStream.print();env.execute("旁路分流");
}
}

本文来自互联网用户投稿,文章观点仅代表作者本人,不代表本站立场,不承担相关法律责任。如若转载,请注明出处。 如若内容造成侵权/违法违规/事实不符,请点击【内容举报】进行投诉反馈!
