flink设置TTL
flink中状态如果不清理就会越来越大,实际上很多状态是可以清理的,比如说我们在计算日活时,使用日期作为key划分流,为了过滤掉重复的用户,在每个key内都维护了一个MapState。而我们实际上只关注当前日期的日活(因为之前的日活我们已经知道了),所有可以将之前日期的状态都清理。手动清理很麻烦,我们可以为状态设置超时时间,当超过这个时间之后,flink会自动清除这些数据:
/*** 统计日活,pass掉已经登录过的日志*/static class RihuoProcessFunction extends ProcessFunction> {private MapState hasLogin;private ValueState count;@Overridepublic void open(Configuration parameters) throws Exception {super.open(parameters);// 设置超时时间,超过24小时后,就会被清除StateTtlConfig ttlConfig = StateTtlConfig.newBuilder(Time.hours(24)).setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite).setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired).build();MapStateDescriptor mapStateDescriptor=new MapStateDescriptor("has login?",String.class,Boolean.class);mapStateDescriptor.enableTimeToLive(ttlConfig);hasLogin = getRuntimeContext().getMapState(mapStateDescriptor);ValueStateDescriptor valueStateDescriptor = new ValueStateDescriptor<>("uv", Integer.class);count=getRuntimeContext().getState(valueStateDescriptor);}@Overridepublic void processElement(LoginLog loginLog, ProcessFunction>.Context context, Collector> collector) throws Exception {String uid = loginLog.getCommon().getUid();// 如果还没登陆过if (!hasLogin.contains(uid)){hasLogin.put(uid,true);if (count.value()==null){count.update(1);} else {count.update(count.value()+1);}Date date = new Date(Long.parseLong(loginLog.getTs()));// 这个month是从0到11的,我也懒得管它了String logDate = date.getYear()+1900+"-"+date.getMonth()+"-"+date.getDate();collector.collect(Tuple2.apply(logDate,count.value()));}}}
代码设置的超时时间是系统时间,就是在插入数据时设置一个计时器,计时器到时间之后就会被清除
本文来自互联网用户投稿,文章观点仅代表作者本人,不代表本站立场,不承担相关法律责任。如若转载,请注明出处。 如若内容造成侵权/违法违规/事实不符,请点击【内容举报】进行投诉反馈!
