Flink:JDBC Sink
Flink:JDBC Sink
使用JDBC Sink需要先导入依赖(我的Flink为1.12)
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-jdbc_2.11</artifactId>
    <version>1.12.0</version>
</dependency>
编码:
创建JDBCSink类
package com.lzy.exer;import com.lzy.day02.bean.WaterSensor;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.connector.jdbc.JdbcStatementBuilder;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import java.sql.PreparedStatement;
import java.sql.SQLException;public class JDBCSink {public static void main(String[] args) throws Exception {// 创建流处理环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);// 从端口读取数据DataStreamSource<String> streamSource = env.socketTextStream("hadoop102", 9999);// 将读取到的数据转化为JavaBeanSingleOutputStreamOperator<WaterSensor> waterSensorDStream = streamSource.map(line -> {String[] fields = line.split(" ");return new WaterSensor(fields[0],Long.valueOf(fields[1]),Integer.valueOf(fields[2]));});// 使用JDBC Sink将数据写入MySQLwaterSensorDStream.addSink(JdbcSink.sink("insert into test_sensor values(?,?,?)",new JdbcStatementBuilder<WaterSensor>() {@Overridepublic void accept(PreparedStatement preparedStatement, WaterSensor waterSensor) throws SQLException {preparedStatement.setString(1,waterSensor.getId());preparedStatement.setLong(2,waterSensor.getTs());preparedStatement.setInt(3,waterSensor.getVc());}},new JdbcExecutionOptions.Builder().withBatchSize(1).build(),new JdbcConnectionOptions.JdbcConnectionOptionsBuilder().withDriverName("com.mysql.jdbc.Driver").withUrl("jdbc:mysql://hadoop102:3306/flink_test?useSSL=false").withUsername("root").withPassword("123456").build()));// 执行env.execute();}
}
创建数据库flink_test:
-- Database named in the JDBC URL used by the JDBCSink job (.../flink_test).
create database flink_test;
创建表test_sensor:
-- Target table of the JDBCSink job's "insert into test_sensor values(?,?,?)".
-- Column order matches the order in which the sink binds its parameters.
CREATE TABLE test_sensor (
    id VARCHAR(255),  -- bound with setString(1, ...)
    ts BIGINT,        -- bound with setLong(2, ...)
    vc INT            -- bound with setInt(3, ...)
);
在虚拟机中使用nc工具,向端口发送数据:
nc -lk 9999
测试数据:
sensor_1 1 1
sensor_1 2 2
查看MySQL中是否有结果:

本文来自互联网用户投稿,文章观点仅代表作者本人,不代表本站立场,不承担相关法律责任。如若转载,请注明出处。 如若内容造成侵权/违法违规/事实不符,请点击【内容举报】进行投诉反馈!
