Big Data with Flink: Join Usage

I. Join usage
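
All four demos in this section read lines of the form `eventTime,key,value` (e.g. `1000,A,1`) from two local sockets: port 8888 for the left stream and port 9999 for the right stream. To try them locally you can feed the sockets with netcat, e.g. `nc -lk 8888` and `nc -lk 9999` in two terminals (assuming a Unix-like environment).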

1. Window join: joining records from two streams

package cn._51doit.flink.day05;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple6;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

public class TumblingWindowJoinDemo {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        // e.g. 1000,A,1
        DataStreamSource<String> leftLines = env.socketTextStream("localhost", 8888);
        // e.g. 2000,A,2
        DataStreamSource<String> rightLines = env.socketTextStream("localhost", 9999);

        // Extract the event time from records of the first stream
        DataStream<String> leftWaterMarkStream = leftLines.assignTimestampsAndWatermarks(
                new BoundedOutOfOrdernessTimestampExtractor<String>(Time.seconds(0)) {
                    @Override
                    public long extractTimestamp(String line) {
                        return Long.parseLong(line.split(",")[0]);
                    }
                });

        // Extract the event time from records of the second stream
        DataStream<String> rightWaterMarkStream = rightLines.assignTimestampsAndWatermarks(
                new BoundedOutOfOrdernessTimestampExtractor<String>(Time.seconds(0)) {
                    @Override
                    public long extractTimestamp(String line) {
                        return Long.parseLong(line.split(",")[0]);
                    }
                });

        // Reshape the first stream into a Tuple3
        DataStream<Tuple3<Long, String, String>> leftStream = leftWaterMarkStream.map(
                new MapFunction<String, Tuple3<Long, String, String>>() {
                    @Override
                    public Tuple3<Long, String, String> map(String value) throws Exception {
                        String[] fields = value.split(",");
                        return Tuple3.of(Long.parseLong(fields[0]), fields[1], fields[2]);
                    }
                });

        // Reshape the second stream into a Tuple3
        DataStream<Tuple3<Long, String, String>> rightStream = rightWaterMarkStream.map(
                new MapFunction<String, Tuple3<Long, String, String>>() {
                    @Override
                    public Tuple3<Long, String, String> map(String value) throws Exception {
                        String[] fields = value.split(",");
                        return Tuple3.of(Long.parseLong(fields[0]), fields[1], fields[2]);
                    }
                });

        // The first (left) stream calls join on the second (right) stream;
        // the join keys are specified in where (left side) and equalTo (right side)
        DataStream<Tuple6<Long, String, String, Long, String, String>> joinedStream = leftStream
                .join(rightStream)
                .where(new KeySelector<Tuple3<Long, String, String>, String>() {
                    @Override
                    public String getKey(Tuple3<Long, String, String> value) throws Exception {
                        return value.f1; // use f1 of the left tuple as the join key
                    }
                })
                .equalTo(new KeySelector<Tuple3<Long, String, String>, String>() {
                    @Override
                    public String getKey(Tuple3<Long, String, String> value) throws Exception {
                        return value.f1; // use f1 of the right tuple as the join key
                    }
                })
                .window(TumblingEventTimeWindows.of(Time.seconds(5))) // 5-second event-time tumbling window
                .apply(new MyInnerJoinFunction()); // pass the custom MyInnerJoinFunction to apply

        joinedStream.print(); // print sink
        env.execute("TumblingWindowJoinDemo");
    }
}
package cn._51doit.flink.day05;

import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple6;

public class MyInnerJoinFunction implements JoinFunction<
        Tuple3<Long, String, String>,                          // input type of the first (left) stream
        Tuple3<Long, String, String>,                          // input type of the second (right) stream
        Tuple6<Long, String, String, Long, String, String>> {  // output type of the join

    // join is called only for pairs of records that fall into the same
    // time window and share the same join key
    @Override
    public Tuple6<Long, String, String, Long, String, String> join(
            Tuple3<Long, String, String> left,  // one record from the first (left) stream
            Tuple3<Long, String, String> right) // one record from the second (right) stream
            throws Exception {
        // pack the matched records of both streams into a Tuple6 and return it
        return Tuple6.of(left.f0, left.f1, left.f2, right.f0, right.f1, right.f2);
    }
}
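
As a quick sanity check (an assumed run, not from the original article): feed `1000,A,1` into port 8888 and `2000,A,2` into port 9999. Both records land in the event-time window `[0, 5000)` and share the key `A`. Since the operator's watermark is the minimum across both inputs, the window fires only after each socket has seen a record with a timestamp of at least 5000 (e.g. `5000,B,0`), at which point the job prints `(1000,A,1,2000,A,2)`.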

2. Left outer join

package cn._51doit.flink.day05;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple6;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

public class TumblingWindowLeftOuterJoinDemo {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        // e.g. 1000,A,1
        DataStreamSource<String> leftStream = env.socketTextStream("localhost", 8888);
        // e.g. 2000,A,2
        DataStreamSource<String> rightStream = env.socketTextStream("localhost", 9999);

        // Extract the event time from records of the first stream
        DataStream<String> leftWaterMarkStream = leftStream.assignTimestampsAndWatermarks(
                new BoundedOutOfOrdernessTimestampExtractor<String>(Time.seconds(0)) {
                    @Override
                    public long extractTimestamp(String line) {
                        return Long.parseLong(line.split(",")[0]);
                    }
                });

        // Extract the event time from records of the second stream
        DataStream<String> rightWaterMarkStream = rightStream.assignTimestampsAndWatermarks(
                new BoundedOutOfOrdernessTimestampExtractor<String>(Time.seconds(0)) {
                    @Override
                    public long extractTimestamp(String line) {
                        return Long.parseLong(line.split(",")[0]);
                    }
                });

        // Reshape the first stream into a Tuple3
        DataStream<Tuple3<Long, String, String>> leftTuple = leftWaterMarkStream.map(
                new MapFunction<String, Tuple3<Long, String, String>>() {
                    @Override
                    public Tuple3<Long, String, String> map(String value) throws Exception {
                        String[] fields = value.split(",");
                        return Tuple3.of(Long.parseLong(fields[0]), fields[1], fields[2]);
                    }
                });

        // Reshape the second stream into a Tuple3
        DataStream<Tuple3<Long, String, String>> rightTuple = rightWaterMarkStream.map(
                new MapFunction<String, Tuple3<Long, String, String>>() {
                    @Override
                    public Tuple3<Long, String, String> map(String value) throws Exception {
                        String[] fields = value.split(",");
                        return Tuple3.of(Long.parseLong(fields[0]), fields[1], fields[2]);
                    }
                });

        // Left outer join the first (left) stream with the second (right) stream:
        // records must be in the same window and have equal join keys, and left
        // records that find no match are emitted as well
        DataStream<Tuple6<Long, String, String, Long, String, String>> joinedStream = leftTuple
                .coGroup(rightTuple)
                .where(new KeySelector<Tuple3<Long, String, String>, String>() {
                    @Override
                    public String getKey(Tuple3<Long, String, String> value) throws Exception {
                        return value.f1;
                    }
                })
                .equalTo(new KeySelector<Tuple3<Long, String, String>, String>() {
                    @Override
                    public String getKey(Tuple3<Long, String, String> value) throws Exception {
                        return value.f1;
                    }
                })
                .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                .apply(new MyLeftOuterJoinFunction());

        joinedStream.print();
        env.execute();
    }
}
package cn._51doit.flink.day05;

import org.apache.flink.api.common.functions.CoGroupFunction;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple6;
import org.apache.flink.util.Collector;

public class MyLeftOuterJoinFunction implements CoGroupFunction<
        Tuple3<Long, String, String>,                          // input type of the left stream
        Tuple3<Long, String, String>,                          // input type of the right stream
        Tuple6<Long, String, String, Long, String, String>> {  // output type

    @Override
    public void coGroup(Iterable<Tuple3<Long, String, String>> first,
                        Iterable<Tuple3<Long, String, String>> second,
                        Collector<Tuple6<Long, String, String, Long, String, String>> out) throws Exception {
        // Iterate over the left side; if it yields records, the left stream
        // had data for this key when the window fired
        for (Tuple3<Long, String, String> left : first) {
            boolean hasJoined = false;
            // Iterate over the right side; every record here matches the left
            // record on the join key, i.e. the two streams joined
            for (Tuple3<Long, String, String> right : second) {
                // emit the joined pair
                out.collect(Tuple6.of(left.f0, left.f1, left.f2, right.f0, right.f1, right.f2));
                hasJoined = true;
            }
            // no match on the right side: emit the left record alone, padded with nulls
            if (!hasJoined) {
                out.collect(Tuple6.of(left.f0, left.f1, left.f2, null, null, null));
            }
        }
    }
}
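
With the same inputs as before (an illustrative run, not from the original article), the left outer variant also emits unmatched left records: if only `1000,A,1` arrives on port 8888 during the window and the watermark on both streams passes 5000, the job prints `(1000,A,1,null,null,null)` instead of nothing.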

3. Right outer join

package cn._51doit.flink.day05;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple6;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

public class TumblingWindowRightOuterJoinDemo {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        // e.g. 1000,A,1
        DataStreamSource<String> leftStream = env.socketTextStream("localhost", 8888);
        // e.g. 2000,A,2
        DataStreamSource<String> rightStream = env.socketTextStream("localhost", 9999);

        // Extract the event time from records of the first stream
        DataStream<String> leftWaterMarkStream = leftStream.assignTimestampsAndWatermarks(
                new BoundedOutOfOrdernessTimestampExtractor<String>(Time.seconds(0)) {
                    @Override
                    public long extractTimestamp(String line) {
                        return Long.parseLong(line.split(",")[0]);
                    }
                });

        // Extract the event time from records of the second stream
        DataStream<String> rightWaterMarkStream = rightStream.assignTimestampsAndWatermarks(
                new BoundedOutOfOrdernessTimestampExtractor<String>(Time.seconds(0)) {
                    @Override
                    public long extractTimestamp(String line) {
                        return Long.parseLong(line.split(",")[0]);
                    }
                });

        // Reshape the first stream into a Tuple3
        DataStream<Tuple3<Long, String, String>> leftTuple = leftWaterMarkStream.map(
                new MapFunction<String, Tuple3<Long, String, String>>() {
                    @Override
                    public Tuple3<Long, String, String> map(String value) throws Exception {
                        String[] fields = value.split(",");
                        return Tuple3.of(Long.parseLong(fields[0]), fields[1], fields[2]);
                    }
                });

        // Reshape the second stream into a Tuple3
        DataStream<Tuple3<Long, String, String>> rightTuple = rightWaterMarkStream.map(
                new MapFunction<String, Tuple3<Long, String, String>>() {
                    @Override
                    public Tuple3<Long, String, String> map(String value) throws Exception {
                        String[] fields = value.split(",");
                        return Tuple3.of(Long.parseLong(fields[0]), fields[1], fields[2]);
                    }
                });

        // Use coGroup to implement a right outer join
        DataStream<Tuple6<Long, String, String, Long, String, String>> joinedStream = leftTuple
                .coGroup(rightTuple)
                .where(new KeySelector<Tuple3<Long, String, String>, String>() {
                    @Override
                    public String getKey(Tuple3<Long, String, String> value) throws Exception {
                        return value.f1;
                    }
                })
                .equalTo(new KeySelector<Tuple3<Long, String, String>, String>() {
                    @Override
                    public String getKey(Tuple3<Long, String, String> value) throws Exception {
                        return value.f1;
                    }
                })
                .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                .apply(new MyRightOuterJoinFunction());

        joinedStream.print();
        env.execute();
    }
}
package cn._51doit.flink.day05;

import org.apache.flink.api.common.functions.CoGroupFunction;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple6;
import org.apache.flink.util.Collector;

public class MyRightOuterJoinFunction implements CoGroupFunction<
        Tuple3<Long, String, String>,                          // input type of the left stream
        Tuple3<Long, String, String>,                          // input type of the right stream
        Tuple6<Long, String, String, Long, String, String>> {  // output type

    @Override
    public void coGroup(Iterable<Tuple3<Long, String, String>> first,
                        Iterable<Tuple3<Long, String, String>> second,
                        Collector<Tuple6<Long, String, String, Long, String, String>> out) throws Exception {
        // Iterate over the right side; if it yields records, the right stream
        // had data for this key when the window fired
        for (Tuple3<Long, String, String> right : second) {
            boolean hasJoined = false;
            // Iterate over the left side; every record here matches the right
            // record on the join key, i.e. the two streams joined
            for (Tuple3<Long, String, String> left : first) {
                // emit the joined pair
                out.collect(Tuple6.of(left.f0, left.f1, left.f2, right.f0, right.f1, right.f2));
                hasJoined = true;
            }
            // no match on the left side: emit the right record alone, padded with nulls
            if (!hasJoined) {
                out.collect(Tuple6.of(null, null, null, right.f0, right.f1, right.f2));
            }
        }
    }
}

4. Interval join

Records join when their keys are equal; instead of a window, between() sets a time range relative to each record within which data from the other stream is kept alive and eligible to join.

package cn._51doit.flink.day05;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple6;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;

public class IntervalJoinDemo {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        // e.g. 1000,A,1
        DataStreamSource<String> leftLines = env.socketTextStream("localhost", 8888);
        // e.g. 2000,A,2
        DataStreamSource<String> rightLines = env.socketTextStream("localhost", 9999);

        // Extract the event time from records of the first stream
        DataStream<String> leftWaterMarkStream = leftLines.assignTimestampsAndWatermarks(
                new BoundedOutOfOrdernessTimestampExtractor<String>(Time.seconds(0)) {
                    @Override
                    public long extractTimestamp(String line) {
                        return Long.parseLong(line.split(",")[0]);
                    }
                });

        // Extract the event time from records of the second stream
        DataStream<String> rightWaterMarkStream = rightLines.assignTimestampsAndWatermarks(
                new BoundedOutOfOrdernessTimestampExtractor<String>(Time.seconds(0)) {
                    @Override
                    public long extractTimestamp(String line) {
                        return Long.parseLong(line.split(",")[0]);
                    }
                });

        // Reshape the first stream into a Tuple3
        DataStream<Tuple3<Long, String, String>> leftStream = leftWaterMarkStream.map(
                new MapFunction<String, Tuple3<Long, String, String>>() {
                    @Override
                    public Tuple3<Long, String, String> map(String value) throws Exception {
                        String[] fields = value.split(",");
                        return Tuple3.of(Long.parseLong(fields[0]), fields[1], fields[2]);
                    }
                });

        // Reshape the second stream into a Tuple3
        DataStream<Tuple3<Long, String, String>> rightStream = rightWaterMarkStream.map(
                new MapFunction<String, Tuple3<Long, String, String>>() {
                    @Override
                    public Tuple3<Long, String, String> map(String value) throws Exception {
                        String[] fields = value.split(",");
                        return Tuple3.of(Long.parseLong(fields[0]), fields[1], fields[2]);
                    }
                });

        DataStream<Tuple6<Long, String, String, Long, String, String>> joinedStream = leftStream
                .keyBy(t -> t.f1)                           // key the first stream
                .intervalJoin(rightStream.keyBy(t -> t.f1)) // intervalJoin with the keyed second stream
                .between(Time.seconds(-1), Time.seconds(1)) // join records within ±1 second of each record's event time
                .upperBoundExclusive()                      // both bounds are inclusive by default; make the upper bound exclusive
                .process(new MyProcessJoinFunction());      // pass the custom MyProcessJoinFunction to process

        joinedStream.print(); // print sink
        env.execute("IntervalJoinDemo");
    }
}
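
The original post references MyProcessJoinFunction but never shows it. A minimal sketch matching the types used above might look like this (an assumed implementation, not the author's original):

package cn._51doit.flink.day05;

import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple6;
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
import org.apache.flink.util.Collector;

// Assumed implementation: processElement is invoked once for every pair of
// records whose keys match and whose event times fall within the interval
// configured by between()/upperBoundExclusive()
public class MyProcessJoinFunction extends ProcessJoinFunction<
        Tuple3<Long, String, String>,                          // left input type
        Tuple3<Long, String, String>,                          // right input type
        Tuple6<Long, String, String, Long, String, String>> {  // output type

    @Override
    public void processElement(Tuple3<Long, String, String> left,
                               Tuple3<Long, String, String> right,
                               Context ctx,
                               Collector<Tuple6<Long, String, String, Long, String, String>> out) throws Exception {
        // pack the matched pair into a Tuple6 and emit it
        out.collect(Tuple6.of(left.f0, left.f1, left.f2, right.f0, right.f1, right.f2));
    }
}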

II. Enriching streams with dimension data from MySQL

Geolocation info: query the AMap (Gaode Maps) reverse-geocoding API with HttpClient.

DimDemo

package cn._51doit.flink.day05;

import cn._51doit.flink.day05.func.GeoRichMapFunction;
import com.alibaba.fastjson.JSON;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;

public class DimDemo {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStreamSource<String> lines = env.socketTextStream("localhost", 8888);

        // Parse each JSON line into a LogBean; lines that fail to parse become null
        SingleOutputStreamOperator<LogBean> logBeanDataStream = lines.map(new MapFunction<String, LogBean>() {
            @Override
            public LogBean map(String value) throws Exception {
                LogBean bean = null;
                try {
                    bean = JSON.parseObject(value, LogBean.class);
                } catch (Exception e) {
                    e.printStackTrace();
                }
                return bean;
            }
        });

        // Drop records that failed to parse
        SingleOutputStreamOperator<LogBean> filtered = logBeanDataStream.filter(e -> e != null);

        // Enrich each record with dimension data looked up from MySQL
        SingleOutputStreamOperator<LogBean> logBeanWithNameDataStream = filtered.map(new RichMapFunction<LogBean, LogBean>() {

            private transient Connection connection;
            private transient PreparedStatement prepareStatement;

            @Override
            public void open(Configuration parameters) throws Exception {
                connection = DriverManager.getConnection(
                        "jdbc:mysql://localhost:3306/bigdata?characterEncoding=utf-8", "root", "123456");
                prepareStatement = connection.prepareStatement("select id, name from tb_category where id = ?");
            }

            @Override
            public LogBean map(LogBean value) throws Exception {
                prepareStatement.setInt(1, value.cid);
                ResultSet resultSet = prepareStatement.executeQuery();
                String name = null;
                if (resultSet.next()) {
                    name = resultSet.getString(2);
                }
                resultSet.close();
                value.name = name;
                return value;
            }

            @Override
            public void close() throws Exception {
                if (prepareStatement != null) {
                    prepareStatement.close();
                }
                if (connection != null) {
                    connection.close();
                }
            }
        });

        // Look up the coordinates and attach the location info
        SingleOutputStreamOperator<LogBean> result = logBeanWithNameDataStream.map(
                new GeoRichMapFunction("4924f7ef5c86a278f5500851541cdcff"));

        result.print();
        env.execute();
    }
}

LogBean

package cn._51doit.flink.day05;

public class LogBean {

    public String oid;
    public Integer cid;
    public Double money;
    public Double longitude;
    public Double latitude;
    public String name;
    public String province;
    public String city;

    @Override
    public String toString() {
        return "LogBean{" +
                "oid='" + oid + '\'' +
                ", cid=" + cid +
                ", money=" + money +
                ", longitude=" + longitude +
                ", latitude=" + latitude +
                ", name='" + name + '\'' +
                ", province='" + province + '\'' +
                ", city='" + city + '\'' +
                '}';
    }
}
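
For illustration (not from the original post), an input line for DimDemo that maps onto LogBean might look like this, with made-up field values:

{"oid":"o1001","cid":1,"money":99.9,"longitude":116.310003,"latitude":39.991957}

After the two enrichment steps, name is filled in from tb_category and province/city from the AMap response.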

GeoRichMapFunction

package cn._51doit.flink.day05.func;

import cn._51doit.flink.day05.LogBean;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.http.HttpEntity;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.util.EntityUtils;

public class GeoRichMapFunction extends RichMapFunction<LogBean, LogBean> {

    private String key;

    public GeoRichMapFunction(String key) {
        this.key = key;
    }

    private transient CloseableHttpClient httpclient;

    @Override
    public void open(Configuration parameters) throws Exception {
        httpclient = HttpClients.createDefault();
    }

    @Override
    public LogBean map(LogBean bean) throws Exception {
        double longitude = bean.longitude;
        double latitude = bean.latitude;
        HttpGet httpGet = new HttpGet("https://restapi.amap.com/v3/geocode/regeo?&location="
                + longitude + "," + latitude + "&key=" + key);
        CloseableHttpResponse response = httpclient.execute(httpGet);
        try {
            //System.out.println(response.getStatusLine)
            HttpEntity entity = response.getEntity();
            // read the response body and make sure it is fully consumed
            if (response.getStatusLine().getStatusCode() == 200) {
                // get the response JSON string
                String result = EntityUtils.toString(entity);
                // parse it into a JSON object
                JSONObject jsonObj = JSON.parseObject(result);
                // extract the location info
                JSONObject regeocode = jsonObj.getJSONObject("regeocode");
                if (regeocode != null && !regeocode.isEmpty()) {
                    JSONObject address = regeocode.getJSONObject("addressComponent");
                    // extract the province and city
                    bean.province = address.getString("province");
                    bean.city = address.getString("city");
                }
            }
        } finally {
            response.close();
        }
        return bean;
    }

    @Override
    public void close() throws Exception {
        if (httpclient != null) {
            httpclient.close();
        }
    }
}
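
For reference, the part of a successful AMap regeo response that this function reads looks roughly like the fragment below (illustrative values, abbreviated). Note that for municipalities such as Beijing the city field may come back as an empty array rather than a string, so getString("city") may need extra handling in practice:

{
  "status": "1",
  "info": "OK",
  "regeocode": {
    "addressComponent": {
      "province": "北京市",
      "city": []
    }
  }
}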

