Reading and Writing Hudi with Flink DataStream
1. POM Dependencies
The POM used by the test cases is shown below; trim the dependencies as needed.
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.test</groupId>
    <artifactId>Examples</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <scala.version>2.11.8</scala.version>
        <scala.binary.version>2.11</scala.binary.version>
        <hadoop.version>2.6.0</hadoop.version>
        <flink.version>1.14.5</flink.version>
        <!-- Two further version properties (2.0.0 and 1.2.0) appeared here in the
             original listing; their element names were lost during extraction. -->
        <hudi.version>0.12.0</hudi.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-statebackend-rocksdb_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-runtime-web_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hudi</groupId>
            <artifactId>hudi-flink1.14-bundle</artifactId>
            <version>${hudi.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hive</groupId>
            <artifactId>hive-exec</artifactId>
            <classifier>core</classifier>
            <version>2.3.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>${hadoop.version}</version>
            <exclusions>
                <exclusion>
                    <artifactId>slf4j-log4j12</artifactId>
                    <groupId>org.slf4j</groupId>
                </exclusion>
                <exclusion>
                    <artifactId>log4j</artifactId>
                    <groupId>log4j</groupId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>${hadoop.version}</version>
            <exclusions>
                <exclusion>
                    <artifactId>slf4j-log4j12</artifactId>
                    <groupId>org.slf4j</groupId>
                </exclusion>
                <exclusion>
                    <artifactId>log4j</artifactId>
                    <groupId>log4j</groupId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-slf4j-impl</artifactId>
            <scope>provided</scope>
            <version>2.17.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-api</artifactId>
            <scope>provided</scope>
            <version>2.17.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-core</artifactId>
            <scope>provided</scope>
            <version>2.17.1</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba.fastjson2</groupId>
            <artifactId>fastjson2</artifactId>
            <version>2.0.16</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-rest</artifactId>
            <version>2.7.0</version>
            <exclusions>
                <exclusion>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-starter-logging</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>com.typesafe</groupId>
            <artifactId>config</artifactId>
            <version>1.2.1</version>
        </dependency>
    </dependencies>

    <build>
        <finalName>${pom.artifactId}-${pom.version}</finalName>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.3</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                    <archive>
                        <manifest>
                            <mainClass>com.test.main.Examples</mainClass>
                        </manifest>
                    </archive>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
        <resources>
            <resource>
                <directory>src/main/resources</directory>
                <excludes>
                    <exclude>environment/dev/*</exclude>
                    <exclude>environment/test/*</exclude>
                    <exclude>environment/smoke/*</exclude>
                    <exclude>environment/pre/*</exclude>
                    <exclude>environment/online/*</exclude>
                </excludes>
                <!-- "application.properties" follows the excludes in the original flattened
                     listing; it is reconstructed here as an include. -->
                <includes>
                    <include>application.properties</include>
                </includes>
            </resource>
            <resource>
                <directory>src/main/resources/environment/${environment}</directory>
                <targetPath>.</targetPath>
            </resource>
        </resources>
    </build>

    <profiles>
        <profile>
            <id>dev</id>
            <properties>
                <environment>dev</environment>
            </properties>
            <activation>
                <activeByDefault>true</activeByDefault>
            </activation>
        </profile>
        <profile>
            <id>test</id>
            <properties>
                <environment>test</environment>
            </properties>
        </profile>
        <profile>
            <id>smoke</id>
            <properties>
                <environment>smoke</environment>
            </properties>
        </profile>
        <profile>
            <id>online</id>
            <properties>
                <environment>online</environment>
            </properties>
        </profile>
    </profiles>
</project>
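The build section wires the environment-specific resource directories to the Maven profiles (dev, test, smoke, online, with dev active by default), and maven-assembly-plugin produces a jar-with-dependencies whose entry point is com.test.main.Examples; a per-environment fat jar can therefore be built with, for example, mvn clean package -Ptest.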
Official Hudi documentation:
Flink Guide | Apache Hudi
2. Reading and Writing Hudi with the DataStream API
2.1 Writing to Hudi
package com.test.hudi;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.util.HoodiePipeline;

import java.util.HashMap;
import java.util.Map;

public class FlinkDataStreamWrite2HudiTest {

    public static void main(String[] args) throws Exception {
        // 1. Create the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 2. Checkpointing must be enabled. By default, data appears under the table path
        //    only after about 5 checkpoints; until then there is only a .hoodie directory.
        String checkPointPath = "hdfs://hw-cdh-test02:8020/flinkinfo/meta/savepoints/FlinkDataStreamWrite2HudiTest";
        StateBackend backend = new EmbeddedRocksDBStateBackend(true);
        env.setStateBackend(backend);
        CheckpointConfig conf = env.getCheckpointConfig();
        // Retain checkpoints when the job is cancelled or fails
        conf.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        conf.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        conf.setCheckpointInterval(1000);              // milliseconds
        conf.setCheckpointTimeout(10 * 60 * 1000);     // milliseconds
        conf.setMinPauseBetweenCheckpoints(2 * 1000);  // minimum pause between two consecutive checkpoints
        conf.setCheckpointStorage(checkPointPath);

        // 3. Prepare the data
        DataStreamSource<Student> studentDS = env.fromElements(
                new Student(101L, "Johnson", 17L, "swimming"),
                new Student(102L, "Lin", 15L, "shopping"),
                new Student(103L, "Tom", 5L, "play"));

        // 4. Create the Hudi pipeline
        // 4.1 Hudi table name and path
        String studentHudiTable = "ods_student_table";
        String studentHudiTablePath = "hdfs://hw-cdh-test02:8020/user/hive/warehouse/lake/" + studentHudiTable;
        Map<String, String> studentOptions = new HashMap<>();
        studentOptions.put(FlinkOptions.PATH.key(), studentHudiTablePath);
        studentOptions.put(FlinkOptions.TABLE_TYPE.key(), HoodieTableType.MERGE_ON_READ.name());
        HoodiePipeline.Builder studentBuilder = HoodiePipeline.builder(studentHudiTable)
                .column("id BIGINT")
                .column("name STRING")
                .column("age BIGINT")
                .column("hobby STRING")
                .pk("id")
                // .pk("id,age") // a composite primary key can be declared with comma-separated fields
                .options(studentOptions);

        // 5. Convert to a RowData stream
        DataStream<RowData> studentRowDataDS = studentDS.map(new MapFunction<Student, RowData>() {
            @Override
            public RowData map(Student value) throws Exception {
                try {
                    Long id = value.id;
                    String name = value.name;
                    Long age = value.age;
                    String hobby = value.hobby;
                    GenericRowData row = new GenericRowData(4);
                    row.setField(0, Long.valueOf(id));
                    row.setField(1, StringData.fromString(name));
                    row.setField(2, Long.valueOf(age));
                    row.setField(3, StringData.fromString(hobby));
                    return row;
                } catch (Exception e) {
                    e.printStackTrace();
                    return null;
                }
            }
        });

        studentBuilder.sink(studentRowDataDS, false);

        env.execute("FlinkDataStreamWrite2HudiTest");
    }

    public static class Student {
        public Long id;
        public String name;
        public Long age;
        public String hobby;

        public Student() {
        }

        public Student(Long id, String name, Long age, String hobby) {
            this.id = id;
            this.name = name;
            this.age = age;
            this.hobby = hobby;
        }

        public Long getId() { return id; }
        public void setId(Long id) { this.id = id; }
        public String getName() { return name; }
        public void setName(String name) { this.name = name; }
        public Long getAge() { return age; }
        public void setAge(Long age) { this.age = age; }
        public String getHobby() { return hobby; }
        public void setHobby(String hobby) { this.hobby = hobby; }

        @Override
        public String toString() {
            return "Student{" +
                    "id=" + id +
                    ", name='" + name + '\'' +
                    ", age=" + age +
                    ", hobby='" + hobby + '\'' +
                    '}';
        }
    }
}
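A note on studentBuilder.sink(studentRowDataDS, false): per the Hudi quickstart, the second argument indicates whether the input data stream is bounded, so false here declares an unbounded stream, which is what a continuous source such as Kafka would be.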
In this example, three records are created via env.fromElements and written to Hudi; querying the table confirms that all three records were written successfully:

In real development you will swap in an actual source, e.g. read from Kafka and write to Hudi: replace the fromElements source above and implement the corresponding RowData conversion, as sketched below. One thing to remember: checkpointing must be enabled, otherwise only the .hoodie directory appears. I was bitten by this myself and spent an afternoon debugging why no data was written and only the .hoodie directory existed, until I found that checkpointing had to be configured. In this example the three hand-made records form a bounded input, so the job finishes and the data is flushed to the Hudi table even without checkpoints; but in a genuine streaming job that reads from Kafka and writes to Hudi, the data will never be committed to the Hudi table if checkpointing is not enabled.
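As a hedged sketch of that swap (the topic name, bootstrap servers, group id, and the JSON message layout are assumptions, not part of the original example; fastjson2 and flink-connector-kafka are already in the POM):

// Additional imports needed at the top of the file:
//   org.apache.flink.api.common.serialization.SimpleStringSchema
//   org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
//   com.alibaba.fastjson2.JSON, com.alibaba.fastjson2.JSONObject
//   java.util.Properties
Properties props = new Properties();
props.setProperty("bootstrap.servers", "kafka01:9092");  // assumed broker address
props.setProperty("group.id", "hudi-writer");            // assumed consumer group
DataStream<String> kafkaDS = env.addSource(
        new FlinkKafkaConsumer<>("student_topic", new SimpleStringSchema(), props));

DataStream<RowData> rowDataDS = kafkaDS.map(new MapFunction<String, RowData>() {
    @Override
    public RowData map(String json) throws Exception {
        // assumes messages like {"id":1,"name":"a","age":10,"hobby":"x"}
        JSONObject obj = JSON.parseObject(json);
        GenericRowData row = new GenericRowData(4);
        row.setField(0, obj.getLong("id"));
        row.setField(1, StringData.fromString(obj.getString("name")));
        row.setField(2, obj.getLong("age"));
        row.setField(3, StringData.fromString(obj.getString("hobby")));
        return row;
    }
});

studentBuilder.sink(rowDataDS, false);  // unbounded input, hence 'false'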
2.2 Reading from Hudi
package com.test.hudi;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.RowData;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.util.HoodiePipeline;

import java.util.HashMap;
import java.util.Map;

public class FlinkDataStreamReadFromHudiTest {

    public static void main(String[] args) throws Exception {
        // 1. Create the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 2. Create the Hudi source
        String studentHudiTable = "ods_student_table";
        String studentHudiTablePath = "hdfs://hw-cdh-test02:8020/user/hive/warehouse/lake/" + studentHudiTable;
        Map<String, String> studentOptions = new HashMap<>();
        studentOptions.put(FlinkOptions.PATH.key(), studentHudiTablePath);
        studentOptions.put(FlinkOptions.TABLE_TYPE.key(), HoodieTableType.MERGE_ON_READ.name());
        studentOptions.put(FlinkOptions.READ_AS_STREAMING.key(), "true");            // enable streaming read
        studentOptions.put(FlinkOptions.READ_START_COMMIT.key(), "16811748000000");  // start commit instant; note the expected format is yyyyMMddHHmmss (see section 4.2)
        studentOptions.put(FlinkOptions.READ_STREAMING_CHECK_INTERVAL.key(), "4");   // interval for checking new commits, in seconds
        // studentOptions.put(FlinkOptions.CHANGELOG_ENABLED.key(), "true");
        HoodiePipeline.Builder studentBuilder = HoodiePipeline.builder(studentHudiTable)
                .column("id BIGINT")
                .column("name STRING")
                .column("age BIGINT")
                .column("hobby STRING")
                .pk("id")
                .options(studentOptions);
        DataStream<RowData> studentRowDataDS = studentBuilder.source(env);

        // 3. Convert and output the data
        DataStream<Student> studentDS = studentRowDataDS.map(new MapFunction<RowData, Student>() {
            @Override
            public Student map(RowData value) throws Exception {
                try {
                    String rowKind = value.getRowKind().name();
                    Long id = value.getLong(0);
                    String name = value.getString(1).toString();
                    Long age = value.getLong(2);
                    String hobby = value.getString(3).toString();
                    return new Student(id, name, age, hobby, rowKind);
                } catch (Exception e) {
                    e.printStackTrace();
                    return null;
                }
            }
        });
        studentDS.print();

        env.execute("FlinkDataStreamReadFromHudiTest");
    }

    public static class Student {
        public Long id;
        public String name;
        public Long age;
        public String hobby;
        public String rowKind;

        public Student() {
        }

        public Student(Long id, String name, Long age, String hobby, String rowKind) {
            this.id = id;
            this.name = name;
            this.age = age;
            this.hobby = hobby;
            this.rowKind = rowKind;
        }

        public Long getId() { return id; }
        public void setId(Long id) { this.id = id; }
        public String getName() { return name; }
        public void setName(String name) { this.name = name; }
        public Long getAge() { return age; }
        public void setAge(Long age) { this.age = age; }
        public String getHobby() { return hobby; }
        public void setHobby(String hobby) { this.hobby = hobby; }
        public String getRowKind() { return rowKind; }
        public void setRowKind(String rowKind) { this.rowKind = rowKind; }

        @Override
        public String toString() {
            return "Student{" +
                    "id=" + id +
                    ", name='" + name + '\'' +
                    ", age=" + age +
                    ", hobby='" + hobby + '\'' +
                    ", rowKind='" + rowKind + '\'' +
                    '}';
        }
    }
}
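With read.streaming.enabled set to true the source stays up and polls the table for new commits every read.streaming.check-interval seconds (4 here; Hudi's documented default is 60). If the option is left false, the source performs a one-off bounded snapshot read and the job finishes.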
Output:

Here rowKind describes the row's change type: INSERT, UPDATE_BEFORE, UPDATE_AFTER, and DELETE, corresponding to the op markers +I, -U, +U, and -D, i.e. an insert, the before-image of an update, the after-image of an update, and a delete.
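If downstream logic only cares about the latest state of each key, a common pattern (a sketch, not part of the original example) is to filter on the RowKind before the mapping, keeping only inserts and update after-images:

// requires: import org.apache.flink.types.RowKind;
DataStream<RowData> upsertsOnly = studentRowDataDS.filter(row ->
        row.getRowKind() == RowKind.INSERT || row.getRowKind() == RowKind.UPDATE_AFTER);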
3. Reading and Writing Hudi with the Table API
3.1 Writing to Hudi
3.1.1 Data from a DataStream
package com.test.hudi;

import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class FlinkDataStreamSqlWrite2HudiTest {

    public static void main(String[] args) throws Exception {
        // 1. Create the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tabEnv = StreamTableEnvironment.create(env);

        // 2. Checkpointing must be enabled. By default, data appears under the table path
        //    only after about 5 checkpoints; until then there is only a .hoodie directory.
        String checkPointPath = "hdfs://hw-cdh-test02:8020/flinkinfo/meta/savepoints/FlinkDataStreamWrite2HudiTest";
        StateBackend backend = new EmbeddedRocksDBStateBackend(true);
        env.setStateBackend(backend);
        CheckpointConfig conf = env.getCheckpointConfig();
        // Retain checkpoints when the job is cancelled or fails
        conf.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        conf.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        conf.setCheckpointInterval(1000);              // milliseconds
        conf.setCheckpointTimeout(10 * 60 * 1000);     // milliseconds
        conf.setMinPauseBetweenCheckpoints(2 * 1000);  // minimum pause between two consecutive checkpoints
        conf.setCheckpointStorage(checkPointPath);

        // 3. Prepare the data; in a real environment this could read from Kafka instead
        DataStreamSource<Student> studentDS = env.fromElements(
                new Student(201L, "zhangsan", 117L, "eat"),
                new Student(202L, "lisi", 115L, "drink"),
                new Student(203L, "wangwu", 105L, "sleep"));
        // Because no DataStream sink operator follows, env.execute() may throw:
        //   Exception in thread "main" java.lang.IllegalStateException: No operators defined in streaming topology. Cannot execute.
        // This does not affect the Hudi write. Alternatively, add a DataStream operator such as print:
        // studentDS.print("DataStream: ");

        // 4. Create a table from the DataStream
        // 4.1 First argument: table name; second: the DataStream; optional third: column names,
        //     which can map stream fields to columns, e.g. "userId as user_id, name, age, hobby"
        tabEnv.registerDataStream("tmp_student_table", studentDS, "id, name, age, hobby");

        // 5. Create the Hudi table and write the data into it
        tabEnv.executeSql("" +
                "CREATE TABLE out_ods_student_table(\n" +
                "  id    BIGINT COMMENT 'student id',\n" +
                "  name  STRING COMMENT 'name',\n" +
                "  age   BIGINT COMMENT 'age',\n" +
                "  hobby STRING COMMENT 'hobby',\n" +
                "  PRIMARY KEY (id) NOT ENFORCED\n" +
                ")\n" +
                "WITH(\n" +
                "  'connector' = 'hudi',\n" +
                "  'path' = 'hdfs://hw-cdh-test02:8020/user/hive/warehouse/lake/ods_student_table',\n" +
                "  'table.type' = 'MERGE_ON_READ',\n" +
                "  'compaction.async.enabled' = 'true',\n" +
                "  'compaction.tasks' = '1',\n" +
                "  'compaction.trigger.strategy' = 'num_commits',\n" +
                "  'compaction.delta_commits' = '3',\n" +
                "  'hoodie.cleaner.policy' = 'KEEP_LATEST_COMMITS',\n" +
                "  'hoodie.cleaner.commits.retained' = '30',\n" +
                "  'hoodie.keep.min.commits' = '35',\n" +
                "  'hoodie.keep.max.commits' = '40'\n" +
                ")");
        tabEnv.executeSql("insert into out_ods_student_table select id, name, age, hobby from tmp_student_table");

        env.execute("FlinkDataStreamSqlWrite2HudiTest");
    }

    public static class Student {
        public Long id;
        public String name;
        public Long age;
        public String hobby;

        public Student() {
        }

        public Student(Long id, String name, Long age, String hobby) {
            this.id = id;
            this.name = name;
            this.age = age;
            this.hobby = hobby;
        }

        public Long getId() { return id; }
        public void setId(Long id) { this.id = id; }
        public String getName() { return name; }
        public void setName(String name) { this.name = name; }
        public Long getAge() { return age; }
        public void setAge(Long age) { this.age = age; }
        public String getHobby() { return hobby; }
        public void setHobby(String hobby) { this.hobby = hobby; }

        @Override
        public String toString() {
            return "Student{" +
                    "id=" + id +
                    ", name='" + name + '\'' +
                    ", age=" + age +
                    ", hobby='" + hobby + '\'' +
                    '}';
        }
    }
}
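Note that registerDataStream is a deprecated API; on Flink 1.14 the non-deprecated equivalent (a sketch; the view's columns are then derived from the Student POJO's field names, so the name-based INSERT ... SELECT above still works) would be:

tabEnv.createTemporaryView("tmp_student_table", studentDS);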
Inspecting the Hudi table confirms that the three records were written successfully:

3.1.2 Data from a Table
package com.test.hudi;

import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class FlinkValuesSqlWrite2HudiTest {

    public static void main(String[] args) throws Exception {
        // 1. Create the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tabEnv = StreamTableEnvironment.create(env);

        // 2. Checkpointing must be enabled. By default, data appears under the table path
        //    only after about 5 checkpoints; until then there is only a .hoodie directory.
        String checkPointPath = "hdfs://hw-cdh-test02:8020/flinkinfo/meta/savepoints/FlinkDataStreamWrite2HudiTest";
        StateBackend backend = new EmbeddedRocksDBStateBackend(true);
        env.setStateBackend(backend);
        CheckpointConfig conf = env.getCheckpointConfig();
        // Retain checkpoints when the job is cancelled or fails
        conf.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        conf.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        conf.setCheckpointInterval(1000);              // milliseconds
        conf.setCheckpointTimeout(10 * 60 * 1000);     // milliseconds
        conf.setMinPauseBetweenCheckpoints(2 * 1000);  // minimum pause between two consecutive checkpoints
        conf.setCheckpointStorage(checkPointPath);

        // 3. Create the Hudi table and write the data into it
        tabEnv.executeSql("" +
                "CREATE TABLE out_ods_student_table(\n" +
                "  id    BIGINT COMMENT 'student id',\n" +
                "  name  STRING COMMENT 'name',\n" +
                "  age   BIGINT COMMENT 'age',\n" +
                "  hobby STRING COMMENT 'hobby',\n" +
                "  PRIMARY KEY (id) NOT ENFORCED\n" +
                ")\n" +
                "WITH(\n" +
                "  'connector' = 'hudi',\n" +
                "  'path' = 'hdfs://hw-cdh-test02:8020/user/hive/warehouse/lake/ods_student_table',\n" +
                "  'table.type' = 'MERGE_ON_READ',\n" +
                "  'compaction.async.enabled' = 'true',\n" +
                "  'compaction.tasks' = '1',\n" +
                "  'compaction.trigger.strategy' = 'num_commits',\n" +
                "  'compaction.delta_commits' = '3',\n" +
                "  'hoodie.cleaner.policy' = 'KEEP_LATEST_COMMITS',\n" +
                "  'hoodie.cleaner.commits.retained' = '30',\n" +
                "  'hoodie.keep.min.commits' = '35',\n" +
                "  'hoodie.keep.max.commits' = '40'\n" +
                ")");
        tabEnv.executeSql("" +
                "insert into out_ods_student_table values\n" +
                "  (301, 'xiaoming', 201, 'read'),\n" +
                "  (302, 'xiaohong', 202, 'write'),\n" +
                "  (303, 'xiaogang', 203, 'sing')");

        env.execute("FlinkValuesSqlWrite2HudiTest");
    }
}
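One detail worth knowing: executeSql submits the INSERT job by itself, so the write does not depend on the final env.execute(...) call. As in 3.1.1, that call may even throw "No operators defined in streaming topology" because the pipeline contains no DataStream operators; the write is unaffected.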
Inspecting the Hudi table confirms that the three records were written successfully:

3.2 Reading from Hudi
package com.test.hudi;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class FlinkSqlReadFromHudiTest {

    public static void main(String[] args) throws Exception {
        // 1. Create the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tabEnv = StreamTableEnvironment.create(env);

        // 2. Declare the Hudi table and read data from it
        tabEnv.executeSql("" +
                "CREATE TABLE out_ods_student_table(\n" +
                "  id    BIGINT COMMENT 'student id',\n" +
                "  name  STRING COMMENT 'name',\n" +
                "  age   BIGINT COMMENT 'age',\n" +
                "  hobby STRING COMMENT 'hobby',\n" +
                "  PRIMARY KEY (id) NOT ENFORCED\n" +
                ")\n" +
                "WITH(\n" +
                "  'connector' = 'hudi',\n" +
                "  'path' = 'hdfs://hw-cdh-test02:8020/user/hive/warehouse/lake/ods_student_table',\n" +
                "  'table.type' = 'MERGE_ON_READ',\n" +
                "  'compaction.async.enabled' = 'true',\n" +
                "  'compaction.tasks' = '1',\n" +
                "  'compaction.trigger.strategy' = 'num_commits',\n" +
                "  'compaction.delta_commits' = '3',\n" +
                "  'hoodie.cleaner.policy' = 'KEEP_LATEST_COMMITS',\n" +
                "  'hoodie.cleaner.commits.retained' = '30',\n" +
                "  'hoodie.keep.min.commits' = '35',\n" +
                "  'hoodie.keep.max.commits' = '40'\n" +
                ")");
        tabEnv.executeSql("select id, name, age, hobby from out_ods_student_table").print();

        // As in 3.1.1, env.execute() may throw "No operators defined in streaming topology",
        // since executeSql(...).print() has already run the query; the printed output is unaffected.
        env.execute("FlinkSqlReadFromHudiTest");
    }
}
Output:

4. Additional Notes
4.1 Composite Primary Keys
When working with Hudi through the Flink Table API you may need a composite primary key; it can be declared directly in the SQL DDL. For example:
tabEnv.executeSql("" +
        "CREATE TABLE out_ods_userinfo_table_test(\n" +
        "  province_id BIGINT COMMENT 'province id',\n" +
        "  user_id     BIGINT COMMENT 'user id',\n" +
        "  name        STRING COMMENT 'name',\n" +
        "  age         BIGINT COMMENT 'age',\n" +
        "  hobby       STRING COMMENT 'hobby',\n" +
        "  PRIMARY KEY (province_id, user_id) NOT ENFORCED\n" +
        ")\n" +
        "WITH(\n" +
        "  'connector' = 'hudi',\n" +
        "  'path' = 'hdfs://hw-cdh-test02:8020/user/hive/warehouse/lake/ods_userinfo_table_test',\n" +
        "  'table.type' = 'MERGE_ON_READ',\n" +
        "  'hoodie.datasource.write.keygenerator.class' = 'org.apache.hudi.keygen.ComplexKeyGenerator',\n" +
        "  'hoodie.datasource.write.recordkey.field' = 'province_id,user_id',\n" +
        "  'compaction.async.enabled' = 'true',\n" +
        "  'compaction.tasks' = '1',\n" +
        "  'compaction.trigger.strategy' = 'num_commits',\n" +
        "  'compaction.delta_commits' = '3',\n" +
        "  'hoodie.cleaner.policy' = 'KEEP_LATEST_COMMITS',\n" +
        "  'hoodie.cleaner.commits.retained' = '30',\n" +
        "  'hoodie.keep.min.commits' = '35',\n" +
        "  'hoodie.keep.max.commits' = '40'\n" +
        ")");
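The DataStream API counterpart is a combination of the .pk("id,age") hint from section 2.1 and the two key-generator options above. A sketch (the table and field names mirror the SQL example and are illustrative only):

Map<String, String> options = new HashMap<>();
options.put(FlinkOptions.PATH.key(), "hdfs://hw-cdh-test02:8020/user/hive/warehouse/lake/ods_userinfo_table_test");
options.put(FlinkOptions.TABLE_TYPE.key(), HoodieTableType.MERGE_ON_READ.name());
// same key-generator settings as the SQL version
options.put("hoodie.datasource.write.keygenerator.class", "org.apache.hudi.keygen.ComplexKeyGenerator");
options.put("hoodie.datasource.write.recordkey.field", "province_id,user_id");

HoodiePipeline.Builder builder = HoodiePipeline.builder("ods_userinfo_table_test")
        .column("province_id BIGINT")
        .column("user_id BIGINT")
        .column("name STRING")
        .column("age BIGINT")
        .column("hobby STRING")
        .pk("province_id,user_id")  // comma-separated composite key
        .options(options);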
4.2 Reading Data Committed After a Specified Time
According to the official documentation, reads can be limited to data committed after a given instant. For example, with the instant 20230318130057, only data committed after 2023-03-18 13:00:57 is read; earlier data is not returned.
4.2.1 DataStream API
options.put(FlinkOptions.READ_START_COMMIT.key(), "20230318130057"); // specifies the start commit instant time
4.2.2 Table API
'read.start-commit' = '20230318130057', -- specifies the start commit instant time
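Since the instant is just a timestamp rendered as yyyyMMddHHmmss, it can also be built programmatically. A small sketch using only the JDK (the cutoff value is an arbitrary example; 'options' is the FlinkOptions map from section 2.2):

// requires: import java.time.LocalDateTime; import java.time.format.DateTimeFormatter;
LocalDateTime cutoff = LocalDateTime.of(2023, 3, 18, 13, 0, 57);                    // arbitrary example cutoff
String startCommit = cutoff.format(DateTimeFormatter.ofPattern("yyyyMMddHHmmss")); // "20230318130057"
options.put(FlinkOptions.READ_START_COMMIT.key(), startCommit);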
