剑指大数据-flink学习精要demo
CP2 Flink 入门
wordcount 批处理DEMO
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.AggregateOperator;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.FlatMapOperator;
import org.apache.flink.api.java.operators.UnsortedGrouping;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class BatchWordCount {
    public static void main(String[] args) throws Exception {
        // 1. 创建执行环境
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // 2. 从文件读取数据,按行读取(存储的元素就是每行的文本)
        DataSource<String> lineDS = env.readTextFile("input/words.txt");
        // 3. 转换数据格式
        FlatMapOperator<String, Tuple2<String, Long>> wordAndOne = lineDS
                .flatMap((String line, Collector<Tuple2<String, Long>> out) -> {
                    String[] words = line.split(" ");
                    for (String word : words) {
                        out.collect(Tuple2.of(word, 1L));
                    }
                })
                // 当 Lambda 表达式使用 Java 泛型时,由于泛型擦除的存在,需要显式地声明类型信息
                .returns(Types.TUPLE(Types.STRING, Types.LONG));
        // 4. 按照 word 进行分组
        UnsortedGrouping<Tuple2<String, Long>> wordAndOneUG = wordAndOne.groupBy(0);
        // 5. 分组内聚合统计
        AggregateOperator<Tuple2<String, Long>> sum = wordAndOneUG.sum(1);
        // 6. 打印结果
        sum.print();
    }
}
wordcount 流处理demo
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
import java.util.Arrays;
public class BoundedStreamWordCount {
    public static void main(String[] args) throws Exception {
        // 1. 创建流式执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 2. 读取文件
        DataStreamSource<String> lineDSS = env.readTextFile("input/words.txt");
        // 3. 转换数据格式
        SingleOutputStreamOperator<Tuple2<String, Long>> wordAndOne = lineDSS
                .flatMap((String line, Collector<String> words) -> {
                    Arrays.stream(line.split(" ")).forEach(words::collect);
                })
                .returns(Types.STRING)
                .map(word -> Tuple2.of(word, 1L))
                .returns(Types.TUPLE(Types.STRING, Types.LONG));
        // 4. 分组
        KeyedStream<Tuple2<String, Long>, String> wordAndOneKS = wordAndOne.keyBy(t -> t.f0);
        // 5. 求和
        SingleOutputStreamOperator<Tuple2<String, Long>> result = wordAndOneKS.sum(1);
        // 6. 打印
        result.print();
        // 7. 执行
        env.execute();
    }
}
CP5 DataStreamAPI
1.创建执行环境
// 流处理用 StreamExecutionEnvironment,批处理用 ExecutionEnvironment,获取方式类似

// 根据上下文自动获取执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 本地执行环境
StreamExecutionEnvironment localEnv = StreamExecutionEnvironment.createLocalEnvironment();
// 远程执行环境
StreamExecutionEnvironment remoteEnv = StreamExecutionEnvironment.createRemoteEnvironment(
        "host",               // JobManager 主机名
        1234,                 // JobManager 进程端口号
        "path/to/jarFile.jar" // 提交给 JobManager 的 JAR 包
);
env.setRuntimeMode(RuntimeExecutionMode.BATCH);
2.source 算子
Event事件类
import java.sql.Timestamp;
public class Event {
    public String user;
    public String url;
    public Long timestamp;

    public Event() {}

    public Event(String user, String url, Long timestamp) {
        this.user = user;
        this.url = url;
        this.timestamp = timestamp;
    }

    @Override
    public String toString() {
        return "Event{" +
                "user='" + user + '\'' +
                ", url='" + url + '\'' +
                ", timestamp=" + new Timestamp(timestamp) +
                '}';
    }
}
2.1 从集合中读取:fromCollection/fromElements
public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);

    // 从集合中读取数据
    ArrayList<Event> clicks = new ArrayList<>();
    clicks.add(new Event("Mary", "./home", 1000L));
    clicks.add(new Event("Bob", "./cart", 2000L));
    DataStream<Event> stream = env.fromCollection(clicks);

    // 直接从元素读取数据
    DataStreamSource<Event> stream2 = env.fromElements(
            new Event("Mary", "./home", 1000L),
            new Event("Bob", "./cart", 2000L));

    stream.print();
    env.execute();
}
2.2 从文件读取数据:readTextFile
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>2.7.5</version>
    <scope>provided</scope>
</dependency>
DataStream<String> stream = env.readTextFile("clicks.csv");
2.3 从Socket 读取数据
DataStream<String> stream = env.socketTextStream("localhost", 7777);
2.4 从Kafka 读取数据
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import java.util.Properties;

public class SourceKafkaTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "hadoop102:9092");
        properties.setProperty("group.id", "consumer-group");
        properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty("auto.offset.reset", "latest");

        DataStreamSource<String> stream = env.addSource(new FlinkKafkaConsumer<String>(
                "clicks",
                new SimpleStringSchema(),
                properties));
        stream.print("Kafka");
        env.execute();
    }
}
2.5 自定义Source算子:ClickSource
import com.flink.wc.myflink.bean.Event;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import java.util.Calendar;
import java.util.Random;

public class ClickSource implements SourceFunction<Event> {
    // 声明一个布尔变量,作为控制数据生成的标志位
    private Boolean running = true;

    @Override
    public void run(SourceContext<Event> ctx) throws Exception {
        Random random = new Random();
        String[] users = {"Mary", "Alice", "Bob", "Cary"};
        String[] urls = {"./home", "./cart", "./fav", "./prod?id=1", "./prod?id=2"};
        while (running) {
            ctx.collect(new Event(
                    users[random.nextInt(users.length)], // user 和 url 随机组合
                    urls[random.nextInt(urls.length)],
                    Calendar.getInstance().getTimeInMillis() // getTimeInMillis 返回当前时间戳
            ));
            // 在这个循环中,每隔一秒就 collect(发送)一条数据
            Thread.sleep(1000);
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}
ClickSource 并行实例
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
import java.util.Random;
public class ParallelSourceExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.addSource(new CustomSource()).setParallelism(2).print();
        env.execute();
    }

    // 实现 ParallelSourceFunction 接口的数据源可以设置大于 1 的并行度
    public static class CustomSource implements ParallelSourceFunction<Integer> {
        private boolean running = true;
        private Random random = new Random();

        @Override
        public void run(SourceContext<Integer> sourceContext) throws Exception {
            while (running) {
                sourceContext.collect(random.nextInt());
            }
        }

        @Override
        public void cancel() {
            running = false;
        }
    }
}
3.Transformation算子 demo
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class TransMapTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStreamSource<Event> stream = env.fromElements(
                new Event("Mary", "./home", 1000L),
                new Event("Bob", "./cart", 2000L));

        // 传入匿名类,实现 MapFunction
        stream.map(new MapFunction<Event, String>() {
            @Override
            public String map(Event e) throws Exception {
                return e.user;
            }
        });

        // 传入 MapFunction 的实现类
        stream.map(new UserExtractor()).print();
        env.execute();
    }

    public static class UserExtractor implements MapFunction<Event, String> {
        @Override
        public String map(Event e) throws Exception {
            return e.user;
        }
    }
}
4.filter算子demo
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class TransFilterTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStreamSource<Event> stream = env.fromElements(
                new Event("Mary", "./home", 1000L),
                new Event("Bob", "./cart", 2000L));

        // 传入匿名类实现 FilterFunction
        stream.filter(new FilterFunction<Event>() {
            @Override
            public boolean filter(Event e) throws Exception {
                return e.user.equals("Mary");
            }
        });

        // 传入 FilterFunction 实现类
        stream.filter(new UserFilter()).print();
        env.execute();
    }

    public static class UserFilter implements FilterFunction<Event> {
        @Override
        public boolean filter(Event e) throws Exception {
            return e.user.equals("Mary");
        }
    }
}
5.扁平映射(flatMap):
主要是将数据流中的整体(一般是集合类型)拆分成一个一个的个体使用。
消费一个元素,可以产生 0 到多个元素
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class TransFlatmapTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStreamSource<Event> stream = env.fromElements(
                new Event("Mary", "./home", 1000L),
                new Event("Bob", "./cart", 2000L));
        stream.flatMap(new MyFlatMap()).print();
        env.execute();
    }

    public static class MyFlatMap implements FlatMapFunction<Event, String> {
        @Override
        public void flatMap(Event value, Collector<String> out) throws Exception {
            if (value.user.equals("Mary")) {
                out.collect(value.user);
            } else if (value.user.equals("Bob")) {
                out.collect(value.user);
                out.collect(value.url);
            }
        }
    }
}
6.聚合算子(Aggregation)
--按key聚合
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class TransKeyByTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStreamSource<Event> stream = env.fromElements(
                new Event("Mary", "./home", 1000L),
                new Event("Bob", "./cart", 2000L));

        // 使用 Lambda 表达式
        KeyedStream<Event, String> keyedStream = stream.keyBy(e -> e.user);

        // 使用匿名类实现 KeySelector
        KeyedStream<Event, String> keyedStream1 = stream.keyBy(new KeySelector<Event, String>() {
            @Override
            public String getKey(Event e) throws Exception {
                return e.user;
            }
        });

        env.execute();
    }
}
--简单聚合
简单聚合算子包括 sum()、min()、max()、minBy()、maxBy():min/max 只保证指定字段取到最小/最大值,其余字段保留首次出现数据的值;带 By 的版本(minBy/maxBy)返回的是包含该最小/最大值的整条数据。
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class TransTupleAggreationTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStreamSource<Tuple2<String, Integer>> stream = env.fromElements(
                Tuple2.of("a", 1),
                Tuple2.of("a", 3),
                Tuple2.of("b", 3),
                Tuple2.of("b", 4));

        stream.keyBy(r -> r.f0).sum(1).print();
        stream.keyBy(r -> r.f0).sum("f1").print();
        stream.keyBy(r -> r.f0).max(1).print();
        stream.keyBy(r -> r.f0).max("f1").print();
        stream.keyBy(r -> r.f0).min(1).print();
        stream.keyBy(r -> r.f0).min("f1").print();
        stream.keyBy(r -> r.f0).maxBy(1).print();
        stream.keyBy(r -> r.f0).maxBy("f1").print();
        stream.keyBy(r -> r.f0).minBy(1).print();
        stream.keyBy(r -> r.f0).minBy("f1").print();
        env.execute();
    }
}
--归约聚合(reduce)
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class TransReduceTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // 这里的 ClickSource() 使用了之前自定义数据源小节中的 ClickSource()
        env.addSource(new ClickSource())
                // 将 Event 数据类型转换成元组类型
                .map(new MapFunction<Event, Tuple2<String, Long>>() {
                    @Override
                    public Tuple2<String, Long> map(Event e) throws Exception {
                        return Tuple2.of(e.user, 1L);
                    }
                })
                .keyBy(r -> r.f0) // 使用用户名来进行分流
                .reduce(new ReduceFunction<Tuple2<String, Long>>() {
                    @Override
                    public Tuple2<String, Long> reduce(Tuple2<String, Long> value1, Tuple2<String, Long> value2) throws Exception {
                        // 每到一条数据,用户 pv 的统计值加 1
                        return Tuple2.of(value1.f0, value1.f1 + value2.f1);
                    }
                })
                .keyBy(r -> true) // 为每一条数据分配同一个 key,将聚合结果发送到一条流中去
                .reduce(new ReduceFunction<Tuple2<String, Long>>() {
                    @Override
                    public Tuple2<String, Long> reduce(Tuple2<String, Long> value1, Tuple2<String, Long> value2) throws Exception {
                        // 将累加器更新为当前最大的 pv 统计值,然后向下游发送累加器的值
                        return value1.f1 > value2.f1 ? value1 : value2;
                    }
                })
                .print();
        env.execute();
    }
}
7.用户自定义函数(UDF)
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class TransFunctionUDFTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStreamSource<Event> clicks = env.fromElements(
                new Event("Mary", "./home", 1000L),
                new Event("Bob", "./cart", 2000L));
        DataStream<Event> stream = clicks.filter(new FlinkFilter());
        stream.print();
        env.execute();
    }

    public static class FlinkFilter implements FilterFunction<Event> {
        @Override
        public boolean filter(Event value) throws Exception {
            return value.url.contains("home");
        }
    }
}
//使用匿名类
DataStream<Event> stream = clicks.filter(new FilterFunction<Event>() {
    @Override
    public boolean filter(Event value) throws Exception {
        return value.url.contains("home");
    }
});
//传入带参数的 FilterFunction 实现类
DataStream<Event> stream = clicks.filter(new KeyWordFilter("home"));
public static class KeyWordFilter implements FilterFunction<Event> {
    private String keyWord;

    KeyWordFilter(String keyWord) {
        this.keyWord = keyWord;
    }

    @Override
    public boolean filter(Event value) throws Exception {
        return value.url.contains(this.keyWord);
    }
}
匿名函数(Lambda)
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;

public class TransFunctionLambdaTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStreamSource<Event> clicks = env.fromElements(
                new Event("Mary", "./home", 1000L),
                new Event("Bob", "./cart", 2000L));
        // map 函数使用 Lambda 表达式,返回简单类型,不需要进行类型声明
        DataStream<String> stream1 = clicks.map(event -> event.url);
        env.execute();
    }
}

// flatMap 使用 Lambda 表达式时,必须通过 returns 明确声明返回类型
DataStream<String> stream2 = clicks.flatMap((Event event, Collector<String> out) -> {
    out.collect(event.url);
}).returns(Types.STRING);
stream2.print();
*注:有时候使用 Lambda 表达式会因为泛型擦除无法推断类型而报错
*示例
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ReturnTypeResolve {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStreamSource<Event> clicks = env.fromElements(
                new Event("Mary", "./home", 1000L),
                new Event("Bob", "./cart", 2000L));

        // 1) 使用显式的 ".returns(...)"
        DataStream<Tuple2<String, Long>> stream3 = clicks
                .map(event -> Tuple2.of(event.user, 1L))
                .returns(Types.TUPLE(Types.STRING, Types.LONG));
        stream3.print();

        // 2) 使用类来替代 Lambda 表达式
        clicks.map(new MyTuple2Mapper()).print();

        // 3) 使用匿名类来代替 Lambda 表达式
        clicks.map(new MapFunction<Event, Tuple2<String, Long>>() {
            @Override
            public Tuple2<String, Long> map(Event value) throws Exception {
                return Tuple2.of(value.user, 1L);
            }
        }).print();

        env.execute();
    }

    // 自定义 MapFunction 的实现类
    public static class MyTuple2Mapper implements MapFunction<Event, Tuple2<String, Long>> {
        @Override
        public Tuple2<String, Long> map(Event value) throws Exception {
            return Tuple2.of(value.user, 1L);
        }
    }
}
富函数类(Rich Function Classes)
//富函数类可以获取运行环境的上下文,并拥有一些生命周期方法,所以可以实现更复杂的功能。
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RichFunctionTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);
        DataStreamSource<Event> clicks = env.fromElements(
                new Event("Mary", "./home", 1000L),
                new Event("Bob", "./cart", 2000L),
                new Event("Alice", "./prod?id=1", 5 * 1000L),
                new Event("Cary", "./home", 60 * 1000L));

        // 将点击事件转换成长整型的时间戳输出
        clicks.map(new RichMapFunction<Event, Long>() {
            @Override
            public void open(Configuration parameters) throws Exception {
                super.open(parameters);
                System.out.println("索引为 " + getRuntimeContext().getIndexOfThisSubtask() + " 的任务开始");
            }

            @Override
            public Long map(Event value) throws Exception {
                return value.timestamp;
            }

            @Override
            public void close() throws Exception {
                super.close();
                System.out.println("索引为 " + getRuntimeContext().getIndexOfThisSubtask() + " 的任务结束");
            }
        }).print();

        env.execute();
    }
}
//一个常见的应用场景就是,如果我们希望连接到一个外部数据库进行读写操作,
//那么将连接操作放在 map()中显然不是个好选择——因为每来一条数据就会重新连接一次数据库;
//所以我们可以在 open()中建立连接,在 map()中读写数据,而在 close()中关闭连接,推荐按照下面的模式来组织代码:
public class MyFlatMap extends RichFlatMapFunction<IN, OUT> {
    @Override
    public void open(Configuration configuration) {
        // 做一些初始化工作,例如建立一个和 MySQL 的连接
    }

    @Override
    public void flatMap(IN in, Collector<OUT> out) {
        // 对数据库进行读写
    }

    @Override
    public void close() {
        // 清理工作,关闭和 MySQL 数据库的连接
    }
}
8.物理分区
常见的物理分区策略有
随机分配(Random)、轮询分配(Round-Robin)、重缩放(Rescale)和广播(Broadcast)
随机分配(Random)
stream.shuffle().print("shuffle").setParallelism(4);
重缩放分区(rescale)将数据轮询发送到下游并行任务的一部分中;当上下游任务位于同一个 TaskManager 时,数据只在本地的多个 slot 之间重新分配,可以避免跨 TaskManager 的网络传输损耗。
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
public class RescaleTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // 这里使用了并行数据源的富函数版本
        // 这样可以调用 getRuntimeContext 方法来获取运行时上下文的一些信息
        env.addSource(new RichParallelSourceFunction<Integer>() {
                    @Override
                    public void run(SourceContext<Integer> sourceContext) throws Exception {
                        for (int i = 0; i < 8; i++) {
                            // 将奇数发送到索引为 1 的并行子任务
                            // 将偶数发送到索引为 0 的并行子任务
                            if ((i + 1) % 2 == getRuntimeContext().getIndexOfThisSubtask()) {
                                sourceContext.collect(i + 1);
                            }
                        }
                    }

                    @Override
                    public void cancel() {
                    }
                })
                .setParallelism(2)
                .rescale()
                .print()
                .setParallelism(4);

        env.execute();
    }
}
广播(broadcast):经过广播后,数据在不同的分区都保留一份,可能进行重复处理。
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class BroadcastTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // 读取数据源,并行度为 1
        DataStreamSource<Event> stream = env.addSource(new ClickSource());
        // 经广播后打印输出,并行度为 4
        stream.broadcast().print("broadcast").setParallelism(4);
        env.execute();
    }
}
全局分区(.global)
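全局分区会把所有数据都发送到下游算子的第一个并行子任务中,直接调用 .global() 方法即可实现。下面是一个沿用上文 stream 的简单示意:
// 经全局分区后打印输出:所有数据都进入同一个并行子任务
stream.global().print("global").setParallelism(4);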
自定义分区(Custom)
/*
当 Flink 提供的所有分区策略都不能满足用户的需求时,
我们可以通过使用partitionCustom()方法来自定义分区策略。
在调用时,方法需要传入两个参数,第一个是自定义分区器(Partitioner)对象,第二个是应用分区器的字段,
它的指定方式与 keyBy 指定 key 基本一样:可以通过字段名称指定, 也可以通过字段位置索引来指定,还可以实现一个KeySelector。
*/
import org.apache.flink.api.common.functions.Partitioner;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CustomPartitionTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // 将自然数按照奇偶分区
        env.fromElements(1, 2, 3, 4, 5, 6, 7, 8)
                .partitionCustom(new Partitioner<Integer>() {
                    @Override
                    public int partition(Integer key, int numPartitions) {
                        return key % 2;
                    }
                }, new KeySelector<Integer, Integer>() {
                    @Override
                    public Integer getKey(Integer value) throws Exception {
                        return value;
                    }
                })
                .print().setParallelism(2);

        env.execute();
    }
}
9.输出算子( stream.addSink(new SinkFunction(…)) )
addSource 的参数需要实现一个 SourceFunction 接口;
类似地,addSink 方法同样需要传入一个参数,实现的是 SinkFunction 接口。
在这个接口中只需要重写一个方法 invoke(),用来将指定的值写入到外部系统中。
这个方法在每条数据记录到来时都会调用:
default void invoke(IN value, Context context) throws Exception
--输出到文件
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
import java.util.concurrent.TimeUnit;
public class SinkToFileTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(4);
        DataStreamSource<Event> stream = env.fromElements(
                new Event("Mary", "./home", 1000L),
                new Event("Bob", "./cart", 2000L),
                new Event("Alice", "./prod?id=100", 3000L),
                new Event("Alice", "./prod?id=200", 3500L),
                new Event("Bob", "./prod?id=2", 2500L),
                new Event("Alice", "./prod?id=300", 3600L),
                new Event("Bob", "./home", 3000L),
                new Event("Bob", "./prod?id=1", 2300L),
                new Event("Bob", "./prod?id=3", 3300L));

        StreamingFileSink<String> fileSink = StreamingFileSink
                .<String>forRowFormat(new Path("./output"), new SimpleStringEncoder<>("UTF-8"))
                .withRollingPolicy(
                        DefaultRollingPolicy.builder()
                                .withRolloverInterval(TimeUnit.MINUTES.toMillis(15))
                                .withInactivityInterval(TimeUnit.MINUTES.toMillis(5))
                                .withMaxPartSize(1024 * 1024 * 1024)
                                .build())
                .build();

        // 将 Event 转换成 String 写入文件
        stream.map(Event::toString).addSink(fileSink);
        env.execute();
    }
}
输出到 Kafka
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import java.util.Properties;
public class SinkToKafkaTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        Properties properties = new Properties();
        properties.put("bootstrap.servers", "hadoop102:9092");

        DataStreamSource<String> stream = env.readTextFile("input/clicks.csv");
        stream.addSink(new FlinkKafkaProducer<String>(
                "clicks",
                new SimpleStringSchema(),
                properties));

        env.execute();
    }
}
输出到 Redis
<dependency>
    <groupId>org.apache.bahir</groupId>
    <artifactId>flink-connector-redis_2.11</artifactId>
    <version>1.0</version>
</dependency>
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.redis.RedisSink;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;

public class SinkToRedisTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // 创建一个到 redis 连接的配置
        FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder().setHost("hadoop102").build();

        env.addSource(new ClickSource())
                .addSink(new RedisSink<Event>(conf, new MyRedisMapper()));

        env.execute();
    }

    public static class MyRedisMapper implements RedisMapper<Event> {
        @Override
        public String getKeyFromData(Event e) {
            return e.user;
        }

        @Override
        public String getValueFromData(Event e) {
            return e.url;
        }

        @Override
        public RedisCommandDescription getCommandDescription() {
            return new RedisCommandDescription(RedisCommand.HSET, "clicks");
        }
    }
}
输出到 Elasticsearch
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-elasticsearch7_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.apache.flink.streaming.connectors.elasticsearch7.ElasticsearchSink;
import org.apache.http.HttpHost;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.HashMap;

public class SinkToEsTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStreamSource<Event> stream = env.fromElements(
                new Event("Mary", "./home", 1000L),
                new Event("Bob", "./cart", 2000L),
                new Event("Alice", "./prod?id=100", 3000L),
                new Event("Alice", "./prod?id=200", 3500L),
                new Event("Bob", "./prod?id=2", 2500L),
                new Event("Alice", "./prod?id=300", 3600L),
                new Event("Bob", "./home", 3000L),
                new Event("Bob", "./prod?id=1", 2300L),
                new Event("Bob", "./prod?id=3", 3300L));

        ArrayList<HttpHost> httpHosts = new ArrayList<>();
        httpHosts.add(new HttpHost("hadoop102", 9200, "http"));

        ElasticsearchSinkFunction<Event> elasticsearchSinkFunction = new ElasticsearchSinkFunction<Event>() {
            @Override
            public void process(Event element, RuntimeContext ctx, RequestIndexer indexer) {
                HashMap<String, String> data = new HashMap<>();
                data.put(element.user, element.url);
                IndexRequest request = Requests.indexRequest()
                        .index("clicks")
                        .type("type") // Es6 必须定义 type
                        .source(data);
                indexer.add(request);
            }
        };

        stream.addSink(new ElasticsearchSink.Builder<Event>(httpHosts, elasticsearchSinkFunction).build());
        env.execute();
    }
}
输出到 MySQL
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-jdbc_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.47</version>
</dependency>
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class SinkToMySQL {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStreamSource<Event> stream = env.fromElements(
                new Event("Mary", "./home", 1000L),
                new Event("Bob", "./cart", 2000L),
                new Event("Alice", "./prod?id=100", 3000L),
                new Event("Alice", "./prod?id=200", 3500L),
                new Event("Bob", "./prod?id=2", 2500L),
                new Event("Alice", "./prod?id=300", 3600L),
                new Event("Bob", "./prod?id=1", 2300L),
                new Event("Bob", "./prod?id=3", 3300L));

        stream.addSink(
                JdbcSink.sink(
                        "INSERT INTO clicks (user, url) VALUES (?, ?)",
                        (statement, r) -> {
                            statement.setString(1, r.user);
                            statement.setString(2, r.url);
                        },
                        JdbcExecutionOptions.builder()
                                .withBatchSize(1000)
                                .withBatchIntervalMs(200)
                                .withMaxRetries(5)
                                .build(),
                        new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                                .withUrl("jdbc:mysql://localhost:3306/userbehavior")
                                // 对于 MySQL 5.7,用 "com.mysql.jdbc.Driver"
                                .withDriverName("com.mysql.cj.jdbc.Driver")
                                .withUsername("username")
                                .withPassword("password")
                                .build()));

        env.execute();
    }
}
自定义 Sink 输出(实现自定义SinkFunction)
<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-client</artifactId>
    <version>${hbase.version}</version>
</dependency>
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import java.nio.charset.StandardCharsets;

public class SinkCustomtoHBase {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        env.fromElements("hello", "world")
                .addSink(new RichSinkFunction<String>() {
                    // 管理 HBase 的配置信息,这里因为 Configuration 的重名问题,将类以完整路径导入
                    public org.apache.hadoop.conf.Configuration configuration;
                    // 管理 HBase 连接
                    public Connection connection;

                    @Override
                    public void open(Configuration parameters) throws Exception {
                        super.open(parameters);
                        configuration = HBaseConfiguration.create();
                        configuration.set("hbase.zookeeper.quorum", "hadoop102:2181");
                        connection = ConnectionFactory.createConnection(configuration);
                    }

                    @Override
                    public void invoke(String value, Context context) throws Exception {
                        Table table = connection.getTable(TableName.valueOf("test")); // 表名为 test
                        Put put = new Put("rowkey".getBytes(StandardCharsets.UTF_8)); // 指定 rowkey
                        put.addColumn("info".getBytes(StandardCharsets.UTF_8)   // 指定列族
                                , value.getBytes(StandardCharsets.UTF_8)         // 用写入的数据作为列名
                                , "1".getBytes(StandardCharsets.UTF_8));         // 写入的数据
                        table.put(put);  // 执行 put 操作
                        table.close();   // 将表关闭
                    }

                    @Override
                    public void close() throws Exception {
                        super.close();
                        connection.close(); // 关闭连接
                    }
                });

        env.execute();
    }
}
CP6 时间和窗口
1.水位线生成
public SingleOutputStreamOperator<T> assignTimestampsAndWatermarks(WatermarkStrategy<T> watermarkStrategy)
DataStream<Event> stream = env.addSource(new ClickSource());
DataStream<Event> withTimestampsAndWatermarks = stream.assignTimestampsAndWatermarks(<watermark strategy>);

public interface WatermarkStrategy<T> extends TimestampAssignerSupplier<T>, WatermarkGeneratorSupplier<T> {
    @Override
    TimestampAssigner<T> createTimestampAssigner(TimestampAssignerSupplier.Context context);

    @Override
    WatermarkGenerator<T> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context);
}
⚫TimestampAssigner:主要负责从流中数据元素的某个字段中提取时间戳,并分配给元素。时间戳的分配是生成水位线的基础。
⚫WatermarkGenerator: 主要负责按照既定的方式, 基于时间戳生成水位线。在WatermarkGenerator 接口中,主要又有两个方法:onEvent()和 onPeriodicEmit()。
⚫onEvent:每个事件(数据)到来都会调用的方法,它的参数有当前事件、时间戳, 以及允许发出水位线的一个 WatermarkOutput,可以基于事件做各种操作
⚫onPeriodicEmit:周期性调用的方法,可以由 WatermarkOutput 发出水位线。周期时间为处理时间,可以调用环境配置的.setAutoWatermarkInterval()方法来设置,默认为200ms。
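例如,如果想把周期性生成水位线的时间间隔从默认的 200ms 调整为 100ms,可以在执行环境上做如下设置(示意):
// 将自动生成水位线的周期设置为 100 毫秒
env.getConfig().setAutoWatermarkInterval(100);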
2.内置生成器
2.1有序流
/*
对于有序流,主要特点就是时间戳单调增长(Monotonously Increasing Timestamps),
所以永远不会出现迟到数据的问题。这是周期性生成水位线的最简单的场景,
直接调用WatermarkStrategy.forMonotonousTimestamps()方法就可以实现
*/
stream.assignTimestampsAndWatermarks(
        WatermarkStrategy.<Event>forMonotonousTimestamps()
                .withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
                    @Override
                    public long extractTimestamp(Event element, long recordTimestamp) {
                        return element.timestamp;
                    }
                }));
2.2 乱序流
/*
由于乱序流中需要等待迟到数据到齐,所以必须设置一个固定量的延迟时间( Fixed Amount of Lateness)。
这时生成水位线的时间戳,就是当前数据流中最大的时间戳减去延迟的结果,相当于把表调慢,当前时钟会滞后于数据的最大时间戳。
调用 WatermarkStrategy. forBoundedOutOfOrderness()方法就可以实现。
这个方法需要传入一个 maxOutOfOrderness 参数,表示“最大乱序程度”,
它表示数据流中乱序数据时间戳的最大差值;如果我们能确定乱序程度,那么设置对应时间长度的延迟,就可以等到所有的乱序数据了。
*/
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import java.time.Duration;
public class WatermarkTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        env.addSource(new ClickSource())
                // 插入水位线的逻辑:针对乱序流插入水位线,延迟时间设置为 5s
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                                .withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
                                    // 抽取时间戳的逻辑
                                    @Override
                                    public long extractTimestamp(Event element, long recordTimestamp) {
                                        return element.timestamp;
                                    }
                                }))
                .print();

        env.execute();
    }
}
3.自定义水位线生成
(1)周期性水位线生成器(Periodic Generator)
周期性生成器一般是通过 onEvent()观察判断输入的事件,而在 onPeriodicEmit()里发出水位线。
import org.apache.flink.api.common.eventtime.*;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// 自定义水位线的产生
public class CustomWatermarkTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        env.addSource(new ClickSource())
                .assignTimestampsAndWatermarks(new CustomWatermarkStrategy())
                .print();

        env.execute();
    }

    public static class CustomWatermarkStrategy implements WatermarkStrategy<Event> {
        @Override
        public TimestampAssigner<Event> createTimestampAssigner(TimestampAssignerSupplier.Context context) {
            return new SerializableTimestampAssigner<Event>() {
                @Override
                public long extractTimestamp(Event element, long recordTimestamp) {
                    return element.timestamp; // 告诉程序数据源里的时间戳是哪一个字段
                }
            };
        }

        @Override
        public WatermarkGenerator<Event> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
            return new CustomPeriodicGenerator();
        }
    }

    public static class CustomPeriodicGenerator implements WatermarkGenerator<Event> {
        private Long delayTime = 5000L; // 延迟时间
        private Long maxTs = Long.MIN_VALUE + delayTime + 1L; // 观察到的最大时间戳

        @Override
        public void onEvent(Event event, long eventTimestamp, WatermarkOutput output) {
            // 每来一条数据就调用一次,更新最大时间戳
            maxTs = Math.max(event.timestamp, maxTs);
        }

        @Override
        public void onPeriodicEmit(WatermarkOutput output) {
            // 发射水位线,默认 200ms 调用一次
            output.emitWatermark(new Watermark(maxTs - delayTime - 1L));
        }
    }
}
(2)断点式水位线生成器(Punctuated Generator)
断点式生成器会不停地检测 onEvent()中的事件,当发现带有水位线信息的特殊事件时, 就立即发出水位线。
一般来说,断点式生成器不会通过 onPeriodicEmit()发出水位线。
public class CustomPunctuatedGenerator implements WatermarkGenerator<Event> {
    @Override
    public void onEvent(Event r, long eventTimestamp, WatermarkOutput output) {
        // 只有在遇到特定的 user 时,才发出水位线
        if (r.user.equals("Mary")) {
            output.emitWatermark(new Watermark(r.timestamp - 1));
        }
    }

    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        // 不需要做任何事情,因为我们在 onEvent 方法中发射了水位线
    }
}
4.在自定义数据源中发送水位线
我们也可以在自定义的数据源中抽取事件时间,然后发送水位线。
这里要注意的是,在自定义数据源中发送了水位线以后,就不能再在程序中使用 assignTimestampsAndWatermarks 方法来生成水位线了
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import java.util.Calendar;
import java.util.Random;
public class EmitWatermarkInSourceFunction {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        env.addSource(new ClickSourceWithWatermark()).print();

        env.execute();
    }

    // 泛型是数据源中的类型
    public static class ClickSourceWithWatermark implements SourceFunction<Event> {
        private boolean running = true;

        @Override
        public void run(SourceContext<Event> sourceContext) throws Exception {
            Random random = new Random();
            String[] userArr = {"Mary", "Bob", "Alice"};
            String[] urlArr = {"./home", "./cart", "./prod?id=1"};
            while (running) {
                long currTs = Calendar.getInstance().getTimeInMillis(); // 毫秒时间戳
                String username = userArr[random.nextInt(userArr.length)];
                String url = urlArr[random.nextInt(urlArr.length)];
                Event event = new Event(username, url, currTs);
                // 使用 collectWithTimestamp 方法将数据发送出去,并指明数据中的时间戳字段
                sourceContext.collectWithTimestamp(event, event.timestamp);
                // 发送水位线
                sourceContext.emitWatermark(new Watermark(event.timestamp - 1L));
                Thread.sleep(1000L);
            }
        }

        @Override
        public void cancel() {
            running = false;
        }
    }
}
5.关于时间窗口
//(1)滚动处理时间窗口
stream.keyBy(...).window(TumblingProcessingTimeWindows.of(Time.seconds(5))).aggregate(...)
//(2)滑动处理时间窗口
stream.keyBy(...).window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5))).aggregate(...)
//(3)处理时间会话窗口
stream.keyBy(...).window(ProcessingTimeSessionWindows.withGap(Time.seconds(10))).aggregate(...)
/*这里.withGap()方法需要传入一个 Time 类型的参数 size,表示会话的超时时间,也就是最小间隔 session gap。我们这里创建了静态会话超时时间为 10 秒的会话窗口。
*/
.window(ProcessingTimeSessionWindows.withDynamicGap(new SessionWindowTimeGapExtractor<Tuple2<String, Long>>() {
    @Override
    public long extract(Tuple2<String, Long> element) {
        // 提取 session gap 值返回,单位毫秒
        return element.f0.length() * 1000;
    }
}))
/*这里.withDynamicGap()方法需要传入一个 SessionWindowTimeGapExtractor 作为参数,用来定义 session gap 的动态提取逻辑。在这里,我们提取了数据元素的第一个字段,用它的长度乘以 1000 作为会话超时的间隔。
*/
//(4)滚动事件时间窗口
stream.keyBy(...).window(TumblingEventTimeWindows.of(Time.seconds(5))).aggregate(...)
//(5)滑动事件时间窗口
stream.keyBy(...).window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5))).aggregate(...)
//(6)事件时间会话窗口
stream.keyBy(...).window(EventTimeSessionWindows.withGap(Time.seconds(10))).aggregate(...)
6.关于计数窗口
//(1)滚动计数窗口
stream.keyBy(...).countWindow(10)
//(2)滑动计数窗口
stream.keyBy(...).countWindow(10,3)
7.全局窗口
stream.keyBy(...).window(GlobalWindows.create());
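需要注意,全局窗口默认永远不会触发计算,必须配合自定义触发器(Trigger)才能得到输出结果,例如可以搭配计数触发器使用,下面是一个简单示意:
// 全局窗口配合计数触发器:每积累 10 条数据触发一次计算(示意)
stream.keyBy(...).window(GlobalWindows.create()).trigger(CountTrigger.of(10)).aggregate(...)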
8.窗口聚合函数
8.1.增量聚合函数(incremental aggregation functions)
(1)归约函数(ReduceFunction)
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import java.time.Duration;
public class WindowReduceExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // 从自定义数据源读取数据,并提取时间戳、生成水位线
        SingleOutputStreamOperator<Event> stream = env.addSource(new ClickSource())
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ZERO)
                                .withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
                                    @Override
                                    public long extractTimestamp(Event element, long recordTimestamp) {
                                        return element.timestamp;
                                    }
                                }));

        stream.map(new MapFunction<Event, Tuple2<String, Long>>() {
                    @Override
                    public Tuple2<String, Long> map(Event value) throws Exception {
                        // 将数据转换成二元组,方便计算
                        return Tuple2.of(value.user, 1L);
                    }
                })
                .keyBy(r -> r.f0)
                // 设置滚动事件时间窗口
                .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                .reduce(new ReduceFunction<Tuple2<String, Long>>() {
                    @Override
                    public Tuple2<String, Long> reduce(Tuple2<String, Long> value1, Tuple2<String, Long> value2) throws Exception {
                        // 定义累加规则,窗口闭合时,向下游发送累加结果
                        return Tuple2.of(value1.f0, value1.f1 + value2.f1);
                    }
                })
                .print();

        env.execute();
    }
}
(2)聚合函数(AggregateFunction)
public interface AggregateFunction<IN, ACC, OUT> extends Function, Serializable {
    ACC createAccumulator();
    ACC add(IN value, ACC accumulator);
    OUT getResult(ACC accumulator);
    ACC merge(ACC a, ACC b);
}
⚫ createAccumulator():创建一个累加器,这就是为聚合创建了一个初始状态,每个聚 合任务只会调用一次。
⚫ add():将输入的元素添加到累加器中。这就是基于聚合状态,对新来的数据进行进 一步聚合的过程。方法传入两个参数:当前新到的数据 value,和当前的累加器accumulator;返回一个新的累加器值,也就是对聚合状态进行更新。每条数据到来之后都会调用这个方法。
⚫ getResult():从累加器中提取聚合的输出结果。也就是说,我们可以定义多个状态, 然后再基于这些聚合的状态计算出一个结果进行输出。比如之前我们提到的计算平均值,就可以把 sum 和 count 作为状态放入累加器,而在调用这个方法时相除得到最终结果。这个方法只在窗口要输出结果时调用。
⚫ merge():合并两个累加器,并将合并后的状态作为一个累加器返回。这个方法只在 需要合并窗口的场景下才会被调用;最常见的合并窗口(Merging Window)的场景就是会话窗口(Session Windows)。
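例如上面提到的求平均值,就可以把 sum 和 count 一起放进累加器。下面是一个按此思路计算窗口内事件时间戳平均值的 AggregateFunction 草图(类名和泛型均为示意性假设):
// 假设的示例:累加器用 Tuple2<Long, Long> 表示,f0 存时间戳之和,f1 存数据条数
public static class AvgTimestamp implements AggregateFunction<Event, Tuple2<Long, Long>, Double> {
    @Override
    public Tuple2<Long, Long> createAccumulator() {
        return Tuple2.of(0L, 0L); // 初始状态
    }

    @Override
    public Tuple2<Long, Long> add(Event value, Tuple2<Long, Long> accumulator) {
        // 每来一条数据,更新累加器
        return Tuple2.of(accumulator.f0 + value.timestamp, accumulator.f1 + 1L);
    }

    @Override
    public Double getResult(Tuple2<Long, Long> accumulator) {
        // 窗口输出结果时才调用,sum / count 得到平均值
        return (double) accumulator.f0 / accumulator.f1;
    }

    @Override
    public Tuple2<Long, Long> merge(Tuple2<Long, Long> a, Tuple2<Long, Long> b) {
        return Tuple2.of(a.f0 + b.f0, a.f1 + b.f1);
    }
}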
计算 “人均重复访问量”
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import java.util.HashSet;

public class WindowAggregateFunctionExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        SingleOutputStreamOperator<Event> stream = env.addSource(new ClickSource())
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy.<Event>forMonotonousTimestamps()
                                .withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
                                    @Override
                                    public long extractTimestamp(Event element, long recordTimestamp) {
                                        return element.timestamp;
                                    }
                                }));

        // 所有数据设置相同的 key,发送到同一个分区统计 PV 和 UV,再相除
        stream.keyBy(data -> true)
                .window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(2)))
                .aggregate(new AvgPv())
                .print();

        env.execute();
    }

    public static class AvgPv implements AggregateFunction<Event, Tuple2<HashSet<String>, Long>, Double> {
        @Override
        public Tuple2<HashSet<String>, Long> createAccumulator() {
            // 创建累加器
            return Tuple2.of(new HashSet<String>(), 0L);
        }

        @Override
        public Tuple2<HashSet<String>, Long> add(Event value, Tuple2<HashSet<String>, Long> accumulator) {
            // 属于本窗口的数据来一条累加一次,并返回累加器
            accumulator.f0.add(value.user);
            return Tuple2.of(accumulator.f0, accumulator.f1 + 1L);
        }

        @Override
        public Double getResult(Tuple2<HashSet<String>, Long> accumulator) {
            // 窗口闭合时,增量聚合结束,将计算结果发送到下游
            return (double) accumulator.f1 / accumulator.f0.size();
        }

        @Override
        public Tuple2<HashSet<String>, Long> merge(Tuple2<HashSet<String>, Long> a, Tuple2<HashSet<String>, Long> b) {
            return null;
        }
    }
}
8.2.全窗口函数(full window functions)
(1)窗口函数(WindowFunction)
stream.keyBy(<key selector>).window(<window assigner>).apply(new MyWindowFunction());
/*这个类中可以获取到包含窗口所有数据的可迭代集合( Iterable),还可以拿到窗口(Window)本身的信息*/
public interface WindowFunction<IN, OUT, KEY, W extends Window> extends Function, Serializable {
    void apply(KEY key, W window, Iterable<IN> input, Collector<OUT> out) throws Exception;
}
/*
当窗口到达结束时间需要触发计算时,就会调用这里的 apply 方法。
我们可以从 input 集合中取出窗口收集的数据,结合 key 和 window 信息,
通过收集器(Collector)输出结果。这里 Collector 的用法,与 FlatMapFunction 中相同。
*/
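下面给出一个基于上文 Event 流的 WindowFunction 简单草图(类名为示意性假设),统计每个用户在窗口内的访问次数:
// 假设的示例:按 user 分组后,用 WindowFunction 统计该用户在每个窗口内的访问条数
public static class UserCountWindow implements WindowFunction<Event, String, String, TimeWindow> {
    @Override
    public void apply(String user, TimeWindow window, Iterable<Event> input, Collector<String> out) throws Exception {
        long count = 0L;
        for (Event event : input) {
            count++;
        }
        out.collect("用户 " + user + " 在窗口 " + window.getStart() + " ~ " + window.getEnd() + " 内访问了 " + count + " 次");
    }
}
// 使用方式(示意):stream.keyBy(data -> data.user).window(TumblingEventTimeWindows.of(Time.seconds(10))).apply(new UserCountWindow())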
(2)处理窗口函数(ProcessWindowFunction)
增强版的WindowFunction,除了可以拿到窗口中的所有数据之外 还可以获取“上下文对象”(Context)
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import java.sql.Timestamp;
import java.time.Duration;
import java.util.HashSet;

public class UvCountByWindowExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        SingleOutputStreamOperator<Event> stream = env.addSource(new ClickSource())
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ZERO)
                                .withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
                                    @Override
                                    public long extractTimestamp(Event element, long recordTimestamp) {
                                        return element.timestamp;
                                    }
                                }));

        // 将数据全部发往同一分区,按窗口统计 UV
        stream.keyBy(data -> true)
                .window(TumblingEventTimeWindows.of(Time.seconds(10)))
                .process(new UvCountByWindow())
                .print();

        env.execute();
    }

    // 自定义窗口处理函数
    public static class UvCountByWindow extends ProcessWindowFunction<Event, String, Boolean, TimeWindow> {
        @Override
        public void process(Boolean aBoolean, Context context, Iterable<Event> elements, Collector<String> out) throws Exception {
            HashSet<String> userSet = new HashSet<>();
            // 遍历所有数据,放到 Set 里去重
            for (Event event : elements) {
                userSet.add(event.user);
            }
            // 结合窗口信息,包装输出内容
            Long start = context.window().getStart();
            Long end = context.window().getEnd();
            out.collect("窗口:" + new Timestamp(start) + " ~ " + new Timestamp(end) + " 的独立访客数量是:" + userSet.size());
        }
    }
}
// ReduceFunction 与 WindowFunction 结合
public <R> SingleOutputStreamOperator<R> reduce(ReduceFunction<T> reduceFunction, WindowFunction<T, R, K, W> function)

// ReduceFunction 与 ProcessWindowFunction 结合
public <R> SingleOutputStreamOperator<R> reduce(ReduceFunction<T> reduceFunction, ProcessWindowFunction<T, R, K, W> function)

// AggregateFunction 与 WindowFunction 结合
public <ACC, V, R> SingleOutputStreamOperator<R> aggregate(AggregateFunction<T, ACC, V> aggFunction, WindowFunction<V, R, K, W> windowFunction)

// AggregateFunction 与 ProcessWindowFunction 结合
public <ACC, V, R> SingleOutputStreamOperator<R> aggregate(AggregateFunction<T, ACC, V> aggFunction, ProcessWindowFunction<V, R, K, W> windowFunction)
UrlViewCount demo
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

public class UrlViewCountExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        SingleOutputStreamOperator<Event> stream = env.addSource(new ClickSource())
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy.<Event>forMonotonousTimestamps()
                                .withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
                                    @Override
                                    public long extractTimestamp(Event element, long recordTimestamp) {
                                        return element.timestamp;
                                    }
                                }));

        // 需要按照 url 分组,开滑动窗口统计
        stream.keyBy(data -> data.url)
                .window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))
                // 同时传入增量聚合函数和全窗口函数
                .aggregate(new UrlViewCountAgg(), new UrlViewCountResult())
                .print();

        env.execute();
    }

    // 自定义增量聚合函数,来一条数据就加一
    public static class UrlViewCountAgg implements AggregateFunction<Event, Long, Long> {
        @Override
        public Long createAccumulator() {
            return 0L;
        }

        @Override
        public Long add(Event value, Long accumulator) {
            return accumulator + 1;
        }

        @Override
        public Long getResult(Long accumulator) {
            return accumulator;
        }

        @Override
        public Long merge(Long a, Long b) {
            return null;
        }
    }

    // 自定义窗口处理函数,只需要包装窗口信息
    public static class UrlViewCountResult extends ProcessWindowFunction<Long, UrlViewCount, String, TimeWindow> {
        @Override
        public void process(String url, Context context, Iterable<Long> elements, Collector<UrlViewCount> out) throws Exception {
            // 结合窗口信息,包装输出内容
            Long start = context.window().getStart();
            Long end = context.window().getEnd();
            // 迭代器中只有一个元素,就是增量聚合函数的计算结果
            out.collect(new UrlViewCount(url, elements.iterator().next(), start, end));
        }
    }
}
9.测试水位线的使用
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

public class WatermarkTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStreamSource<MyEvent> dataStreamSource = executionEnvironment.fromElements(
                new MyEvent("zs", "/user", 1000L),
                new MyEvent("zs", "/order", 1100L),
                new MyEvent("zs", "/product?id=1", 1200L),
                new MyEvent("ls", "/user", 1200L),
                new MyEvent("ls", "/product", 2000L),
                new MyEvent("ww", "/product", 4000L),
                new MyEvent("ww", "/order", 6000L),
                new MyEvent("zl", "/order", 10000L));

        // 有序流
        /*
        dataStreamSource.assignTimestampsAndWatermarks(
                WatermarkStrategy.<MyEvent>forMonotonousTimestamps()
                        .withTimestampAssigner(new SerializableTimestampAssigner<MyEvent>() {
                            @Override
                            public long extractTimestamp(MyEvent element, long recordTimestamp) {
                                // 时间戳和水位线的单位都必须是毫秒
                                return element.getTimestamp();
                            }
                        }));
        */

        // 乱序流(延迟时间设置为 5 毫秒)
        dataStreamSource.assignTimestampsAndWatermarks(
                        WatermarkStrategy.<MyEvent>forBoundedOutOfOrderness(Duration.ofMillis(5))
                                .withTimestampAssigner(new SerializableTimestampAssigner<MyEvent>() {
                                    @Override
                                    public long extractTimestamp(MyEvent element, long recordTimestamp) {
                                        return element.getTimestamp();
                                    }
                                }))
                // 根据 user 分组,开窗统计
                .keyBy(data -> data.user)
                .window(TumblingEventTimeWindows.of(Time.seconds(1)))
                .process(new WatermarkTestResult())
                .print();

        /**
         * 事实上,有序流的水位线生成器本质和乱序流是一样的,相当于延迟设为 0 的乱序流水位线生成器
         * forMonotonousTimestamps
         * forBoundedOutOfOrderness
         * @see AscendingTimestampsWatermarks
         */
        executionEnvironment.execute();
    }

    // 自定义处理窗口函数,输出当前的水位线和窗口信息以及每个窗口的数据信息
    public static class WatermarkTestResult extends ProcessWindowFunction<MyEvent, String, String, TimeWindow> {
        @Override
        public void process(String s, Context context, Iterable<MyEvent> elements, Collector<String> out) throws Exception {
            Long start = context.window().getStart();
            Long end = context.window().getEnd();
            Long currentWatermark = context.currentWatermark();
            Long count = elements.spliterator().getExactSizeIfKnown();
            // 收集元素,然后汇总到结果集
            List<String> result = new ArrayList<>();
            Iterator<MyEvent> iterator = elements.iterator();
            while (iterator.hasNext()) {
                result.add(iterator.next().toString());
            }
            out.collect("窗口" + start + " ~ " + end + "中共有" + count + "个元素,窗口闭合计算时,水位线处于:" + currentWatermark + " result: " + result);
        }
    }
}
10.其他API
10.1.触发器(Trigger)
stream.keyBy(...).window(...).trigger(new MyTrigger())
Trigger 是一个抽象类,自定义时必须实现下面四个抽象方法:
⚫onElement():窗口中每到来一个元素,都会调用这个方法。
⚫onEventTime():当注册的事件时间定时器触发时,将调用这个方法。
⚫onProcessingTime ():当注册的处理时间定时器触发时,将调用这个方法。
⚫clear():当窗口关闭销毁时,调用这个方法。一般用来清除自定义的状态。
下面我们举一个例子。在日常业务场景中,我们经常会开比较大的窗口来计算每个窗口的pv 或者 uv 等数据。
但窗口开的太大,会使我们看到计算结果的时间间隔变长。所以我们可以使用触发器,来隔一段时间触发一次窗口计算
trigger demo
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
public class TriggerExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        env.addSource(new ClickSource())
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy.<Event>forMonotonousTimestamps()
                                .withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
                                    @Override
                                    public long extractTimestamp(Event event, long l) {
                                        return event.timestamp;
                                    }
                                }))
                .keyBy(r -> r.url)
                .window(TumblingEventTimeWindows.of(Time.seconds(10)))
                .trigger(new MyTrigger())
                .process(new WindowResult())
                .print();

        env.execute();
    }

    public static class WindowResult extends ProcessWindowFunction<Event, UrlViewCount, String, TimeWindow> {
        @Override
        public void process(String s, Context context, Iterable<Event> iterable, Collector<UrlViewCount> collector) throws Exception {
            collector.collect(new UrlViewCount(
                    s,
                    // 获取迭代器中的元素个数
                    iterable.spliterator().getExactSizeIfKnown(),
                    context.window().getStart(),
                    context.window().getEnd()));
        }
    }

    public static class MyTrigger extends Trigger<Event, TimeWindow> {
        @Override
        public TriggerResult onElement(Event event, long l, TimeWindow timeWindow, TriggerContext triggerContext) throws Exception {
            ValueState<Boolean> isFirstEvent = triggerContext.getPartitionedState(
                    new ValueStateDescriptor<Boolean>("first-event", Types.BOOLEAN));
            if (isFirstEvent.value() == null) {
                // 第一条数据到来时,在窗口范围内每隔 1 秒注册一个事件时间定时器
                for (long i = timeWindow.getStart(); i < timeWindow.getEnd(); i = i + 1000L) {
                    triggerContext.registerEventTimeTimer(i);
                }
                isFirstEvent.update(true);
            }
            return TriggerResult.CONTINUE;
        }

        @Override
        public TriggerResult onEventTime(long l, TimeWindow timeWindow, TriggerContext triggerContext) throws Exception {
            return TriggerResult.FIRE;
        }

        @Override
        public TriggerResult onProcessingTime(long l, TimeWindow timeWindow, TriggerContext triggerContext) throws Exception {
            return TriggerResult.CONTINUE;
        }

        @Override
        public void clear(TimeWindow timeWindow, TriggerContext triggerContext) throws Exception {
            ValueState<Boolean> isFirstEvent = triggerContext.getPartitionedState(
                    new ValueStateDescriptor<Boolean>("first-event", Types.BOOLEAN));
            isFirstEvent.clear();
        }
    }
}
10.2.移除器(Evictor)
stream.keyBy(...).window(...).evictor(new MyEvictor())

Evictor 接口定义了两个方法:
⚫evictBefore():定义执行窗口函数之前的移除数据操作
⚫evictAfter():定义执行窗口函数之后的移除数据操作
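下面是一个自定义 Evictor 的简单草图(仅作示意,假设数据类型为上文的 Event):在窗口函数计算之前,把时间戳早于窗口结束时间 5 秒以上的数据移除。
// 假设的示例:实现 Evictor 接口,在窗口函数执行前移除较早的数据
public static class MyEvictor implements Evictor<Event, TimeWindow> {
    @Override
    public void evictBefore(Iterable<TimestampedValue<Event>> elements, int size, TimeWindow window, EvictorContext evictorContext) {
        Iterator<TimestampedValue<Event>> iterator = elements.iterator();
        while (iterator.hasNext()) {
            if (iterator.next().getTimestamp() < window.getEnd() - 5000L) {
                iterator.remove(); // 在窗口函数计算之前移除这条数据
            }
        }
    }

    @Override
    public void evictAfter(Iterable<TimestampedValue<Event>> elements, int size, TimeWindow window, EvictorContext evictorContext) {
        // 窗口函数执行之后不做移除
    }
}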
10.3.允许延迟(Allowed Lateness)
stream.keyBy(...)
      .window(TumblingEventTimeWindows.of(Time.hours(1)))
      .allowedLateness(Time.minutes(1))
10.4.将迟到的数据放入侧输出流
DataStream<Event> stream = env.addSource(...);
OutputTag<Event> outputTag = new OutputTag<Event>("late") {};
stream.keyBy(...)
      .window(TumblingEventTimeWindows.of(Time.hours(1)))
      .sideOutputLateData(outputTag);

// 将迟到数据放入侧输出流之后,可以基于窗口聚合的结果流再把它提取出来
SingleOutputStreamOperator<AggResult> winAggStream = stream.keyBy(...)
      .window(TumblingEventTimeWindows.of(Time.hours(1)))
      .sideOutputLateData(outputTag)
      .aggregate(new MyAggregateFunction());
DataStream<Event> lateStream = winAggStream.getSideOutput(outputTag);
11.窗口生命周期:创建 → 触发计算 → 销毁
12.迟到数据处理
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
import java.time.Duration;

public class ProcessLateDataExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // 读取 socket 文本流
        SingleOutputStreamOperator<Event> stream = env.socketTextStream("localhost", 7777)
                .map(new MapFunction<String, Event>() {
                    @Override
                    public Event map(String value) throws Exception {
                        String[] fields = value.split(" ");
                        return new Event(fields[0].trim(), fields[1].trim(), Long.valueOf(fields[2].trim()));
                    }
                })
                // 方式一:设置 watermark 延迟时间,2 秒钟
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(2))
                                .withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
                                    @Override
                                    public long extractTimestamp(Event element, long recordTimestamp) {
                                        return element.timestamp;
                                    }
                                }));

        // 定义侧输出流标签
        OutputTag<Event> outputTag = new OutputTag<Event>("late") {};

        SingleOutputStreamOperator<UrlViewCount> result = stream.keyBy(data -> data.url)
                .window(TumblingEventTimeWindows.of(Time.seconds(10)))
                // 方式二:允许窗口处理迟到数据,设置 1 分钟的等待时间
                .allowedLateness(Time.minutes(1))
                // 方式三:将最后的迟到数据输出到侧输出流
                .sideOutputLateData(outputTag)
                .aggregate(new UrlViewCountAgg(), new UrlViewCountResult());

        result.print("result");
        result.getSideOutput(outputTag).print("late");
        // 为方便观察,可以将原始数据也输出
        stream.print("input");

        env.execute();
    }

    public static class UrlViewCountAgg implements AggregateFunction<Event, Long, Long> {
        @Override
        public Long createAccumulator() {
            return 0L;
        }

        @Override
        public Long add(Event value, Long accumulator) {
            return accumulator + 1;
        }

        @Override
        public Long getResult(Long accumulator) {
            return accumulator;
        }

        @Override
        public Long merge(Long a, Long b) {
            return null;
        }
    }

    public static class UrlViewCountResult extends ProcessWindowFunction<Long, UrlViewCount, String, TimeWindow> {
        @Override
        public void process(String url, Context context, Iterable<Long> elements, Collector<UrlViewCount> out) throws Exception {
            // 结合窗口信息,包装输出内容
            Long start = context.window().getStart();
            Long end = context.window().getEnd();
            out.collect(new UrlViewCount(url, elements.iterator().next(), start, end));
        }
    }
}
CP7 处理函数
7.1基本处理函数
当数据的 user 为“Mary”时,将其输出一次;而如果为“Bob”时,将 user 输出两次
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
public class ProcessFunctionExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        env.addSource(new ClickSource())
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy.<Event>forMonotonousTimestamps()
                                .withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
                                    @Override
                                    public long extractTimestamp(Event event, long l) {
                                        return event.timestamp;
                                    }
                                }))
                .process(new ProcessFunction<Event, String>() {
                    @Override
                    public void processElement(Event value, Context ctx, Collector<String> out) throws Exception {
                        if (value.user.equals("Mary")) {
                            out.collect(value.user);
                        } else if (value.user.equals("Bob")) {
                            out.collect(value.user);
                            out.collect(value.user);
                        }
                        System.out.println(ctx.timerService().currentWatermark());
                    }
                })
                .print();

        env.execute();
    }
}
7.1.2ProcessFunction 解析
在源码中我们可以看到,抽象类 ProcessFunction 继承了 AbstractRichFunction,
有两个泛型类型参数:I 表示 Input,也就是输入的数据类型;O 表示Output,也就是处理完成之后输出的数据类型。
内部单独定义了两个方法:一个是必须要实现的抽象方法.processElement();另一个是非抽象方法.onTimer()。
public abstract class ProcessFunction<I, O> extends AbstractRichFunction {
    ...
    public abstract void processElement(I value, Context ctx, Collector<O> out) throws Exception;

    public void onTimer(long timestamp, OnTimerContext ctx, Collector<O> out) throws Exception {}
...
}
.processElement()
/* "Processes an element" and defines the core processing logic. This method is invoked once for every element in the stream. It takes three parameters: the input value, the context ctx, and the collector out. It has no return value; output is emitted through the collector.
*/
⚫ value: the current input element, i.e. the record being processed; its type is the type of the stream's elements.
⚫ ctx: of type Context, an abstract class defined inside ProcessFunction. It represents the current execution context and provides access to the element's timestamp, a TimerService for querying time and registering timers, and a method for sending data to a side output.
⚫ out: the collector (of type Collector<O>) used to emit output. It works exactly like the collector in the flatMap operator: calling out.collect() sends one record downstream, and the method may be called several times or not at all.
public abstract class Context {
    public abstract Long timestamp();
    public abstract TimerService timerService();
    public abstract <X> void output(OutputTag<X> outputTag, X value);
}
7.1.3 Categories of Process Functions
(1) ProcessFunction
The most basic process function, passed as the argument to .process() called directly on a DataStream.
(2) KeyedProcessFunction
The process function for a keyed stream, passed to .process() on a KeyedStream. This is the variant to use when timers are needed, since timers are only available on keyed streams.
(3) ProcessWindowFunction
The process function used after windowing, and the representative of full-window functions; passed to .process() on a WindowedStream.
(4) ProcessAllWindowFunction
Also a post-windowing process function, passed to .process() on an AllWindowedStream.
(5) CoProcessFunction
The process function for two connected streams, passed to .process() on a ConnectedStreams. Connecting and merging streams is covered in detail in a later chapter.
(6) ProcessJoinFunction
The process function for an interval join of two streams, passed to .process() on an IntervalJoined stream.
(7) BroadcastProcessFunction
The process function for a broadcast connected stream, passed to .process() on a BroadcastConnectedStream. A BroadcastConnectedStream is the result of connecting an ordinary (non-keyed) DataStream with a broadcast stream (BroadcastStream). Broadcast streams are covered in a later chapter.
(8) KeyedBroadcastProcessFunction
The keyed variant of the broadcast process function, also passed to .process() on a BroadcastConnectedStream. The difference from BroadcastProcessFunction is that here the broadcast connected stream is the result of connecting a KeyedStream with a broadcast stream (BroadcastStream).
Next, we look at how to use KeyedProcessFunction and ProcessWindowFunction in detail.
7.2.1 Timers and the TimerService
public abstract TimerService timerService();
// TimerService is Flink's base interface for time and timers; it contains the following six methods:
long currentProcessingTime();                 // get the current processing time
long currentWatermark();                      // get the current watermark (event time)
void registerProcessingTimeTimer(long time);  // register a processing-time timer that fires when the processing time passes `time`
void registerEventTimeTimer(long time);       // register an event-time timer that fires when the watermark passes `time`
void deleteProcessingTimeTimer(long time);    // delete the processing-time timer registered for `time`
void deleteEventTimeTimer(long time);         // delete the event-time timer registered for `time`
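Two practical notes: timers are only available on a keyed stream, and a timer can be deleted only by passing the exact timestamp used when it was registered. A minimal sketch (class and variable names are illustrative, not from the book):
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class TimerSketch extends KeyedProcessFunction<String, String, String> {
    @Override
    public void processElement(String value, Context ctx, Collector<String> out) throws Exception {
        long timerTs = ctx.timerService().currentProcessingTime() + 10 * 1000L;
        // register a processing-time timer 10 seconds from now
        ctx.timerService().registerProcessingTimeTimer(timerTs);
        // a timer can later be cancelled, but only with the exact same timestamp:
        // ctx.timerService().deleteProcessingTimeTimer(timerTs);
        out.collect(value);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) {
        out.collect("timer fired for key " + ctx.getCurrentKey() + " at " + timestamp);
    }
}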
7.2.2 Using KeyedProcessFunction
stream.keyBy( t -> t.f0 ).process(new MyKeyedProcessFunction())
public abstract class KeyedProcessFunction<K, I, O> extends AbstractRichFunction {
    ...
    public abstract void processElement(I value, Context ctx, Collector<O> out) throws Exception;
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<O> out) throws Exception {}
    public abstract class Context {...}
}
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import java.sql.Timestamp;
public class ProcessingTimeTimerTest {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);// 处理时间语义,不需要分配时间戳和 watermarkSingleOutputStreamOperator stream = env.addSource(new ClickSource());// 要用定时器,必须基于 KeyedStreamstream.keyBy(data -> true).process(new KeyedProcessFunction() {@Overridepublic void processElement(Event value, Context ctx, Collector out) throws Exception {Long currTs = ctx.timerService().currentProcessingTime();out.collect("数据到达,到达时间:" + new Timestamp(currTs));// 注册一个 10 秒后的定时器ctx.timerService().registerProcessingTimeTimer(currTs + 10 * 1000L);}@Overridepublic void onTimer(long timestamp, OnTimerContext ctx, Collector out) throws Exception {out.collect("定时器触发,触发时间:" + new Timestamp(timestamp));}}).print();env.execute();}
}
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.util.Collector;public class EventTimeTimerTest {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperator stream = env.addSource(new CustomSource()).assignTimestampsAndWatermarks(WatermarkStrategy.forMonot onousTimestamps().withTimestampAssigner(new SerializableTimestampAssigner() {@Overridepublic long extractTimestamp(Event element,long recordTimestamp){return element.timestamp;}}));// 基于 KeyedStream 定义事件时间定时器stream.keyBy(data -> true).process(new KeyedProcessFunction() {@Overridepublic void processElement(Event value,Context ctx,Collector out) throws Exception {out.collect("数据到达,时间戳为:" + ctx.timestamp());out.collect("数据到达,水位线为:" + ctx.timerService().currentWatermark() + "\n -------分割线 ");// 注册一个 10 秒后的定时器ctx.timerService().registerEventTimeTimer(ctx.timestamp() + 10 * 1000L);}@Overridepublic void onTimer(long timestamp,OnTimerContext ctx,Collector out) throws Exception {out.collect("定时器触发,触发时间:" + timestamp);}}).print();env.execute();}// 自定义测试数据源public static class CustomSource implements SourceFunction { @Overridepublic void run(SourceContext ctx) throws Exception {// 直接发出测试数据ctx.collect(new Event("Mary", "./home", 1000L));// 为了更加明显,中间停顿 5 秒钟Thread.sleep(5000L);// 发出 10 秒后的数据ctx.collect(new Event("Mary", "./home", 11000L)); Thread.sleep(5000L);// 发出 10 秒+1ms 后的数据ctx.collect(new Event("Alice", "./cart", 11001L)); Thread.sleep(5000L);}@Overridepublic void cancel() { }}
}
7.3 Window Process Functions
Besides KeyedProcessFunction, the other major family of commonly used process functions consists of the window-based ProcessWindowFunction and ProcessAllWindowFunction.
7.3.1 Using window process functions
stream.keyBy( t -> t.f0 ).window( TumblingEventTimeWindows.of(Time.seconds(10))).process(new MyProcessWindowFunction())
7.3.2 ProcessWindowFunction Explained
public abstract class ProcessWindowFunction<IN, OUT, KEY, W extends Window> extends AbstractRichFunction {
    ...
    public abstract void process(KEY key, Context context, Iterable<IN> elements, Collector<OUT> out) throws Exception;
    public void clear(Context context) throws Exception {}
    public abstract class Context implements java.io.Serializable {...}
}
⚫ IN: input, the type of the elements collected into the window.
⚫ OUT: output, the type of the results produced by the window computation.
⚫ KEY: the type of the key in the data.
⚫ W: the window type, a subtype of Window. For the usual time windows, W is TimeWindow.
The methods defined inside differ from the process functions seen so far: since a full-window function does not process elements one by one, the processing method is not .processElement() but .process(), which takes four parameters:
⚫ key: the key on which the window computation is grouped, i.e. the field used earlier in keyBy.
⚫ context: the context of the current window computation, of the abstract Context type defined inside ProcessWindowFunction.
⚫ elements: an iterable collection of all the data gathered in the window for this key.
⚫ out: the collector (of type Collector<OUT>) used to emit the results.
public abstract class Context implements java.io.Serializable {
    public abstract W window();
    public abstract long currentProcessingTime();
    public abstract long currentWatermark();
    public abstract KeyedStateStore windowState();
    public abstract KeyedStateStore globalState();
    public abstract <X> void output(OutputTag<X> outputTag, X value);
}
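To make the four parameters concrete, a minimal sketch (assuming the Event POJO used throughout these notes) that counts the clicks per url within each window:
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

public class UrlCountWindowFunction
        extends ProcessWindowFunction<Event, String, String, TimeWindow> {
    @Override
    public void process(String url, Context context, Iterable<Event> elements,
                        Collector<String> out) throws Exception {
        long count = 0;
        for (Event ignored : elements) {   // the iterable holds all events for this key and window
            count++;
        }
        out.collect("url: " + url
                + " count: " + count
                + " window end: " + context.window().getEnd());
    }
}
It would be attached with the call shown in 7.3.1, e.g. stream.keyBy(data -> data.url).window(TumblingEventTimeWindows.of(Time.seconds(10))).process(new UrlCountWindowFunction()).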
7.4 Application Example: Top N
7.4.1 Using ProcessAllWindowFunction
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
public class ProcessAllWindowTopN {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperator eventStream = env.addSource(new ClickSource()).assignTimestampsAndWatermarks(WatermarkStrategy.forMonotonousTimestamps().withTimestampAssigner(new SerializableTimestampAssigner() {@Overridepublic long extractTimestamp(Event element, long recordTimestamp) {return element.timestamp;}}));// 只需要 url 就可以统计数量,所以转换成 String 直接开窗统计SingleOutputStreamOperator result = eventStream.map(new MapFunction() {@Overridepublic String map(Event value) throws Exception {return value.url;}}).windowAll(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5))) // 开滑动窗口.process(new ProcessAllWindowFunction() {@Overridepublic void process(Context context, Iterable elements, Collector out) throws Exception {HashMap urlCountMap = new HashMap<>();// 遍历窗口中数据,将浏览量保存到一个 HashMap 中for (String url : elements) {if (urlCountMap.containsKey(url)) {long count = urlCountMap.get(url);urlCountMap.put(url, count + 1L);} else {urlCountMap.put(url, 1L);}}ArrayList> mapList = new ArrayList>();// 将浏览量数据放入 ArrayList,进行排序for (String key : urlCountMap.keySet()) {mapList.add(Tuple2.of(key, urlCountMap.get(key)));}mapList.sort(new Comparator>() {@Overridepublic int compare(Tuple2 o1, Tuple2 o2) {return o2.f1.intValue() - o1.f1.intValue();}});// 取排序后的前两名,构建输出结果StringBuilder result = new StringBuilder();result.append("========================================\n");for (int i = 0; i < 2; i++) {Tuple2 temp = mapList.get(i);String info = "浏览量 No." + (i + 1) +" url:" + temp.f0 +" 浏览量:" + temp.f1 +" 窗 口 结 束 时 间 : " + new Timestamp(context.window().getEnd()) + "\n";result.append(info);}result.append("========================================\n");out.collect(result.toString());}});result.print();env.execute();}
}
7.4.2 Using KeyedProcessFunction
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Comparator;
public class KeyedProcessTopN {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);// 从自定义数据源读取数据SingleOutputStreamOperator eventStream = env.addSource(new ClickSource()).assignTimestampsAndWatermarks(WatermarkStrategy.forMonotonousTimestamps().withTimestampAssigner(new SerializableTimestampAssigner() {@Overridepublic long extractTimestamp(Event element, long recordTimestamp) {return element.timestamp;}}));// 需要按照 url 分组,求出每个 url 的访问量SingleOutputStreamOperator urlCountStream = eventStream.keyBy(data -> data.url).window(SlidingEventTimeWindows.of(Time.seconds(10),Time.seconds(5))).aggregate(new UrlViewCountAgg(),new UrlViewCountResult());// 对结果中同一个窗口的统计数据,进行排序处理SingleOutputStreamOperator result = urlCountStream.keyBy(data -> data.windowEnd).process(new TopN(2));result.print("result");env.execute();}// 自定义增量聚合public static class UrlViewCountAgg implements AggregateFunction {@Overridepublic Long createAccumulator() {return 0L;}@Overridepublic Long add(Event value, Long accumulator) {return accumulator + 1;}@Overridepublic Long getResult(Long accumulator) {return accumulator;}@Overridepublic Long merge(Long a,Long b) {return null;}}// 自定义全窗口函数,只需要包装窗口信息public static class UrlViewCountResult extends ProcessWindowFunction {@Overridepublic void process(String url,Context context,Iterable elements,Collector out) throws Exception {// 结合窗口信息,包装输出内容Long start = context.window().getStart();Long end = context.window().getEnd();out.collect(new UrlViewCount(url,elements.iterator().next(),start,end));}}// 自定义处理函数,排序取 top npublic static class TopN extends KeyedProcessFunction{// 将 n 作为属性private Integer n;// 定义一个列表状态private ListState urlViewCountListState;public TopN(Integer n) {this.n = n;}@Overridepublic void open(Configuration parameters) throws Exception {// 从环境中获取列表状态句柄urlViewCountListState = getRuntimeContext().getListState(new ListStateDescriptor("url-view-count-list",Types.POJO(UrlViewCount.class)));}@Overridepublic void processElement(UrlViewCount value,Context ctx,Collector out) throws Exception {// 将 count 数据添加到列表状态中,保存起来urlViewCountListState.add(value);// 注册 window end + 1ms 后的定时器,等待所有数据到齐开始排序ctx.timerService().registerEventTimeTimer(ctx.getCurrentKey() + 1);}@Overridepublic void onTimer(long timestamp,OnTimerContext ctx,Collector out) throws Exception {// 将数据从列表状态变量中取出,放入 ArrayList,方便排序ArrayList urlViewCountArrayList = new ArrayList<>();for (UrlViewCount urlViewCount:urlViewCountListState.get()) {urlViewCountArrayList.add(urlViewCount);}// 清空状态,释放资源urlViewCountListState.clear();// 排序urlViewCountArrayList.sort(new Comparator() {@Overridepublic int compare(UrlViewCount o1, UrlViewCount o2) {return o2.count.intValue() - o1.count.intValue();}});// 取前两名,构建输出结果StringBuilder result = new StringBuilder();result.append("========================================\n");result.append("窗口结束时间:" + new Timestamp(timestamp - 1) + "\n");for (int i = 0; i < this.n; i++) {UrlViewCount UrlViewCount = urlViewCountArrayList.get(i);String info = "No." + (i + 1) + " " + "url:" + UrlViewCount.url + " " + "浏览量:" + UrlViewCount.count + "\n";result.append(info);}result.append("========================================\n");out.collect(result.toString());}}
}
7.5 Side Output
OutputTag<String> outputTag = new OutputTag<String>("side-output") {};
DataStream<Integer> stream = env.addSource(...);
SingleOutputStreamOperator<Long> longStream = stream.process(new ProcessFunction<Integer, Long>() {
    @Override
    public void processElement(Integer value, Context ctx, Collector<Long> out) throws Exception {
        // convert to Long and emit to the main stream
        out.collect(Long.valueOf(value));
        // convert to String and emit to the side output
        ctx.output(outputTag, "side-output: " + String.valueOf(value));
    }
});
DataStream<String> stringStream = longStream.getSideOutput(outputTag);
CP8 Multi-Stream Transformations: union, connect, join and coGroup
8.1 Splitting Streams
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class SplitStreamByFilter {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperator stream = env.addSource(new ClickSource());// 筛选 Mary 的浏览行为放入 MaryStream 流中DataStream MaryStream = stream.filter(new FilterFunction() {@Override public boolean filter(Event value) throws Exception {return value.user.equals("Mary");} });// 筛选 Bob 的购买行为放入 BobStream 流中DataStream BobStream = stream.filter(new FilterFunction() {@Override public boolean filter(Event value) throws Exception {return value.user.equals("Bob");}});// 筛选其他人的浏览行为放入 elseStream 流中DataStream elseStream = stream.filter(new FilterFunction() {@Override public boolean filter(Event value) throws Exception {return !value.user.equals("Mary") && !value.user.equals("Bob");}});MaryStream.print("Mary pv");BobStream.print("Bob pv");elseStream.print("else pv");env.execute();}
}
Splitting with side outputs
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;public class SplitStreamByOutputTag { // 定义输出标签,侧输出流的数据类型为三元组(user, url, timestamp)private static OutputTag> MaryTag = new OutputTag>("Mary-pv") {};private static OutputTag> BobTag = new OutputTag>("Bob-pv") {};public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperator stream = env.addSource(new ClickSource());SingleOutputStreamOperator processedStream = stream.process(new ProcessFunction() {@Overridepublic void processElement(Event value, Context ctx, Collector out) throws Exception {if (value.user.equals("Mary")) {ctx.output(MaryTag, new Tuple3<>(value.user, value.url, value.timestamp));} else if (value.user.equals("Bob")) {ctx.output(BobTag, new Tuple3<>(value.user, value.url, value.timestamp));} else {out.collect(value);}}});processedStream.getSideOutput(MaryTag).print("Mary pv");processedStream.getSideOutput(BobTag).print("Bob pv");processedStream.print("else");env.execute();}
}
8.2.1 Union  https://blog.csdn.net/m0_63475429/article/details/127352539
import com.ambitfly.pojo.Event;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import java.time.Duration;public class UnionExample {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperator stream1 = env.socketTextStream("node1",7777).map(data -> {String[] field = data.split(",");return new Event(field[0].trim(),field[1].trim(),Long.valueOf(field[2].trim()));}).assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(2)).withTimestampAssigner(new SerializableTimestampAssigner() {@Overridepublic long extractTimestamp(Event event, long l) {return event.timestamp;}}));stream1.print("stream1");SingleOutputStreamOperator stream2 = env.socketTextStream("node2",7777).map(data -> {String[] field = data.split(",");return new Event(field[0].trim(),field[1].trim(),Long.valueOf(field[2].trim()));}).assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(5)).withTimestampAssigner(new SerializableTimestampAssigner() {@Overridepublic long extractTimestamp(Event event, long l) {return event.timestamp;}}));stream2.print("stream2");// 合并两条流stream1.union(stream2).process(new ProcessFunction() {@Overridepublic void processElement(Event event, ProcessFunction.Context ctx, Collector out) throws Exception {out.collect(" 水 位 线 : " + ctx.timerService().currentWatermark());}}).print();env.execute();}}
8.2.2 Connect
1. Connected streams (ConnectedStreams)
import org.apache.flink.streaming.api.datastream.ConnectedStreams;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoMapFunction;
public class CoMapExample {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);DataStream stream1 = env.fromElements(1,2,3);DataStream stream2 = env.fromElements(1L,2L,3L);ConnectedStreams connectedStreams = stream1.connect(stream2);SingleOutputStreamOperator result = connectedStreams.map(new CoMapFunction() {@Override public String map1(Integer value) { return "Integer: " + value; }@Override public String map2(Long value) { return "Long: " + value; }});result.print();env.execute();}
}
// connectedStreams.keyBy(keySelector1, keySelector2);
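As the commented line hints, a ConnectedStreams can also be keyed so that elements with the same key from both inputs go to the same parallel subtask. A fragment-style sketch (stream1 and stream2 are assumed to be tuple streams keyed by their first field; both key selectors must produce the same key type):
ConnectedStreams<Tuple2<String, Long>, Tuple2<String, Long>> connected =
        stream1.connect(stream2)
               .keyBy(r -> r.f0, r -> r.f0); // one key selector per input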
2.CoProcessFunction
For a ConnectedStreams, the processing of the two input streams has to be defined separately, so the interface contains two methods of the same shape, distinguished by the suffixes "1" and "2"; each is invoked when an element arrives on the corresponding input.
This kind of interface is called a co-process function.
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
import org.apache.flink.util.Collector;
// real-time reconciliation of order payments
public class BillCheckExample {public static void main(String[] args) throws Exception{StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);// 来自 app 的支付日志SingleOutputStreamOperator> appStream =env.fromElements(Tuple3.of("order-1","app",1000L),Tuple3.of("order-2","app",2000L)).assignTimestampsAndWatermarks(WatermarkStrategy.>forMonotonousTimestamps().withTimestampAssigner(new SerializableTimestampAssigner>() {@Overridepublic long extractTimestamp(Tuple3 element, long recordTimestamp) {return element.f2;}}));// 来自第三方支付平台的支付日志SingleOutputStreamOperator> thirdpartStream = env.fromElements(Tuple4.of("order-1", "third-party", "success", 3000L),Tuple4.of("order-3", "third-party", "success", 4000L)).assignTimestampsAndWatermarks(WatermarkStrategy.>forMonotonousTimestamps().withTimestampAssigner(new SerializableTimestampAssigner>() {@Overridepublic long extractTimestamp(Tuple4element, long recordTimestamp) {return element.f3;}}));// 检测同一支付单在两条流中是否匹配,不匹配就报警appStream.connect(thirdpartStream).keyBy(data -> data.f0, data -> data.f0).process(new OrderMatchResult()).print();env.execute();}// 自定义实现 CoProcessFunctionpublic static class OrderMatchResult extends CoProcessFunction, Tuple4, String>{// 定义状态变量,用来保存已经到达的事件private ValueState> appEventState;private ValueState> thirdPartyEventState;@Overridepublic void open(Configuration parameters) throws Exception {appEventState = getRuntimeContext().getState(new ValueStateDescriptor>("app-event", Types.TUPLE(Types.STRING, Types.STRING, Types.LONG)));thirdPartyEventState = getRuntimeContext().getState(new ValueStateDescriptor>("thirdparty-event", Types.TUPLE(Types.STRING, Types.STRING,Types.STRING,Types.LONG)));}@Overridepublic void processElement1(Tuple3 value,Context ctx,Collector out) throws Exception {// 看另一条流中事件是否来过if (thirdPartyEventState.value() != null){out.collect(" 对 账 成 功 : " + value + " " + thirdPartyEventState.value());// 清空状态thirdPartyEventState.clear();} else {// 更新状态appEventState.update(value);// 注册一个 5 秒后的定时器,开始等待另一条流的事件ctx.timerService().registerEventTimeTimer(value.f2 + 5000L);}}@Overridepublic void processElement2(Tuple4 value,Context ctx, Collector out) throws Exception {if (appEventState.value() != null){out.collect("对账成功:" + appEventState.value() + " " + value);// 清空状态appEventState.clear();} else {// 更新状态thirdPartyEventState.update(value);// 注册一个 5 秒后的定时器,开始等待另一条流的事件ctx.timerService().registerEventTimeTimer(value.f3 + 5000L);}}@Overridepublic void onTimer(long timestamp, OnTimerContext ctx, Collector out) throws Exception {// 定时器触发,判断状态,如果某个状态不为空,说明另一条流中事件没来if (appEventState.value() != null) {out.collect("对账失败:" + appEventState.value() + " " + "第三方支付平台信息未到");}if (thirdPartyEventState.value() != null) {out.collect("对账失败:" + thirdPartyEventState.value() + " " + "app信息未到");}appEventState.clear();thirdPartyEventState.clear();}}
}
8.3 Time-Based Stream Joins
8.3.1 Window Join
1. Calling a window join
stream1.join(stream2).where().equalTo().window().apply()
In the code above, the argument of .where() is a KeySelector that specifies the key of the first stream,
while the KeySelector passed to .equalTo() specifies the key of the second stream.
Elements of the two streams with equal keys that fall into the same window can be matched and are then handled by a JoinFunction.
The argument of .window() is a window assigner; any of the three time windows introduced earlier can be used here: tumbling windows, sliding windows and session windows.
The trailing .apply() can be regarded as implementing a special window function. Note that only .apply() can be called at this point; there is no alternative method.
2. How a window join is processed
3. Window join example
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;public class WindowJoinExample {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);DataStream> stream1 = env.fromElements(Tuple2.of("a", 1000L), Tuple2.of("b", 1000L), Tuple2.of("a", 2000L), Tuple2.of("b", 2000L)).assignTimestampsAndWatermarks(WatermarkStrategy.>forMonotonousTimestamps().withTimestampAssigner(new SerializableTimestampAssigner>() {@Overridepublic long extractTimestamp(Tuple2 stringLongTuple2, long l) {return stringLongTuple2.f1;}}));DataStream> stream2 = env.fromElements(Tuple2.of("a", 3000L), Tuple2.of("b", 3000L), Tuple2.of("a", 4000L), Tuple2.of("b", 4000L)).assignTimestampsAndWatermarks(WatermarkStrategy.>forMonotonousTimestamps().withTimestampAssigner(new SerializableTimestampAssigner>() {@Overridepublic long extractTimestamp(Tuple2 stringLongTuple2, long l) {return stringLongTuple2.f1;}}));stream1.join(stream2).where(new KeySelector, String>() {@Overridepublic String getKey(Tuple2 value) throws Exception {return value.f0;}}).equalTo(new KeySelector, String>() {@Overridepublic String getKey(Tuple2 value) throws Exception {return value.f0;}}).window(TumblingEventTimeWindows.of(Time.seconds(5))).apply(new FlatJoinFunction, Tuple2, String>() {@Overridepublic void join(Tuple2 first, Tuple2 second, Collector out) throws Exception {out.collect(first + "=>" + second);}}).print();env.execute();}
}
8.3.2 Interval Join
stream1
    .keyBy()
    .intervalJoin(stream2.keyBy())
    .between(Time.milliseconds(-2), Time.milliseconds(1))
    .process(new ProcessJoinFunction<Integer, Integer, String>() {
        @Override
        public void processElement(Integer left, Integer right, Context ctx, Collector<String> out) {
            out.collect(left + "," + right);
        }
    });
Interval join example
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;
public class IntervalJoinExample {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperator> orderStream =env.fromElements(Tuple3.of("Mary", "order-1", 5000L),Tuple3.of("Alice", "order-2", 5000L),Tuple3.of("Bob", "order-3", 20000L),Tuple3.of("Alice", "order-4", 20000L),Tuple3.of("Cary", "order-5", 51000L)).assignTimestampsAndWatermarks(WatermarkStrategy.>forMonotonousTimestamps().withTimestampAssigner(new SerializableTimestampAssigner>() {@Overridepublic long extractTimestamp(Tuple3 element, long recordTimestamp) {return element.f2;}}));SingleOutputStreamOperator clickStream = env.fromElements(new Event("Bob", "./cart", 2000L),new Event("Alice", "./prod?id=100", 3000L),new Event("Alice", "./prod?id=200", 3500L),new Event("Bob", "./prod?id=2", 2500L),new Event("Alice", "./prod?id=300", 36000L),new Event("Bob", "./home", 30000L),new Event("Bob", "./prod?id=1", 23000L),new Event("Bob", "./prod?id=3", 33000L)).assignTimestampsAndWatermarks(WatermarkStrategy.forMonotonousTimestamps().withTimestampAssigner(new SerializableTimestampAssigner() {@Overridepublic long extractTimestamp(Event element, long recordTimestamp) {return element.timestamp;}}));orderStream.keyBy(data -> data.f0).intervalJoin(clickStream.keyBy(data -> data.user)).between(Time.seconds(-5), Time.seconds(10)).process(new ProcessJoinFunction,Event,String>() {@Overridepublic void processElement(Tuple3 left,Event right,Context ctx,Collector out) throws Exception {out.collect(right + " => " + left);}}).print();env.execute();}
}
8.3.3 Window CoGroup
stream1.coGroup(stream2).where().equalTo().window(TumblingEventTimeWindows.of(Time.hours(1))).apply()
A window-based join implemented with coGroup
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.CoGroupFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;public class CoGroupExample {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);DataStream> stream1 = env.fromElements(Tuple2.of("a", 1000L),Tuple2.of("b", 1000L),Tuple2.of("a", 2000L),Tuple2.of("b", 2000L)).assignTimestampsAndWatermarks(WatermarkStrategy.>forMonotonousTimestamps().withTimestampAssigner(new SerializableTimestampAssigner>() {@Overridepublic long extractTimestamp(Tuple2 stringLongTuple2, long l) {return stringLongTuple2.f1;}}));DataStream> stream2 = env.fromElements(Tuple2.of("a", 3000L),Tuple2.of("b", 3000L),Tuple2.of("a", 4000L),Tuple2.of("b", 4000L)).assignTimestampsAndWatermarks(WatermarkStrategy.>forMonotonousTimestamps().withTimestampAssigner(new SerializableTimestampAssigner>() {@Overridepublic long extractTimestamp(Tuple2 stringLongTuple2, long l) {return stringLongTuple2.f1;}}));stream1.coGroup(stream2).where(r -> r.f0).equalTo(r -> r.f0).window(TumblingEventTimeWindows.of(Time.seconds(5))).apply(new CoGroupFunction,Tuple2,String>() {@Overridepublic void coGroup(Iterable> iter1,Iterable> iter2,Collector collector) throws Exception {collector.collect(iter1 + "=>" + iter2);}}).print();env.execute();}
}
CP9 State Programming
Stateless operators: map, filter, flatMap
Stateful operators: sum, for example
(1) The operator task receives data from upstream;
(2) it looks up the current state;
(3) it runs the business logic and updates the state;
(4) it emits the computed result downstream (a minimal sketch of this loop follows below).
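A minimal sketch of these four steps (illustrative names, not from the book), using a keyed running count kept in a ValueState, which is introduced in the next section:
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

public class RunningCount extends RichFlatMapFunction<String, String> {
    private transient ValueState<Long> countState;   // one count per key

    @Override
    public void open(Configuration parameters) {
        countState = getRuntimeContext().getState(
                new ValueStateDescriptor<>("count", Long.class));
    }

    @Override
    public void flatMap(String value, Collector<String> out) throws Exception {
        Long count = countState.value();                   // (2) get the current state
        long newCount = (count == null ? 0L : count) + 1;  // (3) apply the business logic
        countState.update(newCount);                       //     and update the state
        out.collect(value + " seen " + newCount + " times"); // (4) emit the result downstream
    }
}
It would be used as stream.keyBy(value -> value).flatMap(new RunningCount()).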
9.2 Keyed State
9.2.2 State storage: supported structure types
1. Value state (ValueState)
public interface ValueState<T> extends State {
    T value() throws IOException;
    void update(T value) throws IOException;
}
⚫ T value(): returns the current value of the state;
⚫ update(T value): updates the state, overwriting it with the given value.
In practice, a "state descriptor" (StateDescriptor) also has to be created so that the runtime context knows exactly which state is meant; it carries the state's basic information.
public ValueStateDescriptor(String name, Class<T> typeClass)
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;public class PeriodicPvExample{public static void main(String[] args) throws Exception{StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperator stream = env.addSource(new ClickSource()).assignTimestampsAndWatermarks(WatermarkStrategy.forMonot onousTimestamps().withTimestampAssigner(new SerializableTimestampAssigner() {@Overridepublic long extractTimestamp(Event element,long recordTimestamp) {return element.timestamp;}}));stream.print("input");// 统计每个用户的 pv,隔一段时间(10s)输出一次结果stream.keyBy(data -> data.user).process(new PeriodicPvResult()).print();env.execute();}// 注册定时器,周期性输出 pvpublic static class PeriodicPvResult extends KeyedProcessFunction{// 定义两个状态,保存当前 pv 值,以及定时器时间戳ValueState countState;ValueState timerTsState;@Overridepublic void open(Configuration parameters) throws Exception {countState = getRuntimeContext().getState(new ValueStateDescriptor("count", Long.class));timerTsState = getRuntimeContext().getState(new ValueStateDescriptor("timerTs", Long.class));}@Overridepublic void processElement(Event value,Context ctx,Collector out) throws Exception {// 更新 count 值Long count = countState.value();if (count == null){countState.update(1L);} else {countState.update(count + 1);}// 注册定时器if (timerTsState.value() == null){ctx.timerService().registerEventTimeTimer(value.timestamp + 10 * 1000L);timerTsState.update(value.timestamp + 10 * 1000L);}}@Overridepublic void onTimer(long timestamp, OnTimerContext ctx, Collector out) throws Exception {out.collect(ctx.getCurrentKey() + " pv: " + countState.value());// 清空状态timerTsState.clear();}}
}
2. List state (ListState)
⚫ Iterable<T> get(): returns the current list state as an iterable;
⚫ update(List<T> values): overwrites the state with the given list;
⚫ add(T value): appends a single element to the list state;
⚫ addAll(List<T> values): appends all elements of the given list.
Correspondingly, the state descriptor for ListState is called ListStateDescriptor and is used exactly like ValueStateDescriptor.
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
import org.apache.flink.util.Collector;
public class TwoStreamFullJoinExample {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperator> stream1 = env.fromElements(Tuple3.of("a", "stream-1", 1000L),Tuple3.of("b", "stream-1", 2000L)).assignTimestampsAndWatermarks(WatermarkStrategy.>forMonotonousTimestamps().withTimestampAssigner(new SerializableTimestampAssigner>() {@Overridepublic long extractTimestamp(Tuple3 t, long l) {return t.f2;}}));SingleOutputStreamOperator> stream2 = env.fromElements(Tuple3.of("a", "stream-2", 3000L),Tuple3.of("b", "stream-2", 4000L)).assignTimestampsAndWatermarks(WatermarkStrategy.>forMonotonousTimestamps().withTimestampAssigner(new SerializableTimestampAssigner>() {@Overridepublic long extractTimestamp(Tuple3 t, long l) {return t.f2;}}));stream1.keyBy(r -> r.f0).connect(stream2.keyBy(r -> r.f0)).process(new CoProcessFunction, Tuple3, String>() {private ListState> stream1ListState;private ListState> stream2ListState;@Overridepublic void open(Configuration parameters) throws Exception {super.open(parameters);stream1ListState = getRuntimeContext().getListState(new ListStateDescriptor>("stream1-list", Types.TUPLE(Types.STRING, Types.STRING)));stream2ListState = getRuntimeContext().getListState(new ListStateDescriptor>("stream2-list", Types.TUPLE(Types.STRING, Types.STRING)));}@Overridepublic void processElement1(Tuple3 left,Context context,Collector collector) throws Exception {stream1ListState.add(left);for (Tuple3 right : stream2ListState.get()) {collector.collect(left + " => " + right);}}@Overridepublic void processElement2(Tuple3 right,Context context,Collector collector) throws Exception {stream2ListState.add(right);for (Tuple3 left : stream1ListState.get()) {collector.collect(left + " => " + right);}}}).print();env.execute();}
}
3. Map state (MapState)
⚫ UV get(UK key): returns the value for the given key;
⚫ put(UK key, UV value): adds or updates the value for the given key;
⚫ putAll(Map<UK, UV> map): adds all entries of the given map to the state;
⚫ remove(UK key): removes the entry for the given key;
⚫ boolean contains(UK key): returns whether the given key exists. MapState also provides methods for inspecting the whole map:
⚫ Iterable<Map.Entry<UK, UV>> entries(): returns all entries of the map;
⚫ Iterable<UK> keys(): returns all keys;
⚫ Iterable<UV> values(): returns all values;
⚫ boolean isEmpty(): returns whether the map is empty.
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import java.sql.Timestamp;
public class FakeWindowExample {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperator stream = env.addSource(new ClickSource()).assignTimestampsAndWatermarks(WatermarkStrategy.forMonotonousTimestamps().withTimestampAssigner(new SerializableTimestampAssigner() {@Overridepublic long extractTimestamp(Event element, long recordTimestamp) {return element.timestamp;}}));// 统计每 10s 窗口内,每个 url 的 pvstream.keyBy(data -> data.url).process(new FakeWindowResult(10000L)).print();env.execute();}public static class FakeWindowResult extends KeyedProcessFunction {// 定义属性,窗口长度private Long windowSize;public FakeWindowResult(Long windowSize) {this.windowSize = windowSize;}// 声明状态,用 map 保存 pv 值(窗口 start,count)MapState windowPvMapState;@Overridepublic void open(Configuration parameters) throws Exception {windowPvMapState = getRuntimeContext().getMapState(new MapStateDescriptor("window-pv", Long.class, Long.class));}@Overridepublic void processElement(Event value, Context ctx, Collector out) throws Exception {// 每来一条数据,就根据时间戳判断属于哪个窗口Long windowStart = value.timestamp / windowSize * windowSize;Long windowEnd = windowStart + windowSize;// 注册 end -1 的定时器,窗口触发计算ctx.timerService().registerEventTimeTimer(windowEnd - 1);// 更新状态中的 pv 值if (windowPvMapState.contains(windowStart)) {Long pv = windowPvMapState.get(windowStart);windowPvMapState.put(windowStart, pv + 1);} else {windowPvMapState.put(windowStart, 1L);}}// 定时器触发,直接输出统计的 pv 结果@Overridepublic void onTimer(long timestamp, OnTimerContext ctx, Collector out) throws Exception {Long windowEnd = timestamp + 1;Long windowStart = windowEnd - windowSize;Long pv = windowPvMapState.get(windowStart);out.collect("url: " + ctx.getCurrentKey()+ "访问量:" + pv+ " 窗口 :" + new Timestamp(windowStart) + " ~ " + new Timestamp(windowEnd));// 模拟窗口的销毁,清除 map 中的 keywindowPvMapState.remove(windowStart);}}
}
4. Reducing state (ReducingState)
public ReducingStateDescriptor(String name, ReduceFunction<T> reduceFunction, Class<T> typeClass) {...}
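The notes only give the descriptor constructor, so here is a minimal sketch (assuming the Event POJO from earlier; names are illustrative) that uses a ReducingState to keep the largest timestamp seen so far for each key:
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ReducingState;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

public class MaxTimestampPerUser extends RichFlatMapFunction<Event, String> {
    private transient ReducingState<Long> maxTsState;

    @Override
    public void open(Configuration parameters) {
        maxTsState = getRuntimeContext().getReducingState(
                new ReducingStateDescriptor<>("max-ts", (a, b) -> Math.max(a, b), Long.class));
    }

    @Override
    public void flatMap(Event value, Collector<String> out) throws Exception {
        maxTsState.add(value.timestamp);   // the ReduceFunction merges the new value into the state
        out.collect(value.user + " max ts: " + maxTsState.get());
    }
}
It would be applied after keyBy, e.g. stream.keyBy(data -> data.user).flatMap(new MaxTimestampPerUser()).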
5. Aggregating state (AggregatingState)
public static class MyFlatMapFunction extends RichFlatMapFunction<Long, String> {
    // declare the state
    private transient ValueState<Long> state;

    @Override
    public void open(Configuration config) {
        // obtain the state in the open() lifecycle method
        ValueStateDescriptor<Long> descriptor = new ValueStateDescriptor<>(
                "my state",   // state name
                Types.LONG);  // state type
        state = getRuntimeContext().getState(descriptor);
    }

    @Override
    public void flatMap(Long input, Collector<String> out) throws Exception {
        // access the state
        Long currentState = state.value();
        if (currentState == null) {
            currentState = 0L;   // the state is null before the first update
        }
        currentState += 1;       // increment the counter
        // update the state
        state.update(currentState);
        if (currentState >= 100) {
            out.collect("state: " + currentState);
            state.clear();       // clear the state
        }
    }
}
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.AggregatingState;
import org.apache.flink.api.common.state.AggregatingStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
import java.sql.Timestamp;
public class AverageTimestampExample {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperator stream = env.addSource(new ClickSource()).assignTimestampsAndWatermarks(WatermarkStrategy.forMonotonousTimestamps().withTimestampAssigner(new SerializableTimestampAssigner() {@Overridepublic long extractTimestamp(Event element, long recordTimestamp) {return element.timestamp;}}));// 统计每个用户的点击频次,到达 5 次就输出统计结果stream.keyBy(data -> data.user).flatMap(new AvgTsResult()).print();env.execute();}public static class AvgTsResult extends RichFlatMapFunction {// 定义聚合状态,用来计算平均时间戳AggregatingState avgTsAggState;// 定义一个值状态,用来保存当前用户访问频次ValueState countState;@Overridepublic void open(Configuration parameters) throws Exception {avgTsAggState = getRuntimeContext().getAggregatingState(new AggregatingStateDescriptor,Long>("avg-ts",new AggregateFunction, Long>() {@Overridepublic Tuple2 createAccumulator() {return Tuple2.of(0L, 0L);}@Overridepublic Tuple2 add(Event value, Tuple2 accumulator) {return Tuple2.of(accumulator.f0 + value.timestamp,accumulator.f1 + 1);}@Overridepublic Long getResult(Tuple2 accumulator) {return accumulator.f0 / accumulator.f1;}@Overridepublic Tuple2 merge(Tuple2 a,Tuple2 b) {return null;}},Types.TUPLE(Types.LONG, Types.LONG)));countState = getRuntimeContext().getState(new ValueStateDescriptor("count", Long.class));}@Overridepublic void flatMap(Event value, Collector out) throws Exception {Long count = countState.value();if (count == null) {count = 1L;} else {count++;}countState.update(count);avgTsAggState.add(value);// 达到 5 次就输出结果,并清空状态if (count == 5) {out.collect(value.user + " 平均时间戳: " + new Timestamp(avgTsAggState.get()));countState.clear();}}}
}
9.2.4 State time-to-live (TTL)
StateTtlConfig ttlConfig = StateTtlConfig
        .newBuilder(Time.seconds(10))
        .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
        .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
        .build();
ValueStateDescriptor<String> stateDescriptor = new ValueStateDescriptor<>("my state", String.class);
stateDescriptor.enableTimeToLive(ttlConfig);
Several configuration options are used here:
⚫ .newBuilder(): the mandatory builder method for the TTL configuration; it takes a Time value, which is the configured time-to-live, and the subsequent .build() call produces the StateTtlConfig.
⚫ .setUpdateType(): sets the update type, i.e. when the expiration time is refreshed. OnCreateAndWrite means the expiration time is refreshed only when the state is created or written. The alternative, OnReadAndWrite, refreshes it on every read or write, so any access keeps the state alive and extends its lifetime. The default is OnCreateAndWrite.
⚫ .setStateVisibility(): sets the state visibility. Because cleanup is not immediate, expired state may still physically exist when it is accessed, and this option decides whether it can still be read. NeverReturnExpired (the default) never returns expired values: once expired, the state is treated as already cleared and cannot be read, which matters for session or privacy-sensitive data. The other option, ReturnExpiredIfNotCleanedUp, returns the expired value as long as it has not yet been physically removed.
In addition, the TTL configuration can trigger cleanup when a checkpoint is taken, perform incremental cleanup, or use a compaction filter for background cleanup with the RocksDB state backend; checkpoints and state backends are covered in later chapters. Note that TTL currently supports processing time only. Also, for all collection-style state (such as ListState and MapState) the TTL is applied per entry: each element of a list state expires according to its own timestamp, rather than the whole list being cleaned up at once.
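The cleanup strategies mentioned above can be configured on the same builder. A minimal sketch (the parameter values are illustrative; these builder methods exist on StateTtlConfig in recent Flink versions):
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.time.Time;

StateTtlConfig ttlConfigWithCleanup = StateTtlConfig
        .newBuilder(Time.hours(1))
        // remove expired entries whenever a full snapshot (checkpoint/savepoint) is taken
        .cleanupFullSnapshot()
        // incrementally check up to 10 entries for expiration on each state access
        .cleanupIncrementally(10, false)
        // RocksDB backend only: drop expired entries during compaction,
        // re-reading the current timestamp every 1000 processed entries
        .cleanupInRocksdbCompactFilter(1000)
        .build();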
9.3 Operator State
9.3.2 State types: ListState, UnionListState and BroadcastState
1. List state (ListState)
2. Union list state (UnionListState)
3. Broadcast state (BroadcastState)
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import java.util.ArrayList;
import java.util.List;
public class BufferingSinkExample {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperator stream = env.addSource(new ClickSource()).assignTimestampsAndWatermarks(WatermarkStrategy.forMonotonousTimestamps().withTimestampAssigner(new SerializableTimestampAssigner() {@Overridepublic long extractTimestamp(Event element,long recordTimestamp) {return element.timestamp;}}));stream.print("input");// 批量缓存输出stream.addSink(new BufferingSink(10));env.execute();}public static class BufferingSink implements SinkFunction, CheckpointedFunction {private final int threshold;private transient ListState checkpointedState;private List bufferedElements;public BufferingSink(int threshold) {this.threshold = threshold;this.bufferedElements = new ArrayList<>();}@Overridepublic void invoke(Event value,Context context) throws Exception {bufferedElements.add(value);if (bufferedElements.size() == threshold) {for (Event element : bufferedElements) {// 输出到外部系统,这里用控制台打印模拟System.out.println(element);}System.out.println("==========输出完毕=========");bufferedElements.clear();}}@Overridepublic void snapshotState(FunctionSnapshotContext context) throws Exception {checkpointedState.clear();// 把当前局部变量中的所有元素写入到检查点中for (Event element : bufferedElements) {checkpointedState.add(element);}}@Overridepublic void initializeState(FunctionInitializationContext context) throws Exception {ListStateDescriptor descriptor = new ListStateDescriptor<>("buffered-elements",Types.POJO(Event.class));checkpointedState = context.getOperatorStateStore().getListState(descriptor);// 如果是从故障中恢复,就将 ListState 中的所有元素添加到局部变量中if (context.isRestored()) {for (Event element : checkpointedState.get()) {bufferedElements.add(element);}}}}
}
9.4 Broadcast State
import org.apache.flink.api.common.state.BroadcastState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
import org.apache.flink.util.Collector;public class BroadcastStateExample {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);// 读取用户行为事件流DataStreamSource actionStream = env.fromElements(new Action("Alice", "login"),new Action("Alice", "pay"),new Action("Bob", "login"),new Action("Bob", "buy"));// 定义行为模式流,代表了要检测的标准DataStreamSource patternStream = env.fromElements(new Pattern("login", "pay"),new Pattern("login", "buy"));// 定义广播状态的描述器,创建广播流MapStateDescriptor bcStateDescriptor = new MapStateDescriptor<>("patterns", Types.VOID, Types.POJO(Pattern.class));BroadcastStream bcPatterns = patternStream.broadcast(bcStateDescriptor);// 将事件流和广播流连接起来,进行处理DataStream> matches = actionStream.keyBy(data -> data.userId).connect(bcPatterns).process(new PatternEvaluator());matches.print();env.execute();}public static class PatternEvaluator extends KeyedBroadcastProcessFunction> {// 定义一个值状态,保存上一次用户行为ValueState prevActionState;@Overridepublic void open(Configuration conf) {prevActionState = getRuntimeContext().getState(new ValueStateDescriptor<>("lastAction",Types.STRING));}@Overridepublic void processBroadcastElement(Pattern pattern, Context ctx, Collector> out) throws Exception {BroadcastState bcState = ctx.getBroadcastState(new MapStateDescriptor<>("patterns",Types.VOID,Types.POJO(Pattern.class)));// 将广播状态更新为当前的 patternbcState.put(null, pattern);}@Overridepublic void processElement(Action action, ReadOnlyContext ctx, Collector> out) throws Exception {Pattern pattern = ctx.getBroadcastState(new MapStateDescriptor<>("patterns", Types.VOID, Types.POJO(Pattern.class))).get(null);String prevAction = prevActionState.value();if (pattern != null && prevAction != null) {// 如果前后两次行为都符合模式定义,输出一组匹配if (pattern.action1.equals(prevAction) && pattern.action2.equals(action.action)) {out.collect(new Tuple2<>(ctx.getCurrentKey(), pattern));}}// 更新状态prevActionState.update(action.action);}}// 定义用户行为事件 POJO 类public static class Action {public String userId;public String action;public Action() {}public Action(String userId, String action) {this.userId = userId;this.action = action;}@Override public String toString() {return "Action{" +"userId=" + userId + ", action='" + action + '\'' +'}';}}// 定义行为模式 POJO 类,包含先后发生的两个行为public static class Pattern {public String action1;public String action2;public Pattern() {}public Pattern(String action1, String action2) {this.action1 = action1;this.action2 = action2;}@Override public String toString() {return "Pattern{" +"action1='" + action1 + '\'' +", action2='" + action2 + '\'' +'}';}}
}
9.5 State Persistence and State Backends
9.5.1 Checkpoints
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(1000);
9.5.2 State backends
1. Types of state backends
Hash map state backend (HashMapStateBackend)
Embedded RocksDB state backend (EmbeddedRocksDBStateBackend)
State backend configuration: the state.backend key in flink-conf.yaml
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStateBackend(new HashMapStateBackend());
env.setStateBackend(new EmbeddedRocksDBStateBackend());
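A side note (not from the original text): EmbeddedRocksDBStateBackend requires the flink-statebackend-rocksdb dependency on the classpath, and its constructor can enable incremental checkpoints. A minimal sketch with an illustrative checkpoint path:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// true enables incremental checkpoints, which only upload changed SST files
env.setStateBackend(new EmbeddedRocksDBStateBackend(true));
env.getCheckpointConfig().setCheckpointStorage("hdfs://namenode:40010/flink/checkpoints");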
CP10 Fault Tolerance
10.1.4 Checkpoint configuration
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// trigger a checkpoint save every 1 second
env.enableCheckpointing(1000);
2. Checkpoint storage (CheckpointStorage)
// store checkpoints in the JobManager heap
env.getCheckpointConfig().setCheckpointStorage(new JobManagerCheckpointStorage());
// store checkpoints in a file system
env.getCheckpointConfig().setCheckpointStorage(
        new FileSystemCheckpointStorage("hdfs://namenode:40010/flink/checkpoints"));
CheckpointConfig checkpointConfig = env.getCheckpointConfig();
(1) Checkpointing mode (CheckpointingMode): sets the consistency guarantee of checkpoints, either exactly-once or at-least-once. The default is exactly-once; for many low-latency streaming jobs at-least-once is sufficient and more efficient. Consistency levels are discussed further in section 10.2.
(2) Timeout (checkpointTimeout): the maximum time a checkpoint save may take; a checkpoint that exceeds it is discarded. The parameter is a long value in milliseconds.
(3) Minimum pause between checkpoints (minPauseBetweenCheckpoints): the minimum time the checkpoint coordinator waits after one checkpoint completes before it may trigger the next one. Even if the periodic trigger time has been reached, a new checkpoint cannot start until this pause since the last completed checkpoint has elapsed, which leaves enough headroom for normal processing. When this option is set, maxConcurrentCheckpoints is forced to 1.
(4) Maximum concurrent checkpoints (maxConcurrentCheckpoints): the maximum number of checkpoints that may be in progress at the same time. Because tasks progress at different speeds, a later checkpoint may be triggered while earlier tasks have not yet finished saving the previous one; this option limits how many may run concurrently. If minPauseBetweenCheckpoints is set, this option has no effect.
(5) Externalized checkpoints (enableExternalizedCheckpoints): enables persisting checkpoints externally; by default they are not cleaned up automatically when the job fails, so space must be reclaimed manually. The ExternalizedCheckpointCleanup argument specifies how external checkpoints are handled when the job is cancelled: ⚫ DELETE_ON_CANCELLATION: external checkpoints are deleted when the job is cancelled, but retained if the job fails. ⚫ RETAIN_ON_CANCELLATION: external checkpoints are retained even when the job is cancelled.
(6) Fail the job on checkpoint errors (failOnCheckpointingErrors): whether an error while taking a checkpoint should make the whole job fail. The default is true; if set to false, the failed checkpoint is discarded and the job keeps running.
(7) Unaligned checkpoints (enableUnalignedCheckpoints): skips the barrier alignment step, which can greatly reduce checkpointing time under backpressure. This requires the checkpointing mode to be exactly-once and the number of concurrent checkpoints to be 1.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// enable checkpointing, with a 1-second interval
env.enableCheckpointing(1000);
CheckpointConfig checkpointConfig = env.getCheckpointConfig();
// exactly-once mode
checkpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// minimum pause of 500 ms between checkpoints
checkpointConfig.setMinPauseBetweenCheckpoints(500);
// 1-minute timeout
checkpointConfig.setCheckpointTimeout(60000);
// at most one checkpoint in progress at a time
checkpointConfig.setMaxConcurrentCheckpoints(1);
// externalize checkpoints and retain them after the job is cancelled
checkpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
// enable unaligned checkpoints
checkpointConfig.enableUnalignedCheckpoints();
// set the checkpoint storage; a String file-system path can be passed directly
checkpointConfig.setCheckpointStorage("hdfs://my/checkpoint/dir");
CP11 Table API and SQL
public class TableExample {public static void main(String[] args) throws Exception {// 获取流执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);// 读取数据源SingleOutputStreamOperator eventStream = env.fromElements(new Event("Alice", "./home", 1000L),new Event("Bob", "./cart", 1000L),new Event("Alice", "./prod?id=1", 5 * 1000L), new Event("Cary", "./home", 60 * 1000L),new Event("Bob", "./prod?id=3", 90 * 1000L), new Event("Alice", "./prod?id=7", 105 * 1000L));// 获取表环境StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);// 将数据流转换成表Table eventTable = tableEnv.fromDataStream(eventStream);// 用执行 SQL 的方式提取数据Table visitTable = tableEnv.sqlQuery("select url, user from " + eventTable);// 将表转换成数据流,打印输出tableEnv.toDataStream(visitTable).print();// 执行程序env.execute();}
}
Flink SQL demo
// create a table environment
TableEnvironment tableEnv = ...;
// create an input table connected to an external system for reading data
tableEnv.executeSql("CREATE TEMPORARY TABLE inputTable ... WITH ( 'connector'= ... )");
// register a table connected to an external system for output
tableEnv.executeSql("CREATE TEMPORARY TABLE outputTable ... WITH ( 'connector'= ... )");
// run a SQL query on the table, producing a new table
Table table1 = tableEnv.sqlQuery("SELECT ... FROM inputTable... ");
// query the table with the Table API, producing a new table
Table table2 = tableEnv.from("inputTable").select(...);
// write the result into the output table
TableResult tableResult = table1.executeInsert("outputTable");
11.2.3 Creating tables
// 1. Connector tables
tableEnv.executeSql("CREATE [TEMPORARY] TABLE MyTable ... WITH ( 'connector'= ... )");
tEnv.useCatalog("custom_catalog");
tEnv.useDatabase("custom_database");
// 2. Virtual tables
Table newTable = tableEnv.sqlQuery("SELECT ... FROM MyTable... ");
tableEnv.createTemporaryView("NewTable", newTable);
11.2.4 Querying tables
// create a table environment
TableEnvironment tableEnv = ...;
// create a table
tableEnv.executeSql("CREATE TABLE EventTable ... WITH ( 'connector' = ... )");
// query the click events of user Alice, selecting the first two fields of the table
Table aliceVisitTable = tableEnv.sqlQuery( "SELECT user, url FROM EventTable " + "WHERE user = 'Alice' ");
Table urlCountTable = tableEnv.sqlQuery( "SELECT user, COUNT(url) FROM EventTable GROUP BY user ");
// register tables
tableEnv.executeSql("CREATE TABLE EventTable ... WITH ( 'connector' = ... )");
tableEnv.executeSql("CREATE TABLE OutputTable ... WITH ( 'connector' = ... )");
// write the query result into OutputTable
tableEnv.executeSql("INSERT INTO OutputTable " + "SELECT user, url " + "FROM EventTable " + "WHERE user = 'Alice' ");
// 2. Querying with the Table API
Table eventTable = tableEnv.from("EventTable");
Table maryClickTable = eventTable.where($("user").isEqual("Alice")).select($("url"), $("user"));
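Note that the $() expressions used in the Table API call above come from a static import, which also appears in the complete examples later in this chapter:
import static org.apache.flink.table.api.Expressions.$;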
11.2.5 Writing to output tables
// register a table used for writing data to an external system
tableEnv.executeSql("CREATE TABLE OutputTable ... WITH ( 'connector' = ... )");
// obtain a result table from a query
Table result = ...
// write the result table into the registered output table
result.executeInsert("OutputTable");
11.2.6 Converting between tables and streams
1. Converting a Table into a DataStream
(1) The toDataStream() method
Table aliceVisitTable = tableEnv.sqlQuery( "SELECT user, url " +"FROM EventTable " + "WHERE user = 'Alice' ");
(2) The toChangelogStream() method
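Neither call is shown above, so here is a minimal sketch (assuming tableEnv is a StreamTableEnvironment and aliceVisitTable is the query from (1)): an append-only result can be converted with toDataStream(), while an aggregating result is an updating table and must go through toChangelogStream(), otherwise toDataStream() throws an exception.
// sketch: converting an append-only table and an updating table to streams
Table urlCountTable = tableEnv.sqlQuery("SELECT user, COUNT(url) AS cnt FROM EventTable GROUP BY user");
tableEnv.toDataStream(aliceVisitTable).print();      // append-only query
tableEnv.toChangelogStream(urlCountTable).print();   // updating (aggregating) query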
2. Converting a DataStream into a Table
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// obtain the table environment
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
// read the source data
SingleOutputStreamOperator<Event> eventStream = env.addSource(...);
// convert the stream into a table
// (1) the fromDataStream() method
Table eventTable = tableEnv.fromDataStream(eventStream);
// extract the timestamp and url fields of Event as the table's columns
Table eventTable2 = tableEnv.fromDataStream(eventStream, $("timestamp"),$("url"));
// rename the timestamp field to ts
Table eventTable2 = tableEnv.fromDataStream(eventStream, $("timestamp").as("ts"),$("url"));
// (2) the createTemporaryView() method
tableEnv.createTemporaryView("EventTable", eventStream,$("timestamp").as("ts"),$("url"));
// (3) the fromChangelogStream() method
StreamTableEnvironment tableEnv = ...; DataStream stream = ...;
// convert the stream into a dynamic table with a single field, renamed myLong
Table table = tableEnv.fromDataStream(stream, $("myLong"));
StreamTableEnvironment tableEnv = ...; DataStream stream = ...;
Table table = tableEnv.fromDataStream(stream);
Table table = tableEnv.fromDataStream(stream, $("user"));
Table table = tableEnv.fromDataStream(stream, $("user").as("myUser"),$("url").as("myUrl"));
DataStream<Row> dataStream = env.fromElements(
        Row.ofKind(RowKind.INSERT, "Alice", 12),
        Row.ofKind(RowKind.INSERT, "Bob", 5),
        Row.ofKind(RowKind.UPDATE_BEFORE, "Alice", 12),
        Row.ofKind(RowKind.UPDATE_AFTER, "Alice", 100));
// convert the changelog stream into a table
Table table = tableEnv.fromChangelogStream(dataStream);
4.综合应用示例
public class TableToStreamExample {
    public static void main(String[] args) throws Exception {
        // 获取流环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // 读取数据源
        SingleOutputStreamOperator<Event> eventStream = env.fromElements(
                new Event("Alice", "./home", 1000L),
                new Event("Bob", "./cart", 1000L),
                new Event("Alice", "./prod?id=1", 5 * 1000L),
                new Event("Cary", "./home", 60 * 1000L),
                new Event("Bob", "./prod?id=3", 90 * 1000L),
                new Event("Alice", "./prod?id=7", 105 * 1000L));
        // 获取表环境
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        // 将数据流转换成表
        tableEnv.createTemporaryView("EventTable", eventStream);
        // 查询 Alice 的访问 url 列表
        Table aliceVisitTable = tableEnv.sqlQuery("SELECT url, user FROM EventTable WHERE user = 'Alice'");
        // 统计每个用户的点击次数
        Table urlCountTable = tableEnv.sqlQuery("SELECT user, COUNT(url) FROM EventTable GROUP BY user");
        // 将表转换成数据流,在控制台打印输出
        tableEnv.toDataStream(aliceVisitTable).print("alice visit");
        tableEnv.toChangelogStream(urlCountTable).print("count");
        // 执行程序
        env.execute();
    }
}
11.3.2将流转换成动态表
// 获取流环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
// 读取数据源
SingleOutputStreamOperator<Event> eventStream = env.fromElements(
        new Event("Alice", "./home", 1000L),
        new Event("Bob", "./cart", 1000L),
        new Event("Alice", "./prod?id=1", 5 * 1000L),
        new Event("Cary", "./home", 60 * 1000L),
        new Event("Bob", "./prod?id=3", 90 * 1000L),
        new Event("Alice", "./prod?id=7", 105 * 1000L));
// 获取表环境
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
// 将数据流转换成表
tableEnv.createTemporaryView("EventTable", eventStream, $("user"), $("url"),$("timestamp").as("ts"));
// 统计每个用户的点击次数
Table urlCountTable = tableEnv.sqlQuery("SELECT user, COUNT(url) as cnt FROM EventTable GROUP BY user");
// 将表转换成数据流,在控制台打印输出
tableEnv.toChangelogStream(urlCountTable).print("count");
// 执行程序
env.execute();
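For reference, with parallelism 1 the count stream printed above should look roughly like the following; this is an illustrative sketch of the retract encoding derived from the six input events, not captured output. Every change to a user's count is emitted as an update-before (-U) followed by an update-after (+U).
count> +I[Alice, 1]
count> +I[Bob, 1]
count> -U[Alice, 1]
count> +U[Alice, 2]
count> +I[Cary, 1]
count> -U[Bob, 1]
count> +U[Bob, 2]
count> -U[Alice, 2]
count> +U[Alice, 3]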
11.3.3用 SQL 持续查询
1.更新(Update)查询
Table urlCountTable = tableEnv.sqlQuery("SELECT user, COUNT(url) as cnt FROM EventTable GROUP BY user");
2.追加(Append)查询
Table aliceVisitTable = tableEnv.sqlQuery("SELECT url, user FROM EventTable WHERE user = 'Cary'");
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import static org.apache.flink.table.api.Expressions.$;
public class AppendQueryExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // 读取数据源,并分配时间戳、生成水位线
        SingleOutputStreamOperator<Event> eventStream = env
                .fromElements(
                        new Event("Alice", "./home", 1000L),
                        new Event("Bob", "./cart", 1000L),
                        new Event("Alice", "./prod?id=1", 25 * 60 * 1000L),
                        new Event("Alice", "./prod?id=4", 55 * 60 * 1000L),
                        new Event("Bob", "./prod?id=5", 3600 * 1000L + 60 * 1000L),
                        new Event("Cary", "./home", 3600 * 1000L + 30 * 60 * 1000L),
                        new Event("Cary", "./prod?id=7", 3600 * 1000L + 59 * 60 * 1000L))
                .assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forMonotonousTimestamps()
                        .withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
                            @Override
                            public long extractTimestamp(Event element, long recordTimestamp) {
                                return element.timestamp;
                            }
                        }));
        // 创建表环境
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        // 将数据流转换成表,并指定时间属性
        Table eventTable = tableEnv.fromDataStream(
                eventStream,
                $("user"),
                $("url"),
                $("timestamp").rowtime().as("ts"));    // 将 timestamp 指定为事件时间,并命名为 ts
        // 为方便在 SQL 中引用,在环境中注册表 EventTable
        tableEnv.createTemporaryView("EventTable", eventTable);
        // 设置 1 小时滚动窗口,执行 SQL 统计查询
        Table result = tableEnv.sqlQuery(
                "SELECT " +
                "user, " +
                "window_end AS endT, " +       // 窗口结束时间
                "COUNT(url) AS cnt " +         // 统计 url 访问次数
                "FROM TABLE( " +
                "TUMBLE( TABLE EventTable, " + // 1 小时滚动窗口
                "DESCRIPTOR(ts), " +
                "INTERVAL '1' HOUR)) " +
                "GROUP BY user, window_start, window_end ");
        tableEnv.toDataStream(result).print();
        env.execute();
    }
}
11.3.4将动态表转换为流
⚫ 仅追加(Append-only)流
⚫ 撤回(Retract)流
⚫ 更新插入(Upsert)流
11.4时间属性和窗口
11.4.1事件时间
CREATE TABLE events (
  user STRING,
  url STRING,
  ts BIGINT,
  ts_ltz AS TO_TIMESTAMP_LTZ(ts, 3),
  WATERMARK FOR ts_ltz AS ts_ltz - INTERVAL '5' SECOND
) WITH (...);

// 方法一:
// 流中数据类型为二元组 Tuple2,包含两个字段;需要自定义提取时间戳并生成水位线
DataStream<Tuple2<String, String>> stream = inputStream.assignTimestampsAndWatermarks(...);
// 声明一个额外的逻辑字段作为事件时间属性
Table table = tEnv.fromDataStream(stream, $("user"), $("url"), $("ts").rowtime());

// 方法二:
// 流中数据类型为三元组 Tuple3,最后一个字段就是事件时间戳
DataStream<Tuple3<String, String, Long>> stream = inputStream.assignTimestampsAndWatermarks(...);
// 不再声明额外字段,直接用最后一个字段作为事件时间属性
Table table = tEnv.fromDataStream(stream, $("user"), $("url"),$("ts").rowtime());
11.4.2处理时间
CREATE TABLE EventTable (
  user STRING,
  url STRING,
  ts AS PROCTIME()
) WITH (...);

DataStream<Tuple2<String, String>> stream = ...;
// 声明一个额外的字段作为处理时间属性字段
Table table = tEnv.fromDataStream(stream, $("user"), $("url"),$("ts").proctime());
11.5聚合(Aggregation)查询
Table result = tableEnv.sqlQuery("SELECT " +"user, " +"window_end AS endT, " + "COUNT(url) AS cnt " +"FROM TABLE( " +"TUMBLE( TABLE EventTable, " + "DESCRIPTOR(ts), " + "INTERVAL '1' HOUR)) " +"GROUP BY user, window_start, window_end "
);
// 参考:https://blog.csdn.net/JiaXingNashishua/article/details/127141341
package com.atguigu.chapter11;
import com.atguigu.chapter05.Event;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import static org.apache.flink.table.api.Expressions.$;

public class CumulateWindowExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // 读取数据源,并分配时间戳、生成水位线
        SingleOutputStreamOperator<Event> eventStream = env
                .fromElements(
                        new Event("Alice", "./home", 1000L),
                        new Event("Bob", "./cart", 1000L),
                        new Event("Alice", "./prod?id=1", 25 * 60 * 1000L),
                        new Event("Alice", "./prod?id=4", 55 * 60 * 1000L),
                        new Event("Bob", "./prod?id=5", 3600 * 1000L + 60 * 1000L),
                        new Event("Cary", "./home", 3600 * 1000L + 30 * 60 * 1000L),
                        new Event("Cary", "./prod?id=7", 3600 * 1000L + 59 * 60 * 1000L))
                .assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forMonotonousTimestamps()
                        .withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
                            @Override
                            public long extractTimestamp(Event element, long recordTimestamp) {
                                return element.timestamp;
                            }
                        }));
        // 创建表环境
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        // 将数据流转换成表,并指定时间属性
        Table eventTable = tableEnv.fromDataStream(
                eventStream,
                $("user"),
                $("url"),
                $("timestamp").rowtime().as("ts"));
        // 为方便在 SQL 中引用,在环境中注册表 EventTable
        tableEnv.createTemporaryView("EventTable", eventTable);
        // 设置累积窗口,执行 SQL 统计查询
        Table result = tableEnv.sqlQuery(
                "SELECT " +
                "user, " +
                "window_end AS endT, " +
                "COUNT(url) AS cnt " +
                "FROM TABLE( " +
                "CUMULATE( TABLE EventTable, " +   // 定义累积窗口
                "DESCRIPTOR(ts), " +
                "INTERVAL '30' MINUTE, " +
                "INTERVAL '1' HOUR)) " +
                "GROUP BY user, window_start, window_end ");
        tableEnv.toDataStream(result).print();
        env.execute();
    }
}
11.5.4应用实例 —— Top N
1.普通 Top N
SELECT ...
FROM (
  SELECT ...,
    ROW_NUMBER() OVER (
      [PARTITION BY <字段 1>[, <字段 2>...]]
      ORDER BY <排序字段 1> [asc|desc][, <排序字段 2> [asc|desc]...]
    ) AS row_num
  FROM ...)
WHERE row_num <= N [AND <其它条件>]
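As a concrete instance of the template, the sketch below reuses the EventTable and the per-user count query from the earlier examples (hypothetical names, not a new source) to keep the two users with the most clicks; without PARTITION BY this is a global, continuously updated Top-2.
// Sketch: global Top-2 users by click count, expressed with the template above.
Table top2 = tableEnv.sqlQuery(
        "SELECT user, cnt, row_num " +
        "FROM ( " +
        "  SELECT *, ROW_NUMBER() OVER (ORDER BY cnt DESC) AS row_num " +
        "  FROM ( SELECT user, COUNT(url) AS cnt FROM EventTable GROUP BY user ) " +
        ") WHERE row_num <= 2");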
2.窗口 Top N
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import static org.apache.flink.table.api.Expressions.$;

public class WindowTopNExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // 读取数据源,并分配时间戳、生成水位线
        SingleOutputStreamOperator<Event> eventStream = env
                .fromElements(
                        new Event("Alice", "./home", 1000L),
                        new Event("Bob", "./cart", 1000L),
                        new Event("Alice", "./prod?id=1", 25 * 60 * 1000L),
                        new Event("Alice", "./prod?id=4", 55 * 60 * 1000L),
                        new Event("Bob", "./prod?id=5", 3600 * 1000L + 60 * 1000L),
                        new Event("Cary", "./home", 3600 * 1000L + 30 * 60 * 1000L),
                        new Event("Cary", "./prod?id=7", 3600 * 1000L + 59 * 60 * 1000L))
                .assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forMonotonousTimestamps()
                        .withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
                            @Override
                            public long extractTimestamp(Event element, long recordTimestamp) {
                                return element.timestamp;
                            }
                        }));
        // 创建表环境
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        // 将数据流转换成表,并指定时间属性
        Table eventTable = tableEnv.fromDataStream(
                eventStream,
                $("user"),
                $("url"),
                $("timestamp").rowtime().as("ts"));    // 将 timestamp 指定为事件时间,并命名为 ts
        // 为方便在 SQL 中引用,在环境中注册表 EventTable
        tableEnv.createTemporaryView("EventTable", eventTable);
        // 定义子查询,进行窗口聚合,得到包含窗口信息、用户以及访问次数的结果表
        String subQuery =
                "SELECT window_start, window_end, user, COUNT(url) as cnt " +
                "FROM TABLE ( " +
                "TUMBLE( TABLE EventTable, DESCRIPTOR(ts), INTERVAL '1' HOUR )) " +
                "GROUP BY window_start, window_end, user ";
        // 定义 Top N 的外层查询
        String topNQuery =
                "SELECT * " +
                "FROM (" +
                "SELECT *, " +
                "ROW_NUMBER() OVER ( " +
                "PARTITION BY window_start, window_end " +
                "ORDER BY cnt desc " +
                ") AS row_num " +
                "FROM (" + subQuery + ")) " +
                "WHERE row_num <= 2";
        // 执行 SQL 得到结果表
        Table result = tableEnv.sqlQuery(topNQuery);
        tableEnv.toDataStream(result).print();
        env.execute();
    }
}
11.7.2自定义函数(UDF)
// (1)注册函数
tableEnv.createTemporarySystemFunction("MyFunction", MyFunction.class);
//(2)使用Table API 调用函数
tableEnv.from("MyTable").select(call("MyFunction", $("myField")));
tableEnv.from("MyTable").select(call(SubstringFunction.class, $("myField"))); //(3)在 SQL 中调用函数
tableEnv.sqlQuery("SELECT MyFunction(myField) FROM MyTable");
// 2.标量函数(Scalar Functions)
public static class HashFunction extends ScalarFunction {
    // 接受任意类型输入,返回 INT 型输出
    public int eval(@DataTypeHint(inputGroup = InputGroup.ANY) Object o) {
        return o.hashCode();
    }
}
// 注册函数
tableEnv.createTemporarySystemFunction("HashFunction", HashFunction.class);
// 在 SQL 里调用注册好的函数
tableEnv.sqlQuery("SELECT HashFunction(myField) FROM MyTable");
表函数
//实现一个分隔字符串的函数 SplitFunction,将一个字符串转换成(字符串,长度)的二元组。
// 注意这里的类型标注,输出是 Row 类型,Row 中包含两个字段:word 和 length。
@FunctionHint(output = @DataTypeHint("ROW<word STRING, length INT>"))
public static class SplitFunction extends TableFunction<Row> {
    public void eval(String str) {
        for (String s : str.split(" ")) {
            // 使用 collect()方法发送一行数据
            collect(Row.of(s, s.length()));
        }
    }
}
// 注册函数
tableEnv.createTemporarySystemFunction("SplitFunction", SplitFunction.class);
// 在 SQL 里调用注册好的函数
// 1. 交叉联结
tableEnv.sqlQuery(
    "SELECT myField, word, length " +
    "FROM MyTable, LATERAL TABLE(SplitFunction(myField))");
// 2. 带 ON TRUE 条件的左联结
tableEnv.sqlQuery(
    "SELECT myField, word, length " +
    "FROM MyTable " +
    "LEFT JOIN LATERAL TABLE(SplitFunction(myField)) ON TRUE");
// 重命名侧向表中的字段
tableEnv.sqlQuery(
    "SELECT myField, newWord, newLength " +
    "FROM MyTable " +
    "LEFT JOIN LATERAL TABLE(SplitFunction(myField)) AS T(newWord, newLength) ON TRUE");
聚合函数(AggregateFunction)
// 累加器类型定义
public static class WeightedAvgAccumulator {
    public long sum = 0;    // 加权和
    public int count = 0;   // 数据个数
}
// 自定义聚合函数,输出为长整型的平均值,累加器类型为 WeightedAvgAccumulator
public static class WeightedAvg extends AggregateFunction<Long, WeightedAvgAccumulator> {
    // 创建累加器
    @Override
    public WeightedAvgAccumulator createAccumulator() {
        return new WeightedAvgAccumulator();
    }

    @Override
    public Long getValue(WeightedAvgAccumulator acc) {
        if (acc.count == 0) {
            return null;                   // 防止除数为 0
        } else {
            return acc.sum / acc.count;    // 计算平均值并返回
        }
    }

    // 累加计算方法,每来一行数据都会调用
    public void accumulate(WeightedAvgAccumulator acc, Long iValue, Integer iWeight) {
        acc.sum += iValue * iWeight;
        acc.count += iWeight;
    }
}

// 注册自定义聚合函数
tableEnv.createTemporarySystemFunction("WeightedAvg", WeightedAvg.class);
// 调用函数计算加权平均值
Table result = tableEnv.sqlQuery("SELECT student, WeightedAvg(score, weight) FROM ScoreTable GROUP BY student");
表聚合函数
// 聚合累加器的类型定义,包含最大的第一和第二两个数据
public static class Top2Accumulator {
    public Integer first;
    public Integer second;
}

// 自定义表聚合函数,查询一组数中最大的两个,返回值为(数值,排名)的二元组
public static class Top2 extends TableAggregateFunction<Tuple2<Integer, Integer>, Top2Accumulator> {
    @Override
    public Top2Accumulator createAccumulator() {
        Top2Accumulator acc = new Top2Accumulator();
        acc.first = Integer.MIN_VALUE;    // 为方便比较,初始值给最小值
        acc.second = Integer.MIN_VALUE;
        return acc;
    }

    // 每来一个数据调用一次,判断是否更新累加器
    public void accumulate(Top2Accumulator acc, Integer value) {
        if (value > acc.first) {
            acc.second = acc.first;
            acc.first = value;
        } else if (value > acc.second) {
            acc.second = value;
        }
    }

    // 输出(数值,排名)的二元组,输出两行数据
    public void emitValue(Top2Accumulator acc, Collector<Tuple2<Integer, Integer>> out) {
        if (acc.first != Integer.MIN_VALUE) {
            out.collect(Tuple2.of(acc.first, 1));
        }
        if (acc.second != Integer.MIN_VALUE) {
            out.collect(Tuple2.of(acc.second, 2));
        }
    }
}

// 注册表聚合函数
tableEnv.createTemporarySystemFunction("Top2", Top2.class);

// 在 Table API 中调用函数
tableEnv.from("MyTable")
    .groupBy($("myField"))
    .flatAggregate(call("Top2", $("value")).as("value", "rank"))
    .select($("myField"), $("value"), $("rank"));
11.9 连接外部系统
//(1)kafka
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-csv</artifactId>
    <version>${flink.version}</version>
</dependency>
CREATE TABLE KafkaTable (
  `user` STRING,
  `url` STRING,
  `ts` TIMESTAMP(3) METADATA FROM 'timestamp'
) WITH (
  'connector' = 'kafka',
  'topic' = 'events',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'testGroup',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'csv'
)

// Upsert Kafka
CREATE TABLE pageviews_per_region (
  user_region STRING,
  pv BIGINT,
  uv BIGINT,
  PRIMARY KEY (user_region) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'pageviews_per_region',
  'properties.bootstrap.servers' = '...',
  'key.format' = 'avro',
  'value.format' = 'avro'
);

CREATE TABLE pageviews (
  user_id BIGINT,
  page_id BIGINT,
  viewtime TIMESTAMP,
  user_region STRING,
  WATERMARK FOR viewtime AS viewtime - INTERVAL '2' SECOND
) WITH (
  'connector' = 'kafka',
  'topic' = 'pageviews',
  'properties.bootstrap.servers' = '...',
  'format' = 'json'
);

-- 计算 pv、uv 并插入到 upsert-kafka 表中
INSERT INTO pageviews_per_region
SELECT user_region, COUNT(*), COUNT(DISTINCT user_id)
FROM pageviews
GROUP BY user_region;

//(2)fileSystem 文件系统
CREATE TABLE MyTable (
  column_name1 INT,
  column_name2 STRING,
  ...
  part_name1 INT,
  part_name2 STRING
) PARTITIONED BY (part_name1, part_name2)
WITH (
  'connector' = 'filesystem',  -- 连接器类型
  'path' = '...',              -- 文件路径
  'format' = '...'             -- 文件格式
)

//(3)JDBC
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-jdbc_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.38</version>
</dependency>
-- 创建一张连接到 MySQL 的表
CREATE TABLE MyTable (
  id BIGINT,
  name STRING,
  age INT,
  status BOOLEAN,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://localhost:3306/mydatabase',
  'table-name' = 'users'
);

-- 将另一张表 T 的数据写入到 MyTable 表中
INSERT INTO MyTable
SELECT id, name, age, status FROM T;

//(4)Elasticsearch
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-elasticsearch7_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
-- 创建一张连接到 Elasticsearch 的表
CREATE TABLE MyTable (
  user_id STRING,
  user_name STRING,
  uv BIGINT,
  pv BIGINT,
  PRIMARY KEY (user_id) NOT ENFORCED
) WITH (
  'connector' = 'elasticsearch-7',
  'hosts' = 'http://localhost:9200',
  'index' = 'users'
);

//(5)HBase
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-hbase-1.4_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-hbase-2.2_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
-- 创建一张连接到 HBase 的表
CREATE TABLE MyTable (
  rowkey INT,
  family1 ROW<q1 INT>,
  family2 ROW<q2 STRING, q3 BIGINT>,
  family3 ROW<q4 DOUBLE, q5 BOOLEAN, q6 STRING>,
  PRIMARY KEY (rowkey) NOT ENFORCED
) WITH (
  'connector' = 'hbase-1.4',
  'table-name' = 'mytable',
  'zookeeper.quorum' = 'localhost:2181'
);

-- 假设表 T 的字段结构是 [rowkey, f1q1, f2q2, f2q3, f3q4, f3q5, f3q6]
INSERT INTO MyTable
SELECT rowkey, ROW(f1q1), ROW(f2q2, f2q3), ROW(f3q4, f3q5, f3q6) FROM T;

//(6)Hive
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-hive_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.hive</groupId>
    <artifactId>hive-exec</artifactId>
    <version>${hive.version}</version>
</dependency>
EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().build();
TableEnvironment tableEnv = TableEnvironment.create(settings);
String name = "myhive";
String defaultDatabase = "mydatabase";
String hiveConfDir = "/opt/hive-conf";
// 创建一个 HiveCatalog,并在表环境中注册
HiveCatalog hive = new HiveCatalog(name, defaultDatabase, hiveConfDir);
tableEnv.registerCatalog("myhive", hive);
// 使用 HiveCatalog 作为当前会话的 catalog
tableEnv.useCatalog("myhive");

// 也可以在 SQL 客户端中用 DDL 创建并使用 HiveCatalog
Flink SQL> create catalog myhive with ('type' = 'hive', 'hive-conf-dir' = '/opt/hive-conf');
[INFO] Execute statement succeed.
Flink SQL> use catalog myhive;
[INFO] Execute statement succeed.

// 3.设置 SQL 方言
// 我们可以通过配置table.sql-dialect 属性来设置SQL 方言:
set table.sql-dialect=hive;
// 在配置文件 sql-cli-defaults.yaml 中通过“configuration”模块来设置
execution:
  planner: blink
  type: batch
  result-mode: table
configuration:
  table.sql-dialect: hive

// Table API 中设置
// 配置 hive 方言
tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
// 配置 default 方言
tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);

// 4.读写 Hive 表
-- 设置 SQL 方言为 hive,创建 Hive 表
SET table.sql-dialect=hive;
CREATE TABLE hive_table (
  user_id STRING,
  order_amount DOUBLE
) PARTITIONED BY (dt STRING, hr STRING) STORED AS parquet TBLPROPERTIES (
  'partition.time-extractor.timestamp-pattern'='$dt $hr:00:00',
  'sink.partition-commit.trigger'='partition-time',
  'sink.partition-commit.delay'='1 h',
  'sink.partition-commit.policy.kind'='metastore,success-file'
);

-- 设置 SQL 方言为 default,创建 Kafka 表
SET table.sql-dialect=default;
CREATE TABLE kafka_table (
  user_id STRING,
  order_amount DOUBLE,
  log_ts TIMESTAMP(3),
  WATERMARK FOR log_ts AS log_ts - INTERVAL '5' SECOND -- 定义水位线
) WITH (...);

-- 将 Kafka 中读取的数据经转换后写入 Hive
INSERT INTO TABLE hive_table
SELECT user_id, order_amount, DATE_FORMAT(log_ts, 'yyyy-MM-dd'), DATE_FORMAT(log_ts, 'HH')
FROM kafka_table;
