DataStream算子转换过程
DataStream表示为同一种类型的数据流,用来描述业务转换逻辑。通过转换操作,一个DataStream可以被转换成另一个新的DataStream。
DataStream中有2个成员变量:
// 流程序执行的上下文环境
protected final StreamExecutionEnvironment environment;// 当前DataStream的上一次转换操作(存于StreamExecutionEnvironment的List集合中)
// 即:通过该Transformation,生成当前的DataStream
protected final Transformation<T> transformation;
transformation是当前DataStream的上一次转换操作,得益于transformation,才能转换出当前DataStream。每当调用算子进行转换操作都会产生对应的xxxTransformation,StreamExecutionEnvironment会将这个xxxTransformation添加到List
例如使用DataStream.map()转换时,会在内部创建生成StreamMap算子,同时会把MapFunction对象传入,MapFunction就是我们的数据处理逻辑。StreamMap算子就是AbstractUdfStreamOperator的子类,传入的MapFunction对象会赋值给AbstractUdfStreamOperator中的userFunction变量持有。如此一来,StreamMap算子就持有了MapFunction。
– 底层对应的就是OneInputTransformation转换:
/*** 将MapFunction作为参数传入*/
streamMap.map(new MapFunction<String, JSONObject>() {...});public <R> SingleOutputStreamOperator<R> map(MapFunction<T, R> mapper) {// 获取本次map转换的输出类型TypeInformation<R> outType = TypeExtractor.getMapReturnTypes(clean(mapper), getType(),Utils.getCallLocationName(), true);return map(mapper, outType);
}/*** 自定义的MapFunction会作为参数,用来构建StreamMap,即StreamOperator的子类*/
public <R> SingleOutputStreamOperator<R> map(MapFunction<T, R> mapper, TypeInformation<R> outputType) {// 参数:算子name,输出类型,StreamOperator的factoryreturn transform("Map", outputType, new StreamMap<>(clean(mapper)));
}
有了StreamMap算子和代表数据处理逻辑的MapFunction之后,算子name、本次转换的输出类型、StreamMap算子会作为参数,参与本次的转换:
@PublicEvolving
public <R> SingleOutputStreamOperator<R> transform(String operatorName, // outTypeInfo:当前算子的输出类型TypeInformation<R> outTypeInfo, // 本次转换的输出类型// StreamMap算子继承AbstractUdfStreamOperator,实现OneInputStreamOperator接口OneInputStreamOperator<T, R> operator) {// SimpleOperatorFactory.of(operator):根据StreamOperator的类型,创建StreamOperatorFactoryreturn doTransform(operatorName, outTypeInfo, SimpleOperatorFactory.of(operator));
}protected <R> SingleOutputStreamOperator<R> doTransform(String operatorName,TypeInformation<R> outTypeInfo,// StreamOperatorFactory:创建StreamOperator的工厂StreamOperatorFactory<R> operatorFactory) {// 保险起见,确保不会出现InvalidTypesException,保证本次的转换操作不会出问题transformation.getOutputType();// 创建本次转换所对应的xxxTransformation实例OneInputTransformation<T, R> resultTransform = new OneInputTransformation<>(this.transformation, // 上一次的Transformation转换操作operatorName, // 当前算子的nameoperatorFactory, // MapFunction-->StreamMap -->StreamOperatorFactoryoutTypeInfo, // 当前算子的输出类型// 当前算子的并行度,默认为env全局的并行度environment.getParallelism());// SingleOutputStreamOperator:每次转换操作完毕后,返回给开发者继续操作的数据结构@SuppressWarnings({"unchecked", "rawtypes"})SingleOutputStreamOperator<R> returnStream = new SingleOutputStreamOperator(environment, resultTransform);// 将这个xxxTransformation添加到List列表中,它会被用来生成StreamGraph getExecutionEnvironment().addOperator(resultTransform);return returnStream;
}
转换时,首先要确保不会出现InvalidTypesException,不然后面的转换就会出问题。StreamOperatorFactory持有StreamMap算子(也就是StreamOperator),StreamMap持有MapFunction。现在要根据StreamOperatorFactory来创建OneInputTransformation,换言之,MapFunction、StreamMap算子都会被封装到这个OneInputTransformation中。当然,也会创建出SingleOutputStreamOperator,作为本次转换结束之后返回给开发者继续操作的数据结构,下游算子可以用它继续进行转换操作。
map转换生成的OneInputTransformation,会被add到StreamExecutionEnvironment内的 List
在DataStream的转换过程中,不管是哪种类型的转换操作,都是按照以下的程序进行的:
- 将开发者自定义的xxxFunction封装到(对应)创建好的StreamOperator中
- 基于StreamOperator构建xxxTransformation
- 将xxxTransformation添加到StreamExecutionEnvironment的List集合中,用来生成StreamGraph
- 基于StreamGraph,先后生成JobGraph、ExecutionGraph,申请Slot资源并调度、执行…
Transformation持有StreamOperatorFactory,StreamOperatorFactory持有StreamOperator,StreamOperator持有xxxFunction。正是因为这种持有关系,Transformation才能表达DataStream之间的转换关系,因为xxxFunction中定义的就是数据处理逻辑
本文来自互联网用户投稿,文章观点仅代表作者本人,不代表本站立场,不承担相关法律责任。如若转载,请注明出处。 如若内容造成侵权/违法违规/事实不符,请点击【内容举报】进行投诉反馈!
