flink 3-转换

转换 transformation
map

一对一,输入为一,输出为一

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class MapTest {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env  = StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource<String> dataStreamSource = env.readTextFile("xxx\hello.txt");SingleOutputStreamOperator<String> map = dataStreamSource.map(new MapFunction<String, String>() {@Overridepublic String map(String value) throws Exception {return value.split("\t")[0];}});map.print();env.execute("map test");}
}

输出结果

flatMap

一对多,一行进,多行出

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;public class FlatMapTest {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource<String> dataStreamSource = env.readTextFile("xxx\hello.txt");SingleOutputStreamOperator<String> flatMap = dataStreamSource.flatMap(new FlatMapFunction<String, String>() {@Overridepublic void flatMap(String value, Collector<String> out) throws Exception {for (String word : value.split(" ")) {out.collect(word);}}});flatMap.print();env.execute("flatMap test");}
}

(此处原为 flatMap 输出结果截图,图片缺失)

filter

设置过滤条件

import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;public class FilterTest {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env  = StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource<String> dataStreamSource = env.readTextFile("xxx\hello.txt");SingleOutputStreamOperator<String[]> filter = dataStreamSource.map(text -> text.split(" ")).filter(text -> text[0].equals("001"));SingleOutputStreamOperator<String> map = filter.map(text -> text[1]);map.addSink(new PrintSinkFunction<>());env.execute("filter test");}
}

output

keyBy

聚合,dataStream中用keyBy, dataSet中用groupBy

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
import org.apache.flink.util.Collector;public class KeyByTest {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env  = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);DataStreamSource<String> dataStreamSource = env.readTextFile("xxx\hello.txt");SingleOutputStreamOperator<Tuple2<String, Integer>> sum = dataStreamSource.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {@Overridepublic void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {String[] strings = value.split(" ");for (String field : strings) {out.collect(new Tuple2<>(field, 1));}}}).keyBy(0).sum(1);sum.addSink(new PrintSinkFunction<>());env.execute("keyBy test");}
}

(此处原为 keyBy 输出结果截图,图片缺失)

reduce
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;public class ReduceTeset {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env  = StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource<String> dataStreamSource = env.readTextFile("D:\\ifeng\\flinkTest\\src\\main\\java\\com\\ifeng\\zgx\\exercise\\data\\hello.txt");SingleOutputStreamOperator<Tuple2<String, Integer>> reduce = dataStreamSource.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {@Overridepublic void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {String[] strings = value.split(" ");for (String str : strings) {out.collect(new Tuple2<>(str, 1));}}}).keyBy(0).reduce(new ReduceFunction<Tuple2<String, Integer>>() {@Overridepublic Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2) throws Exception {return new Tuple2<>(value2.f0, value1.f1 + value2.f1);}});reduce.print();env.execute("reduce test");}
}

output

hello.txt

hello.txt

ref

flink transformations
flink Basic API


本文来自互联网用户投稿,文章观点仅代表作者本人,不代表本站立场,不承担相关法律责任。如若转载,请注明出处。 如若内容造成侵权/违法违规/事实不符,请点击【内容举报】进行投诉反馈!

相关文章

立即
投稿

微信公众账号

微信扫一扫加关注

返回
顶部