Flink集成Mybatis

        最近在使用Flink写了一个流式处理项目,最后是把处理的结果写到mysql里面,虽然Flink官方提供JDBC Connector,但是毕竟JDBC太原始了,用Mybatis+Druid不香吗,经过一段时间的摸索和测试,把最后的代码以及遇到的一些问题分享一下。

相关依赖:

        pom.xml

org.mybatismybatis3.3.0

mysqlmysql-connector-java6.0.4

com.alibabadruid1.2.4

org.projectlomboklombok1.16.22true

Mybatis配置:

           mybatis_conf.xml




Mapper :

        mapper.xml



insert into .........

 代码:

import com.alibaba.druid.pool.DruidDataSource;
import org.apache.ibatis.datasource.pooled.PooledDataSourceFactory;public class DruidDataSourceFactory extends PooledDataSourceFactory {public DruidDataSourceFactory() {this.dataSource = new DruidDataSource();}
}

        

import lombok.extern.log4j.Log4j;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.ibatis.session.SqlSession;
import org.apache.ibatis.session.SqlSessionFactory;
import org.apache.ibatis.session.SqlSessionFactoryBuilder;import java.io.InputStream;import static com.util.Utils.getResourceAsStream;@Log4j
public class MybatisSink extends RichSinkFunction {private static SqlSessionFactory sqlSessionFactory;static {try (InputStream inputStream = getResourceAsStream("mybatis_conf.xml")) {sqlSessionFactory = new SqlSessionFactoryBuilder().build(inputStream);} catch (Exception e) {log.error(e.getMessage(), e);}}private String sqlId;public MybatisSink(String sqlId) {this.sqlId = sqlId;}@Overridepublic void invoke(T value, Context context) {try (SqlSession sqlSession = sqlSessionFactory.openSession(true)) {sqlSession.insert(sqlId, value); } catch (Exception ex) {log.error(ex.getMessage(), ex);}}
}
@Log4j
public class Utils {public static InputStream getResourceAsStream(String s) {return Utils.class.getClassLoader().getResourceAsStream(s);}
}

        在Flink代码中直接在类型DataStream上调用addSink(new MybatisSink<>("com.example.mapper.updateActive"))来使用MybatisSink来操作数据库。

        这个代码并不复杂,但是有一些值得注意的地方。Mybatis的使用主要问题就在于SqlSessionFactory和SqlSession的创建与使用,SqlSessionFactory在代码中应该使用单例模式;而SqlSession是线程不安全的,应该作为局部变量,并且应该及时调用close方法关闭资源,在这个代码中直接使用最简单的静态单例模式以及try-with-resources语句来解决这两个问题。最开始我的代码是这样写的:


public class MybatisSink extends RichSinkFunction {private String sql;private SqlSessionFactory sqlSessionFactory;public MybatisSink(String sql) {this.sql = sql;}@Overridepublic void open(Configuration parameters) throws Exception {sqlSessionFactory = getSqlSessionFactory();}@Overridepublic void invoke(Row value, Context context) {try (SqlSession sqlSession = sqlSessionFactory.openSession()) {}      }
}

        这样写其实是有问题的,SqlSessionFactory可能不是单例的。Flink是以多进程+多线程模式执行任务的,比如有3个Task Manager,6个Task Slot,那么一般而言就是3个进程,每个进程里面有2个线程在执行任务,每个线程叫做一个子任务(SubTask)。而每个子任务都会单独创建一个Sink的实例,按照上面这种写法每个Sink实例都会去创建一个SqlSessionFactory实例,而每创建一个SqlSessionFactory实例都会去创建一个DataSource实例,虽然这样写代码是可以运行的,但是显然是不科学的,在一个进程中所有的线程应该是共享一个线程池才对。


本文来自互联网用户投稿,文章观点仅代表作者本人,不代表本站立场,不承担相关法律责任。如若转载,请注明出处。 如若内容造成侵权/违法违规/事实不符,请点击【内容举报】进行投诉反馈!

相关文章

立即
投稿

微信公众账号

微信扫一扫加关注

返回
顶部