Flink:JDBC Sink

Flink:JDBC Sink

使用JDBC Sink需要先导入依赖(我的Flink为1.12)

<dependency><groupId>org.apache.flinkgroupId><artifactId>flink-connector-jdbc_2.11artifactId><version>1.12.0version>
dependency>

编码:

创建JDBCSink类

package com.lzy.exer;import com.lzy.day02.bean.WaterSensor;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.connector.jdbc.JdbcStatementBuilder;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import java.sql.PreparedStatement;
import java.sql.SQLException;public class JDBCSink {public static void main(String[] args) throws Exception {// 创建流处理环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);// 从端口读取数据DataStreamSource<String> streamSource = env.socketTextStream("hadoop102", 9999);// 将读取到的数据转化为JavaBeanSingleOutputStreamOperator<WaterSensor> waterSensorDStream = streamSource.map(line -> {String[] fields = line.split(" ");return new WaterSensor(fields[0],Long.valueOf(fields[1]),Integer.valueOf(fields[2]));});// 使用JDBC Sink将数据写入MySQLwaterSensorDStream.addSink(JdbcSink.sink("insert into test_sensor values(?,?,?)",new JdbcStatementBuilder<WaterSensor>() {@Overridepublic void accept(PreparedStatement preparedStatement, WaterSensor waterSensor) throws SQLException {preparedStatement.setString(1,waterSensor.getId());preparedStatement.setLong(2,waterSensor.getTs());preparedStatement.setInt(3,waterSensor.getVc());}},new JdbcExecutionOptions.Builder().withBatchSize(1).build(),new JdbcConnectionOptions.JdbcConnectionOptionsBuilder().withDriverName("com.mysql.jdbc.Driver").withUrl("jdbc:mysql://hadoop102:3306/flink_test?useSSL=false").withUsername("root").withPassword("123456").build()));// 执行env.execute();}
}

创建数据库flink_test:

create database flink_test;

创建表test_sensor:

create table test_sensor(id VARCHAR(255),ts bigint,vc int
);

在虚拟机中使用nc工具,向端口发送数据:

nc -lk 9999

测试数据:

sensor_1 1 1
sensor_1 2 2

查看MySQL中是否有结果:

MySQL中的结果


本文来自互联网用户投稿,文章观点仅代表作者本人,不代表本站立场,不承担相关法律责任。如若转载,请注明出处。 如若内容造成侵权/违法违规/事实不符,请点击【内容举报】进行投诉反馈!

相关文章

立即
投稿

微信公众账号

微信扫一扫加关注

返回
顶部