数仓 数据漂移问题解决
数据漂移问题产生原因:

发生背景:离线数仓 以天为周期进行计算分析 使用Flume采集数据
详解:当业务数据产生时间临近一天结束时,数据传输到HDFS中需要一定时间,而Flume落盘时分区时间为Flume中Event对象header的timestamp时间信息,可能导致前一天的数据落盘到后一天的分区中,导致数据漂移,影响最终计算结果。
解决办法:
自定义Flume拦截器,将Event单元body中保存在真实的事件时间放入header头信息当中,使得落盘时数据落入正确的目的地中。

示例代码:
public class TimestampInterceptor implements Interceptor {@Overridepublic void initialize() {}@Overridepublic Event intercept(Event event) {//1、获取header和body的数据Map headers = event.getHeaders();String log = new String(event.getBody(), StandardCharsets.UTF_8);//2、将body的数据类型转成jsonObject类型(方便获取数据)JSONObject jsonObject = JSONObject.parseObject(log);//3、header中timestamp时间字段替换成日志生成的时间戳(解决数据漂移问题)String ts = jsonObject.getString("ts");headers.put("timestamp", ts);return event;}@Overridepublic List intercept(List list) {for (Event event : list) {intercept(event);}return list;}@Overridepublic void close() {}public static class Builder implements Interceptor.Builder {@Overridepublic Interceptor build() {return new TimestampInterceptor();}@Overridepublic void configure(Context context) {}}
}
本文来自互联网用户投稿,文章观点仅代表作者本人,不代表本站立场,不承担相关法律责任。如若转载,请注明出处。 如若内容造成侵权/违法违规/事实不符,请点击【内容举报】进行投诉反馈!
