Flink CDC DataStream API timezone issues
With PostgreSQL as the data source, reading Date, timestamp, and similar column types through CDC reveals the following problems:
Temporal values (dates, timestamps, and so on) come out as int/long values, and after synchronization the times differ from the source table by 8 hours. This is caused by the timezone difference.
Source table:

Sink table:

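To make the 8-hour shift concrete, here is a small standalone sketch. It is not part of the original setup; the sample values are chosen to match the conversion table shown later in the MySQL section, and they illustrate how a timestamp column delivered as io.debezium.time.MicroTimestamp and a date column delivered as io.debezium.time.Date look when decoded naively in a UTC+8 JVM:

import java.text.SimpleDateFormat;
import java.time.LocalDate;
import java.util.Date;
import java.util.TimeZone;

public class DebeziumTimeDemo {
    public static void main(String[] args) {
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        sdf.setTimeZone(TimeZone.getTimeZone("Asia/Shanghai")); // UTC+8, as in the scenario above

        // A timestamp column arrives as io.debezium.time.MicroTimestamp:
        // microseconds since epoch, encoding the wall-clock time 2021-01-28 17:29:04 as if it were UTC.
        long micros = 1_611_854_944_000_000L;
        System.out.println(sdf.format(new Date(micros / 1000)));
        // -> 2021-01-29 01:29:04  (8 hours ahead of the source value)
        System.out.println(sdf.format(new Date(micros / 1000 - 8 * 60 * 60 * 1000)));
        // -> 2021-01-28 17:29:04  (the adjustment the deserializer below applies)

        // A date column arrives as io.debezium.time.Date: days since the epoch.
        int days = 18655;
        System.out.println(LocalDate.ofEpochDay(days)); // -> 2021-01-28
    }
}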
Solution: handle the conversion inside a custom deserialization schema.
Java code:
package pg.cdc.ds;
import com.alibaba.fastjson.JSONObject;
import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
import io.debezium.data.Envelope;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.util.Collector;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import java.text.SimpleDateFormat;
import java.time.ZoneId;
import java.util.Date;
import java.util.List;

public class CustomerDeserialization implements DebeziumDeserializationSchema<String> {

    ZoneId serverTimeZone; // kept from the original listing; not used below

    @Override
    public void deserialize(SourceRecord sourceRecord, Collector<String> collector) throws Exception {
        // 1. JSON object that holds the final record
        JSONObject result = new JSONObject();
        Struct value = (Struct) sourceRecord.value();

        // 2. Database, schema and table name
        Struct sourceStruct = value.getStruct("source");
        String database = sourceStruct.getString("db");
        String schema = sourceStruct.getString("schema");
        String tableName = sourceStruct.getString("table");

        // 3. "before" row image
        JSONObject beforeJson = convertRowImage(value.getStruct("before"));

        // 4. "after" row image
        JSONObject afterJson = convertRowImage(value.getStruct("after"));

        // 5. Operation type: CREATE / UPDATE / DELETE (CREATE and READ are reported as insert)
        Envelope.Operation operation = Envelope.operationFor(sourceRecord);
        String type = operation.toString().toLowerCase();
        if ("create".equals(type) || "read".equals(type)) {
            type = "insert";
        }

        // 6. Assemble the output JSON
        result.put("database", database);
        result.put("schema", schema);
        result.put("tableName", tableName);
        result.put("before", beforeJson);
        result.put("after", afterJson);
        result.put("type", type);

        // 7. Emit the record
        collector.collect(result.toJSONString());
    }

    /**
     * Converts one row image ("before" or "after"), turning Debezium temporal types back into
     * readable strings and compensating for the 8-hour (UTC+8) offset.
     */
    private JSONObject convertRowImage(Struct struct) {
        JSONObject json = new JSONObject();
        if (struct == null) {
            return json;
        }
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        SimpleDateFormat sdf1 = new SimpleDateFormat("yyyy-MM-dd");
        Schema structSchema = struct.schema();
        List<Field> fields = structSchema.fields();
        for (Field field : fields) {
            Object fieldValue = struct.get(field);
            String typeName = field.schema().type().getName();
            String schemaName = field.schema().name();
            if (fieldValue == null) {
                json.put(field.name(), null);
            } else if ("int64".equals(typeName) && "io.debezium.time.MicroTimestamp".equals(schemaName)) {
                // microseconds since epoch -> milliseconds, minus 8 hours
                long millis = (long) fieldValue / 1000;
                json.put(field.name(), sdf.format(new Date(millis - 8 * 60 * 60 * 1000)));
            } else if ("int64".equals(typeName) && "io.debezium.time.NanoTimestamp".equals(schemaName)) {
                // nanoseconds since epoch -> milliseconds, minus 8 hours
                long millis = (long) fieldValue / 1000 / 1000;
                json.put(field.name(), sdf.format(new Date(millis - 8 * 60 * 60 * 1000)));
            } else if ("int64".equals(typeName) && "io.debezium.time.Timestamp".equals(schemaName)) {
                // already milliseconds since epoch, minus 8 hours
                long millis = (long) fieldValue;
                json.put(field.name(), sdf.format(new Date(millis - 8 * 60 * 60 * 1000)));
            } else if ("int32".equals(typeName) && "io.debezium.time.Date".equals(schemaName)) {
                // days since epoch -> milliseconds
                int days = (int) fieldValue;
                json.put(field.name(), sdf1.format(new Date(days * 24 * 60 * 60L * 1000)));
            } else {
                json.put(field.name(), fieldValue);
            }
        }
        return json;
    }

    @Override
    public TypeInformation<String> getProducedType() {
        return BasicTypeInfo.STRING_TYPE_INFO;
    }
}
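For context, here is a minimal sketch of how this deserializer could be wired into the com.ververica flink-cdc PostgreSQL source. The host, credentials, database, table name, slot name and decoding plugin below are placeholder assumptions, not values from the original setup:

import com.ververica.cdc.connectors.postgres.PostgreSQLSource;
import com.ververica.cdc.debezium.DebeziumSourceFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import pg.cdc.ds.CustomerDeserialization;

public class PgCdcJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Placeholder connection settings; replace with your own environment.
        DebeziumSourceFunction<String> source = PostgreSQLSource.<String>builder()
                .hostname("192.168.10.102")
                .port(5432)
                .database("postgres")
                .schemaList("public")
                .tableList("public.demo_table")
                .username("postgres")
                .password("postgres")
                .slotName("flink")                            // replication slot name (placeholder)
                .decodingPluginName("pgoutput")               // logical decoding plugin (placeholder)
                .deserializer(new CustomerDeserialization())  // the schema defined above
                .build();

        env.addSource(source).print();
        env.execute("pg-cdc-timezone-demo");
    }
}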
Scala code:
import com.ververica.cdc.debezium.DebeziumDeserializationSchema
import com.ververica.cdc.debezium.utils.TemporalConversions
import io.debezium.time._
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.types.Row
import org.apache.flink.util.Collector
import org.apache.kafka.connect.data.{SchemaBuilder, Struct}
import org.apache.kafka.connect.source.SourceRecord
import java.sql
import java.time.{Instant, LocalDateTime, ZoneId}
import scala.collection.JavaConverters._
import scala.util.parsing.json.JSONObject

class StructDebeziumDeserializationSchema(serverTimeZone: String) extends DebeziumDeserializationSchema[Row] {

  override def deserialize(sourceRecord: SourceRecord, collector: Collector[Row]): Unit = {
    // Parse the primary key
    val key = sourceRecord.key().asInstanceOf[Struct]
    val keyJs = parseStruct(key)

    // Parse the value
    val value = sourceRecord.value().asInstanceOf[Struct]
    val source = value.getStruct("source")
    val before = parseStruct(value.getStruct("before"))
    val after = parseStruct(value.getStruct("after"))

    val row = Row.withNames()
    row.setField("table", s"${source.get("db")}.${source.get("table")}")
    row.setField("key", keyJs)
    row.setField("op", value.get("op"))
    row.setField("op_ts", LocalDateTime.ofInstant(Instant.ofEpochMilli(source.getInt64("ts_ms")), ZoneId.of(serverTimeZone)))
    row.setField("current_ts", LocalDateTime.ofInstant(Instant.ofEpochMilli(value.getInt64("ts_ms")), ZoneId.of(serverTimeZone)))
    row.setField("before", before)
    row.setField("after", after)
    collector.collect(row)
  }

  /** Render a [[Struct]] as a JSON string, converting temporal fields to readable strings. */
  private def parseStruct(struct: Struct): String = {
    if (struct == null) return null
    val map = struct.schema().fields().asScala.map { field =>
      val v = struct.get(field)
      val typ = field.schema().name()
      println(s"$v, $typ, ${field.name()}") // debug output, can be removed
      val value = v match {
        case long if long.isInstanceOf[Long] => convertLongToTime(long.asInstanceOf[Long], typ)
        case iv if iv.isInstanceOf[Int]      => convertIntToDate(iv.asInstanceOf[Int], typ)
        case iv if iv == null                => null
        case _                               => convertObjToTime(v, typ)
      }
      (field.name(), value)
    }.filter(_._2 != null).toMap
    JSONObject.apply(map).toString()
  }

  /** Convert non-numeric temporal values (for example ZonedTimestamp strings). */
  private def convertObjToTime(obj: Any, typ: String): Any = {
    typ match {
      case Time.SCHEMA_NAME | MicroTime.SCHEMA_NAME | NanoTime.SCHEMA_NAME =>
        sql.Time.valueOf(TemporalConversions.toLocalTime(obj)).toString
      case Timestamp.SCHEMA_NAME | MicroTimestamp.SCHEMA_NAME | NanoTimestamp.SCHEMA_NAME | ZonedTimestamp.SCHEMA_NAME =>
        sql.Timestamp.valueOf(TemporalConversions.toLocalDateTime(obj, ZoneId.of(serverTimeZone))).toString
      case _ => obj
    }
  }

  /** Convert Long-encoded temporal values into strings in the configured server timezone. */
  private def convertLongToTime(obj: Long, typ: String): Any = {
    val time_schema = SchemaBuilder.int64().name("org.apache.kafka.connect.data.Time")
    val date_schema = SchemaBuilder.int64().name("org.apache.kafka.connect.data.Date")
    val timestamp_schema = SchemaBuilder.int64().name("org.apache.kafka.connect.data.Timestamp")
    typ match {
      case Time.SCHEMA_NAME =>
        org.apache.kafka.connect.data.Time.toLogical(time_schema, obj.asInstanceOf[Int])
          .toInstant.atZone(ZoneId.of(serverTimeZone)).toLocalTime.toString
      case MicroTime.SCHEMA_NAME =>
        org.apache.kafka.connect.data.Time.toLogical(time_schema, (obj / 1000).asInstanceOf[Int])
          .toInstant.atZone(ZoneId.of(serverTimeZone)).toLocalTime.toString
      case NanoTime.SCHEMA_NAME =>
        org.apache.kafka.connect.data.Time.toLogical(time_schema, (obj / 1000 / 1000).asInstanceOf[Int])
          .toInstant.atZone(ZoneId.of(serverTimeZone)).toLocalTime.toString
      case Timestamp.SCHEMA_NAME =>
        val t = org.apache.kafka.connect.data.Timestamp.toLogical(timestamp_schema, obj)
          .toInstant.atZone(ZoneId.of(serverTimeZone)).toLocalDateTime
        java.sql.Timestamp.valueOf(t).toString
      case MicroTimestamp.SCHEMA_NAME =>
        val t = org.apache.kafka.connect.data.Timestamp.toLogical(timestamp_schema, obj / 1000)
          .toInstant.atZone(ZoneId.of(serverTimeZone)).toLocalDateTime
        java.sql.Timestamp.valueOf(t).toString
      case NanoTimestamp.SCHEMA_NAME =>
        val t = org.apache.kafka.connect.data.Timestamp.toLogical(timestamp_schema, obj / 1000 / 1000)
          .toInstant.atZone(ZoneId.of(serverTimeZone)).toLocalDateTime
        java.sql.Timestamp.valueOf(t).toString
      case Date.SCHEMA_NAME =>
        org.apache.kafka.connect.data.Date.toLogical(date_schema, obj.asInstanceOf[Int])
          .toInstant.atZone(ZoneId.of(serverTimeZone)).toLocalDate.toString
      case _ => obj
    }
  }

  /** Convert Int-encoded dates (days since epoch) into date strings. */
  private def convertIntToDate(obj: Int, typ: String): Any = {
    val date_schema = SchemaBuilder.int64().name("org.apache.kafka.connect.data.Date")
    typ match {
      case Date.SCHEMA_NAME =>
        org.apache.kafka.connect.data.Date.toLogical(date_schema, obj)
          .toInstant.atZone(ZoneId.of(serverTimeZone)).toLocalDate.toString
      case _ => obj
    }
  }

  override def getProducedType: TypeInformation[Row] = TypeInformation.of(classOf[Row])
}
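The Scala variant produces Flink Row records instead of JSON strings. Assuming the same com.ververica source builders, it would be registered the same way, e.g. .deserializer(new StructDebeziumDeserializationSchema("Asia/Shanghai")), with the server timezone ("Asia/Shanghai" here is just an example value) passed in explicitly instead of being hard-coded as an 8-hour offset.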
MySQL CDC timezone issue
MySQL CDC shows the same timezone problem. By default, Debezium converts MySQL datetime columns into UTC-based epoch timestamps (io.debezium.time.Timestamp); this timezone is hard-coded and cannot be changed, so with the database set to UTC+8 the value lands in Kafka as a long timestamp that is 8 hours ahead. MySQL timestamp columns are converted into UTC strings by default.
There are two ways to solve this:
1. Do the timezone conversion inside a custom deserialization schema (as in the PostgreSQL example above).
2. Write a custom time converter class and register it through the Debezium configuration, specifying the conversion formats there.
This article uses the second approach.
package com.zmn.schema;

import io.debezium.spi.converter.CustomConverter;
import io.debezium.spi.converter.RelationalColumn;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.*;
import java.time.format.DateTimeFormatter;
import java.util.Properties;
import java.util.function.Consumer;

/**
 * Handles Debezium's time conversions.
 * By default Debezium converts MySQL datetime columns into UTC epoch timestamps (io.debezium.time.Timestamp);
 * the timezone is hard-coded and cannot be changed, so a database running at UTC+8 shows up in Kafka
 * as a long timestamp that is 8 hours ahead. MySQL timestamp columns are converted into UTC strings.
 *
 * | mysql                               | mysql-binlog-connector                   | debezium                          |
 * | ----------------------------------- | ---------------------------------------- | --------------------------------- |
 * | date      (2021-01-28)              | LocalDate     (2021-01-28)               | Integer (18655)                   |
 * | time      (17:29:04)                | Duration      (PT17H29M4S)               | Long    (62944000000)             |
 * | timestamp (2021-01-28 17:29:04)     | ZonedDateTime (2021-01-28T09:29:04Z)     | String  (2021-01-28T09:29:04Z)    |
 * | datetime  (2021-01-28 17:29:04)     | LocalDateTime (2021-01-28T17:29:04)      | Long    (1611854944000)           |
 *
 * @see io.debezium.connector.mysql.converters.TinyIntOneToBooleanConverter
 */
public class MySqlDateTimeConverter implements CustomConverter<SchemaBuilder, RelationalColumn> {

    private final static Logger logger = LoggerFactory.getLogger(MySqlDateTimeConverter.class);

    private DateTimeFormatter dateFormatter = DateTimeFormatter.ISO_DATE;
    private DateTimeFormatter timeFormatter = DateTimeFormatter.ISO_TIME;
    private DateTimeFormatter datetimeFormatter = DateTimeFormatter.ISO_DATE_TIME;
    private DateTimeFormatter timestampFormatter = DateTimeFormatter.ISO_DATE_TIME;
    private ZoneId timestampZoneId = ZoneId.systemDefault();

    @Override
    public void configure(Properties props) {
        readProps(props, "format.date", p -> dateFormatter = DateTimeFormatter.ofPattern(p));
        readProps(props, "format.time", p -> timeFormatter = DateTimeFormatter.ofPattern(p));
        readProps(props, "format.datetime", p -> datetimeFormatter = DateTimeFormatter.ofPattern(p));
        readProps(props, "format.timestamp", p -> timestampFormatter = DateTimeFormatter.ofPattern(p));
        readProps(props, "format.timestamp.zone", z -> timestampZoneId = ZoneId.of(z));
    }

    private void readProps(Properties properties, String settingKey, Consumer<String> callback) {
        String settingValue = (String) properties.get(settingKey);
        if (settingValue == null || settingValue.length() == 0) {
            return;
        }
        try {
            callback.accept(settingValue.trim());
        } catch (IllegalArgumentException | DateTimeException e) {
            logger.error("The {} setting is illegal: {}", settingKey, settingValue);
            throw e;
        }
    }

    @Override
    public void converterFor(RelationalColumn column, ConverterRegistration<SchemaBuilder> registration) {
        String sqlType = column.typeName().toUpperCase();
        SchemaBuilder schemaBuilder = null;
        Converter converter = null;
        if ("DATE".equals(sqlType)) {
            schemaBuilder = SchemaBuilder.string().optional().name("com.darcytech.debezium.date.string");
            converter = this::convertDate;
        }
        if ("TIME".equals(sqlType)) {
            schemaBuilder = SchemaBuilder.string().optional().name("com.darcytech.debezium.time.string");
            converter = this::convertTime;
        }
        if ("DATETIME".equals(sqlType)) {
            schemaBuilder = SchemaBuilder.string().optional().name("com.darcytech.debezium.datetime.string");
            converter = this::convertDateTime;
        }
        if ("TIMESTAMP".equals(sqlType)) {
            schemaBuilder = SchemaBuilder.string().optional().name("com.darcytech.debezium.timestamp.string");
            converter = this::convertTimestamp;
        }
        if (schemaBuilder != null) {
            registration.register(schemaBuilder, converter);
        }
    }

    private String convertDate(Object input) {
        if (input instanceof LocalDate) {
            return dateFormatter.format((LocalDate) input);
        }
        if (input instanceof Integer) {
            LocalDate date = LocalDate.ofEpochDay((Integer) input);
            return dateFormatter.format(date);
        }
        return null;
    }

    private String convertTime(Object input) {
        if (input instanceof Duration) {
            Duration duration = (Duration) input;
            long seconds = duration.getSeconds();
            int nano = duration.getNano();
            LocalTime time = LocalTime.ofSecondOfDay(seconds).withNano(nano);
            return timeFormatter.format(time);
        }
        return null;
    }

    private String convertDateTime(Object input) {
        if (input instanceof LocalDateTime) {
            return datetimeFormatter.format((LocalDateTime) input);
        }
        return null;
    }

    private String convertTimestamp(Object input) {
        if (input instanceof ZonedDateTime) {
            // MySQL stores timestamp values in UTC, so the ZonedDateTime arriving here is in UTC
            ZonedDateTime zonedDateTime = (ZonedDateTime) input;
            LocalDateTime localDateTime = zonedDateTime.withZoneSameInstant(timestampZoneId).toLocalDateTime();
            return timestampFormatter.format(localDateTime);
        }
        return null;
    }
}
Usage:
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.Properties;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

Properties properties = new Properties();
properties.setProperty("snapshot.mode", "schema_only"); // skip the initial data snapshot, read changes only
// Register the custom time converter defined above
properties.setProperty("converters", "dateConverters");
properties.setProperty("dateConverters.type", "com.zmn.schema.MySqlDateTimeConverter");
properties.setProperty("dateConverters.format.date", "yyyy-MM-dd");
properties.setProperty("dateConverters.format.time", "HH:mm:ss");
properties.setProperty("dateConverters.format.datetime", "yyyy-MM-dd HH:mm:ss");
properties.setProperty("dateConverters.format.timestamp", "yyyy-MM-dd HH:mm:ss");
properties.setProperty("dateConverters.format.timestamp.zone", "UTC+8");
// The global read lock taken during the snapshot can affect online traffic; skip locking
properties.setProperty("snapshot.locking.mode", "none");
properties.setProperty("include.schema.changes", "true");
// With flink mysql cdc and JsonDebeziumDeserializationSchema, BIGINT UNSIGNED columns are
// captured as strings by default; map them to long instead.
properties.setProperty("bigint.unsigned.handling.mode", "long");
properties.setProperty("decimal.handling.mode", "double");

MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
        .hostname("192.168.10.102")
        .port(3306)
        .username("yusys")
        .password("yusys")
        .databaseList("gmall")
        .tableList("gmall.faker_user1")
        .deserializer(new JsonDebeziumDeserializationSchema())
        .debeziumProperties(properties)
        .serverId("5409-5418") // the server-id range must cover the source parallelism
        .build();

SingleOutputStreamOperator<String> dataSource = env
        .fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "binlog-source")
        .setParallelism(10);
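Note that the symbolic name listed under converters (here dateConverters) must match the prefix of the other converter properties: Debezium instantiates the class named by dateConverters.type and passes the remaining dateConverters.* keys to its configure(Properties) method with the prefix stripped, so the converter sees format.date, format.time, and so on, which is exactly what readProps above looks up. The converter class also has to be on the job's classpath, i.e. packaged into the Flink job jar.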
