Flink写入Doris的实时应用

想,全是问题;做,全是办法。

想深入交流doris的私聊我,加微信

引言

做实时数仓的同学对目前比较流行的KFC(Kafka\Flink\ClickHouse)套餐非常熟悉,其实KFD也不错。
大数据组件越来越丰富,但是还没有出现一个兼容OLAP和OLTP的工具,即满足DB和日志的实时存储和复杂查询,又能满足在此基础上的数仓建设,我们尝试过ClickHouse,缺点在于难维护、实时写入效率低,内部碎片合并和数据走zk难以实现大量数据的实时存储;之后使用过impala+kudu,缺点是impala实在是太占用内存,两者结合用起来比较费劲,也是开发了实时同步DB的工具,维护成本太高,也放弃了;最终在参考百度的doris和作业帮的资料下,正式的开始使用Doris,实现了log和DB(包含分表合并)的准实时同步,以及基于doris的数仓建模。
接下来我会就Doris的实时写入部分简单的说一下实现方式,代码和注释为主

表设计

不要把字段设计成"not null",好处在于后期改表(加字段)不会影响正常的数据,其他的暂时不方便透露,之后会慢慢讲

JSONStreamLoad

为什么选择StreamLoad呢?一开始使用的是insert into,insert into是使用的FE资源的,导致FE繁忙,后期数据量上来会出问题,而streamload不存在这个问题,官方是这么说的(0.12的文档):

Stream load 中,Doris 会选定一个节点作为 Coordinator 节点。该节点负责接数据并分发数据到其他数据节点。用户通过 HTTP 协议提交导入命令。如果提交到 FE,则 FE 会通过 HTTP redirect 指令将请求转发给某一个 BE。用户也可以直接提交导入命令给某一指定 BE。导入的最终结果由 Coordinator BE 返回给用户。^      +|      ||      | 1A. User submit load to FE|      ||   +--v-----------+|   | FE           |- Return result to user |   +--+-----------+|      ||      | 2. Redirect to BE|      ||   +--v-----------++---+Coordinator BE| 1B. User submit load to BE+-+-----+----+-+|     |    |+-----+     |    +-----+|           |          | 3. Distrbute data|           |          |+-v-+       +-v-+      +-v-+|BE |       |BE |      |BE |+---+       +---+      +---+

之后参考京东做法,不断的load小文件实现,实时的数据插入。
踩坑:

  • 尽量的数据量要大,避免多次提交,而会出现线程占有的问题
  • load是以DB为单位的,一个DB默认100个线程,控制好load的线程数
  • load是很耗内存的,一是线程,二是数据合并
  • streaming_load_max_batch_size_mb默认是100,根据业务进行更改
  • 如果要同步DB的数据注意多线程执行curl

实现起来比较简单,无非是在flinkSink代码中嵌入一段执行curl的代码

## 原curl
curl --location-trusted -u 用户名:密码 -T /xxx/test -H "format: json" -H "strip_outer_array: true" http://doris_fe:8030/api/{database}/{table}/_stream_load
## -u 不用解释了,用户名和密码
## -T json文件的地址,内容为[json,json,json],就是jsonlist
## -H 指定参数
## http 指定库名和表名

步骤:生成临时文件createFile,将数据写入临时文件mappedFile,执行execCurl, 删除临时文件deleteFile (简化版)

/*** 创建临时内存文件* @param fileName* @throws IOException*/public static void createFile(String fileName) throws IOException {File testFile = new File(fileName);File fileParent = testFile.getParentFile();if (!fileParent.exists()) {fileParent.mkdirs();}if (!testFile.exists())testFile.createNewFile();}/*** 删除临时内存文件* @param fileName* @return*/public static boolean deleteFile(String fileName) {boolean flag = false;File file = new File(fileName);// 路径为文件且不为空则进行删除if (file.isFile() && file.exists()) {file.delete();flag = true;}return flag;}/*** 写入内存文件* @param data* @param path*/public static void mappedFile(String data, String path) {CharBuffer charBuffer = CharBuffer.wrap(data);try {FileChannel fileChannel = FileChannel.open(Paths.get(path), StandardOpenOption.READ, StandardOpenOption.WRITE,StandardOpenOption.TRUNCATE_EXISTING);MappedByteBuffer mappedByteBuffer = fileChannel.map(FileChannel.MapMode.READ_WRITE, 0, data.getBytes().length*4);if (mappedByteBuffer != null) {mappedByteBuffer.clear();mappedByteBuffer.put(Charset.forName("UTF-8").encode(charBuffer));}fileChannel.close();} catch (IOException e) {e.printStackTrace();}}/*** 执行curl* @param curl* @return*/public static String execCurl(String[] curl) {ProcessBuilder process = new ProcessBuilder(curl);Process p;try {p = process.start();BufferedReader reader = new BufferedReader(new InputStreamReader(p.getInputStream()));StringBuilder builder = new StringBuilder();String line = null;while ((line = reader.readLine()) != null) {builder.append(line);builder.append(System.getProperty("line.separator"));}return builder.toString();} catch (IOException e) {System.out.print("error");e.printStackTrace();}return null;}/*** 生成Culr* @param filePath* @param databases* @param table* @return*/public static String[] createCurl(String filePath, String databases, String table){String[] curl = {"curl","--location-trusted", "-u", "用户名:密码", "-T",filePath, "-H","format: json", "-H", "strip_outer_array: true", "http://doris_fe:8030/api/"+databases+"/"+table+"/_stream_load"};return curl;}

flink

实现自定义Sink比较简单,这里就简单的分享一下我的怎么写的(简化版)。

class LogCurlSink(insertTimenterval:Long,insertBatchSize:Int) extends RichSinkFunction[(String, Int, Long, String)] with Serializable{private val Logger = LoggerFactory.getLogger(this.getClass)private val mesList = new java.util.ArrayList[String]()private var lastInsertTime = 0Loverride def open(parameters: Configuration): Unit ={val path = s"/tmp/doris/{databases}/{table}/{ThreadId}"CurlUtils.createFile(path)Logger.warn(s"init and create $topic filePath!!!")}// (topic,partition,offset,jsonstr)override def invoke(value: (String, Int, Long, String), context: SinkFunction.Context[_]): Unit = {if(mesList.size >= this.insertBatchSize || isTimeToDoInsert){//存入insertData(mesList)//此处可以进行受到维护offsetmesList.clear()this.lastInsertTime = System.currentTimeMillis()}mesList.add(value._4)}override def close(): Unit = {val path = s"/tmp/doris/{databases}/{table}/{ThreadId}"CurlUtils.deleteFile(path)Logger.warn("close and delete filePath!!!")}/*** 执行插入操作* @param dataList*/private def insertData(dataList: java.util.ArrayList[String]): Unit ={}/*** 根据时间判断是否插入数据** @return*/private def isTimeToDoInsert = {val currTime = System.currentTimeMilliscurrTime - this.lastInsertTime >= this.insertCkTimenterval}}


本文来自互联网用户投稿,文章观点仅代表作者本人,不代表本站立场,不承担相关法律责任。如若转载,请注明出处。 如若内容造成侵权/违法违规/事实不符,请点击【内容举报】进行投诉反馈!

相关文章

立即
投稿

微信公众账号

微信扫一扫加关注

返回
顶部