flink学习(cancel以及整合springboot)
背景:
最近工作需要,刚好在研究flink,然后发现flink的资料百度少之又少,谷歌以及flink文档的说明也都不是很通俗,这里记录一下我在整合flink以及springboot的过程中问题比较麻烦的点。工作需求,我需要从springboot启动之后,根据redis里的配置进行动态创建任务,并且会根据redis的配置变化对任务进行一些重启的操作
实现
先讲一下,我是使用flink的StreamExecutionEnvironment.createRemoteEnvironment方式进行上传jar包,下面的流操作都跟具体业务场景一致即可
/*** 创建任务,可创建多个,这里根据你的业务来建* @param url flink连接* @param port flink端口号* @param jarPath 上传的jar包这里需要传一个含有你业务逻辑的jar包,就是你在flink页去提交的包,里面的main可直接写个简单的启动flink方法,但是其他的类不可缺少,包路径你可以放在resources下* @throws Exception*/private static void createJob1(String url,Integer port ,String jarPath) throws Exception{StreamExecutionEnvironment env = null;//如果没有地址,则用本地环境if(StringUtils.isNotEmty(url)){env = StreamExecutionEnvironment.createRemoteEnvironment(url,port,jarPath);}else{env = StreamExecutionEnvironment.getExecutionEnvironment();}myRedisSource = new MyRedisSource("flink");DataStreamSource<FlinkDataEntity> elementsSource = env.addSource(myRedisSource);elementsSource.addSink(new CustomSinkFunction());jobClient = env.executeAsync("t1");envMap.put("t1",jobClient);}
这里的createRemoteEnvironment相当于你在flink页面进行提交jar包的操作,但是这里要注意,jar包里的main方法可以只写个简单的flink程序(这里有个疑问搞不懂,为啥flink提供了这种连接远程环境的方式,但是必须要有个“假包”上传过去,实际假包里的代码好像并没有跑,都是跑的上面程序的代码)
private static void main() throws Exception{StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//这里随意,能通就行DataStreamSource<FlinkDataEntity> elementsSource = env.addSource(new MyRedisSource("flink_2"));elementsSource.addSink(new CustomSinkFunction());env.execute();}
关闭任务的话,需要保存一下env.executeAsync()的返回值,是一个JobClient对象,他提供了一个cancel的方法,可以对任务进行关闭
JobClient jobClient = env.executeAsync();jobClient.cancel().get();
当然如果springboot关闭了这个对象就销毁了,那可以通过flink提供的RestClusterClient对象去操作,只要地址跟端口号即可,
Configuration configuration = new Configuration();configuration.set(JobManagerOptions.ADDRESS,"flink地址");configuration.set(JobManagerOptions.PORT,8081);//这里的原理也是发起http请求,你也可以去页面拿url通过httpClient等工具请求,但是用flink自带的还是比较优雅RestClusterClient<StandaloneClusterId> restClusterClient = new RestClusterClient<>(configuration,StandaloneClusterId.getInstance());//任务ID,这里你在启动程序的时候可以通过restClusterClient.listJobs()去循环关闭,先结束任务,再启动新的任务,或者你把这个ID你自己存起来,下次重启关闭都可以,我是直接全部关闭再起JobID jobID = JobID.fromHexString("任务ID");CompletableFuture<Acknowledge> completableFuture = restClusterClient.cancel(jobID);//这里要注意,上面这样如果关闭的请求比较长,是不生效的,需要get一下,这里自己设置等待时间,这里我吃过亏,o(╥﹏╥)o,没get一直以为是flink这个接口出问题了,点进去看了一波源码才知道,这里他需要等待执行结果,如果执行时间不够就直接kill掉了completableFuture.get(1,TimeUnit.SECONDS);
其他逻辑懒得写了,springboot就没啥好写的了,直接初始化个springboot项目,启动之后就可以操作任务了,我是整了俩个项目,一个springboot启动的时候去操作flink的任务(包括流的逻辑),一个是需要上传的jar包,主要的实现逻辑的类都要有,main方法里可以写个简单的flink程序,希望能避免更多人踩坑
本文来自互联网用户投稿,文章观点仅代表作者本人,不代表本站立场,不承担相关法律责任。如若转载,请注明出处。 如若内容造成侵权/违法违规/事实不符,请点击【内容举报】进行投诉反馈!
