hera source code analysis: the execution flow of a single job trigger

Table of contents

    • Triggering a job
    • Worker side
    • Master side
      • The run method

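In hera a job can be triggered in many ways: manually by an analyst from the front end, by its schedule, by an upstream dependency, by a rerun, after a lost signal is detected, and so on. Whatever the trigger, the entry point is always Master#run. For jobs in the development center, the entry point is Master#debug instead.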

This article walks through the trigger flow for a manually executed job.

Triggering a job

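In the latest version, manual triggers come in three types: manual execution, manual recovery and super recovery. The differences are covered in the hera user documentation and are not repeated here. This article uses manual recovery as the example.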
When we click Execute, the front end sends a request to the back end.

Worker side

First, look at the call stack on the worker side:

    writeAndFlush:28, NettyChannel (com.dfire.core.netty)
    writeAndFlush:32, FailFastCluster (com.dfire.core.netty.cluster)
    buildMessage:100, WorkerHandleWebRequest (com.dfire.core.netty.worker.request)
    handleWebExecute:29, WorkerHandleWebRequest (com.dfire.core.netty.worker.request)
    executeJobFromWeb:312, WorkClient (com.dfire.core.netty.worker)
    execute:409, ScheduleOperatorController (com.dfire.controller)
    invoke:-1, ScheduleOperatorController$$FastClassBySpringCGLIB$$cddb34c8 (com.dfire.controller)
    invoke:204, MethodProxy (org.springframework.cglib.proxy)
    invokeJoinpoint:738, CglibAopProxy$CglibMethodInvocation (org.springframework.aop.framework)
    proceed:157, ReflectiveMethodInvocation (org.springframework.aop.framework)
    proceed:85, MethodInvocationProceedingJoinPoint (org.springframework.aop.aspectj)
    around:72, HeraAspect (com.dfire.config)
    // some frames omitted

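The stack shows that before the controller method is called, an AOP-based permission check, HeraAspect#around, runs first. Once the check passes, ScheduleOperatorController#execute is invoked; this method is the entry point for job execution. From there, WorkerHandleWebRequest#handleWebExecute and WorkerHandleWebRequest#buildMessage build the netty message body, and finally the message is sent to the master through a fail-fast fault-tolerance wrapper (FailFastCluster#writeAndFlush).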
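HeraAspect#around is an around advice: it runs before ScheduleOperatorController#execute and lets the call proceed only when the permission check passes. The sketch below reproduces that shape with a plain JDK dynamic proxy (java.lang.reflect.Proxy) instead of Spring AOP with CGLIB, so it runs without Spring. JobController, SimpleJobController and the allowed-user set are hypothetical stand-ins; hera's real permission rules are not shown in this article.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.Set;

public class AroundAdviceSketch {

    // Hypothetical controller contract standing in for ScheduleOperatorController#execute.
    public interface JobController {
        String execute(long actionId, int triggerType);
    }

    // Hypothetical target object: the real work happens here once the check passes.
    public static final class SimpleJobController implements JobController {
        @Override
        public String execute(long actionId, int triggerType) {
            return "queued:" + actionId;
        }
    }

    // "Around" advice: run the permission check, then proceed to the target method.
    static final class PermissionAdvice implements InvocationHandler {
        private final Object target;
        private final Set<String> allowedUsers;
        private final String currentUser;

        PermissionAdvice(Object target, Set<String> allowedUsers, String currentUser) {
            this.target = target;
            this.allowedUsers = allowedUsers;
            this.currentUser = currentUser;
        }

        @Override
        public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
            if (!allowedUsers.contains(currentUser)) {
                // rejected before the controller method ever runs
                throw new SecurityException("no permission: " + currentUser);
            }
            try {
                return method.invoke(target, args); // plays the role of joinPoint.proceed()
            } catch (InvocationTargetException e) {
                throw e.getCause(); // rethrow the target's own exception
            }
        }
    }

    // Wraps a controller so that every call goes through the advice first.
    public static JobController protect(JobController target, Set<String> allowedUsers, String currentUser) {
        return (JobController) Proxy.newProxyInstance(
                JobController.class.getClassLoader(),
                new Class<?>[]{JobController.class},
                new PermissionAdvice(target, allowedUsers, currentUser));
    }

    public static void main(String[] args) {
        JobController controller = protect(new SimpleJobController(), Set.of("alice"), "alice");
        System.out.println(controller.execute(42L, 3)); // prints queued:42
        try {
            protect(new SimpleJobController(), Set.of("alice"), "bob").execute(42L, 3);
        } catch (SecurityException e) {
            System.out.println(e.getMessage()); // prints no permission: bob
        }
    }
}
```

In hera, Spring generates the equivalent proxy for the controller (the CGLIB frames in the stack above), so the controller code itself never calls the check.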


Now let's look at the controller entry point in detail:

    @RequestMapping(value = "/manual", method = RequestMethod.GET)
    @ResponseBody
    @ApiOperation("Manual execution endpoint")
    public JsonResponse execute(@JsonSerialize(using = ToStringSerializer.class) @ApiParam(value = "action (version) id", required = true) Long actionId,
                                @ApiParam(value = "trigger type: 2 manual execution, 3 manual recovery, 6 super recovery", required = true) Integer triggerType,
                                @RequestParam(required = false) @ApiParam(value = "execution group", required = false) String execUser)
            throws InterruptedException, ExecutionException, HeraException, TimeoutException {
        // some validation code omitted
        String configs = heraJob.getConfigs();
        // create a hera_action_history object and insert the execution record into MySQL
        HeraJobHistory actionHistory = HeraJobHistory.builder().build();
        actionHistory.setJobId(heraAction.getJobId());
        actionHistory.setActionId(heraAction.getId());
        actionHistory.setTriggerType(triggerTypeEnum.getId());
        actionHistory.setOperator(heraJob.getOwner());
        actionHistory.setIllustrate(execUser);
        actionHistory.setStatus(StatusEnum.RUNNING.toString());
        actionHistory.setStatisticEndTime(heraAction.getStatisticEndTime());
        actionHistory.setHostGroupId(heraAction.getHostGroupId());
        heraJobHistoryService.insert(actionHistory);
        heraAction.setScript(heraJob.getScript());
        heraAction.setHistoryId(actionHistory.getId());
        heraAction.setConfigs(configs);
        heraAction.setAuto(heraJob.getAuto());
        heraAction.setHostGroupId(heraJob.getHostGroupId());
        heraJobActionService.update(heraAction);
        // send the job execution request to the master
        workClient.executeJobFromWeb(JobExecuteKind.ExecuteKind.ManualKind, actionHistory.getId());
        String ownerId = getOwnerId();
        if (ownerId == null) {
            ownerId = "0";
        }
        // add an operation record
        addJobRecord(heraJob.getId(), String.valueOf(actionId), RecordTypeEnum.Execute, execUser, ownerId);
        return new JsonResponse(true, String.valueOf(actionId));
    }

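The code is simple and does three things:
1. Creates a hera_action_history object and inserts the job's execution record into MySQL.
2. Sends a job execution message to the master over netty.
3. Adds an operation record (addJobRecord).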

The part that matters is the second one. Following the stack trace further down:

    public void executeJobFromWeb(ExecuteKind kind, Long id) throws ExecutionException, InterruptedException, TimeoutException {
        RpcWebResponse.WebResponse response = WorkerHandleWebRequest.handleWebExecute(workContext, kind, id)
                .get(HeraGlobalEnv.getRequestTimeout(), TimeUnit.SECONDS);
        if (response.getStatus() == ResponseStatus.Status.ERROR) {
            ErrorLog.error(response.getErrorText());
        }
    }

This calls WorkerHandleWebRequest.handleWebExecute, which returns a Future; future.get then blocks the web request for up to HeraGlobalEnv.getRequestTimeout() seconds. Continuing down:

    public static Future<WebResponse> handleWebExecute(final WorkContext workContext, ExecuteKind kind, Long id) {
        return buildMessage(WebRequest.newBuilder()
                .setRid(AtomicIncrease.getAndIncrement())
                .setOperate(WebOperate.ExecuteJob)
                .setEk(kind)
                .setId(String.valueOf(id))
                .build(), workContext, "[Execute]-no response from master within " + HeraGlobalEnv.getRequestTimeout() + " seconds: " + id);
    }

    private static Future<WebResponse> buildMessage(WebRequest request, WorkContext workContext, String errorMsg) {
        CountDownLatch latch = new CountDownLatch(1);
        WorkResponseListener responseListener = new WorkResponseListener(request, false, latch, null);
        workContext.getHandler().addListener(responseListener);
        Future<WebResponse> future = workContext.getWorkWebThreadPool().submit(() -> {
            latch.await(HeraGlobalEnv.getRequestTimeout(), TimeUnit.SECONDS);
            if (!responseListener.getReceiveResult()) {
                ErrorLog.error(errorMsg);
            }
            workContext.getHandler().removeListener(responseListener);
            return responseListener.getWebResponse();
        });
        try {
            workContext.getServerChannel().writeAndFlush(SocketMessage.newBuilder()
                    .setKind(SocketMessage.Kind.WEB_REQUEST)
                    .setBody(request.toByteString())
                    .build());
            SocketLog.info("1.WorkerHandleWebRequest: send web request to master requestId ={}", request.getRid());
        } catch (RemotingException e) {
            workContext.getHandler().removeListener(responseListener);
            ErrorLog.error("1.WorkerHandleWebRequest: send web request to master exception requestId =" + request.getRid(), e);
        }
        return future;
    }

handleWebExecute 方法中,新建了一个 WebRequest 对象,需要注意的是该对象的 operator 参数为 WebOperate.ExecuteJobidhera_action_history 记录的 id
然后在 buildMessage 方法中有三个比较关键的代码
1.CountDownLatch latch = new CountDownLatch(1); 该锁会在一个线程池的异步操作中等待,并且会在WorkResponseListener 中被释放。
2.WorkResponseListener responseListener = new WorkResponseListener(request, false, latch, null);

    public class WorkResponseListener extends ResponseListenerAdapter {
        private RpcWebRequest.WebRequest request;
        private volatile Boolean receiveResult;
        private CountDownLatch latch;
        private RpcWebResponse.WebResponse webResponse;

        @Override
        public void onWebResponse(RpcWebResponse.WebResponse response) {
            if (request.getRid() == response.getRid()) {
                try {
                    webResponse = response;
                    receiveResult = true;
                } catch (Exception e) {
                    ErrorLog.error("work release exception {}", e);
                } finally {
                    latch.countDown();
                }
            }
        }
    }

onWebResponse 方法中,当发现request.getRid() == response.getRid()时会释放锁,并标志 receiveResulttrue
3.调用 workContext.getServerChannel().writeAndFlush 方法来向master发送任务执行的消息,在上篇hera源码剖析:项目启动之分布式锁 已经说过 workContext 是什么时候设置的 serverChannel
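Together, these three pieces form a request/response correlation pattern: register a listener keyed by rid, wait on a latch in a pooled task, send the request, and let the network handler count the latch down when a response with the same rid arrives. A minimal, self-contained sketch of the pattern follows. Response, Listener, send, dispatch and the in-process "transport" callback are hypothetical simplifications of WebResponse, WorkResponseListener, buildMessage and the netty channel, not hera's classes.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

public class RidCorrelationSketch {

    // Hypothetical response message: the echoed request id plus a status.
    public static final class Response {
        final int rid;
        final String status;

        public Response(int rid, String status) {
            this.rid = rid;
            this.status = status;
        }
    }

    // Plays the role of WorkResponseListener: one listener per outstanding request.
    static final class Listener {
        final int rid;
        final CountDownLatch latch = new CountDownLatch(1);
        volatile Response response;

        Listener(int rid) {
            this.rid = rid;
        }

        void onResponse(Response r) {
            if (r.rid == rid) {    // ignore responses meant for other requests
                response = r;
                latch.countDown(); // wake up the task waiting in send()
            }
        }
    }

    private static final AtomicInteger NEXT_RID = new AtomicInteger();
    private final List<Listener> listeners = new CopyOnWriteArrayList<>();
    private final ExecutorService pool = Executors.newCachedThreadPool();

    // Plays the role of buildMessage: register, wait asynchronously, then write.
    public Future<Response> send(Consumer<Integer> transport, long timeoutMillis) {
        int rid = NEXT_RID.getAndIncrement();
        Listener listener = new Listener(rid);
        listeners.add(listener); // registered before writing, so a fast reply is not missed
        Future<Response> future = pool.submit(() -> {
            listener.latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
            listeners.remove(listener);
            return listener.response; // null when no matching reply arrived in time
        });
        transport.accept(rid); // stands in for writeAndFlush to the master
        return future;
    }

    // Called by the (hypothetical) network handler for every incoming response.
    public void dispatch(Response r) {
        for (Listener l : listeners) {
            l.onResponse(r);
        }
    }

    // Demo: a fake master answers at once with rid + ridOffset (0 means a matching rid).
    public static String roundTrip(int ridOffset, long timeoutMillis) {
        RidCorrelationSketch client = new RidCorrelationSketch();
        try {
            Future<Response> future = client.send(
                    rid -> client.dispatch(new Response(rid + ridOffset, "OK")), timeoutMillis);
            Response r = future.get();
            return r == null ? "NO_RESPONSE" : r.status;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "INTERRUPTED";
        } catch (ExecutionException e) {
            return "ERROR";
        } finally {
            client.pool.shutdownNow();
        }
    }
}
```

roundTrip(0, 1000) returns "OK", while roundTrip(1, 100) returns "NO_RESPONSE" after the 100 ms wait, which corresponds to the case where buildMessage logs errorMsg. Registering the listener before writing mirrors the order in buildMessage: a listener added after the write could miss a very fast reply.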

Master side

On the master, every netty message is handled by MasterHandler, so the execution request sent by the worker above ends up in MasterHandler#channelRead.

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        SocketMessage socketMessage = (SocketMessage) msg;
        Channel channel = ctx.channel();
        switch (socketMessage.getKind()) {
            // some code omitted
            case WEB_REQUEST:
                final WebRequest webRequest = WebRequest.newBuilder().mergeFrom(socketMessage.getBody()).build();
                switch (webRequest.getOperate()) {
                    case ExecuteJob:
                        completionService.submit(() ->
                                new ChannelResponse(FailBackCluster.wrap(channel), MasterHandlerWebResponse.handleWebExecute(masterContext, webRequest)));
                        break;
                    // some code omitted
                }
                // some code omitted
        }
    }

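MasterHandler hands the worker's execution request straight to MasterHandlerWebResponse.handleWebExecute on a completionService, so it is processed asynchronously, and pairs the result with the channel wrapped in a retry-on-failure (fail-back) wrapper, FailBackCluster.wrap(channel).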
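The article does not show how completionService is drained. The sketch below therefore assumes a consumer loop that takes each finished ChannelResponse and writes it back on its wrapped channel. RetryingChannel is a hypothetical stand-in for the retry behaviour that FailBackCluster.wrap provides, not its actual code; Channel and handleAll are likewise illustrative names.

```java
import java.util.List;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class MasterDispatchSketch {

    // Hypothetical transport: write returns false when sending failed.
    public interface Channel {
        boolean write(String message);
    }

    // Fail-back style: retry a failed write up to maxAttempts times.
    // A fail-fast wrapper (like FailFastCluster on the worker) would give up after the first failure.
    static final class RetryingChannel {
        private final Channel delegate;
        private final int maxAttempts;

        RetryingChannel(Channel delegate, int maxAttempts) {
            this.delegate = delegate;
            this.maxAttempts = maxAttempts;
        }

        boolean write(String message) {
            for (int attempt = 1; attempt <= maxAttempts; attempt++) {
                if (delegate.write(message)) {
                    return true;
                }
            }
            return false;
        }
    }

    // Mirrors ChannelResponse: the channel to answer on plus the computed response.
    static final class ChannelResponse {
        final RetryingChannel channel;
        final String response;

        ChannelResponse(RetryingChannel channel, String response) {
            this.channel = channel;
            this.response = response;
        }
    }

    // Handles requests asynchronously, then writes each response as its task completes.
    // Returns how many responses were written successfully.
    public static int handleAll(List<String> requests, Channel channel, int maxAttempts) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        CompletionService<ChannelResponse> completionService = new ExecutorCompletionService<>(pool);
        int written = 0;
        try {
            for (String request : requests) {
                // handler work runs off the calling thread, as in MasterHandler#channelRead
                completionService.submit(() ->
                        new ChannelResponse(new RetryingChannel(channel, maxAttempts), "OK:" + request));
            }
            for (int i = 0; i < requests.size(); i++) {
                // take() returns futures in completion order, not submission order
                ChannelResponse done = completionService.take().get();
                if (done.channel.write(done.response)) {
                    written++;
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } catch (ExecutionException e) {
            // a handler task threw; this sketch simply stops consuming
        } finally {
            pool.shutdown();
        }
        return written;
    }
}
```

With a channel that fails its first two writes and maxAttempts = 3, both responses still get through. That is the point of a fail-back wrapper on the master, where dropping a response would leave the worker waiting until its timeout.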

    public static WebResponse handleWebExecute(MasterContext context, WebRequest request) {
        if (request.getEk() == ExecuteKind.ManualKind || request.getEk() == ExecuteKind.ScheduleKind) {
            Long historyId = Long.parseLong(request.getId());
            HeraJobHistory heraJobHistory = context.getHeraJobHistoryService().findById(historyId);
            HeraJobHistoryVo history = BeanConvertUtils.convert(heraJobHistory);
            context.getMaster().run(history, context.getHeraJobService().findById(history.getJobId()));
            WebResponse webResponse = WebResponse.newBuilder()
                    .setRid(request.getRid())
                    .setOperate(WebOperate.ExecuteJob)
                    .setStatus(Status.OK)
                    .build();
            TaskLog.info("MasterHandlerWebResponse: send web execute response, actionId = {} ", history.getJobId());
            return webResponse;
        } else if (request.getEk() == ExecuteKind.DebugKind) {
            Long debugId = Long.parseLong(request.getId());
            HeraDebugHistoryVo debugHistory = context.getHeraDebugHistoryService().findById(debugId);
            TaskLog.info("2-1.MasterHandlerWebResponse: receive web debug response, debugId = " + debugId);
            context.getMaster().debug(debugHistory);
            WebResponse webResponse = WebResponse.newBuilder()
                    .setRid(request.getRid())
                    .setOperate(WebOperate.ExecuteJob)
                    .setStatus(Status.OK)
                    .build();
            TaskLog.info("2-2.MasterHandlerWebResponse : send web debug response, debugId = {}", debugId);
            return webResponse;
        }
        return WebResponse.newBuilder()
                .setRid(request.getRid())
                .setErrorText("Unrecognized operation type " + request.getEk())
                .setStatus(Status.ERROR)
                .build();
    }

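Here request.getEk() decides whether this is a development-center (debug) execution or a scheduling-center execution. For manual recovery the value is ExecuteKind.ManualKind, so only the if branch matters: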

  • First, use the hera_action_history id to look up the record the worker inserted.

  • Call Master#run.

  • Build a WebResponse that carries the request's rid and an OK status, and return it.
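Two details make this round trip work: the worker inserts the hera_action_history row before sending, so the master can find it by the id carried in the request, and the master's response reuses the request's rid, which is what WorkResponseListener matches on. The sketch below models both with an in-memory map. Kind, Request, Response and the map-based history table are hypothetical simplifications, and the not-found check is an addition of the sketch that the original code does not have.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class MasterExecuteSketch {

    // Hypothetical subset of ExecuteKind; OTHER stands for any kind the master does not recognize.
    public enum Kind { MANUAL, SCHEDULE, DEBUG, OTHER }

    // Hypothetical minimal request: rid pairs request and response, id is the history row id.
    public static final class Request {
        final int rid;
        final Kind kind;
        final String id;

        public Request(int rid, Kind kind, String id) {
            this.rid = rid;
            this.kind = kind;
            this.id = id;
        }
    }

    public static final class Response {
        public final int rid;
        public final String status;
        public final String errorText;

        Response(int rid, String status, String errorText) {
            this.rid = rid;
            this.status = status;
            this.errorText = errorText;
        }
    }

    // In-memory stand-in for hera_action_history: history id -> action id.
    private final Map<Long, Long> historyTable = new HashMap<>();
    // Action ids handed to "run" (stands in for Master#run queuing them).
    private final List<Long> started = new ArrayList<>();

    // Worker side: insert the row first so its id can travel in the request.
    public long insertHistory(long actionId) {
        long historyId = historyTable.size() + 1L;
        historyTable.put(historyId, actionId);
        return historyId;
    }

    // Master side: look the row up by id, "run" it, and echo the request's rid.
    public Response handle(Request request) {
        if (request.kind == Kind.MANUAL || request.kind == Kind.SCHEDULE) {
            Long actionId = historyTable.get(Long.parseLong(request.id));
            if (actionId == null) {
                // not-found check added for this sketch
                return new Response(request.rid, "ERROR", "history not found: " + request.id);
            }
            started.add(actionId);
            return new Response(request.rid, "OK", null);
        }
        if (request.kind == Kind.DEBUG) {
            // the real code calls Master#debug here; this sketch only acknowledges
            return new Response(request.rid, "OK", null);
        }
        return new Response(request.rid, "ERROR", "unrecognized execute kind: " + request.kind);
    }

    public List<Long> started() {
        return new ArrayList<>(started);
    }
}
```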

The run method

    public void run(HeraJobHistoryVo heraJobHistory, HeraJob heraJob) {
        Long actionId = heraJobHistory.getActionId();
        // duplicate-job detection
        // 1: check whether the job is already queued or running
        if (checkJobExists(heraJobHistory, false)) {
            return;
        }
        HeraAction heraAction = masterContext.getHeraJobActionService().findById(actionId);
        Set<String> areaList = areaList(heraJob.getAreaId());
        // 2: jobs outside the execution area are marked successful directly
        if (!areaList.contains(HeraGlobalEnv.getArea()) && !areaList.contains(Constants.ALL_AREA)) {
            ScheduleLog.info("Job not in area {}, marking it successful directly: {}", HeraGlobalEnv.getArea(), heraJob.getId());
            heraAction.setLastResult(heraAction.getStatus());
            heraAction.setStatus(StatusEnum.SUCCESS.toString());
            heraAction.setHistoryId(heraJobHistory.getId());
            heraAction.setReadyDependency("{}");
            String host = "localhost";
            heraAction.setHost(host);
            Date endTime = new Date();
            heraAction.setStatisticStartTime(endTime);
            heraAction.setStatisticEndTime(endTime);
            masterContext.getHeraJobActionService().update(heraAction);
            heraJobHistory.getLog().append("Job not in area " + HeraGlobalEnv.getArea() + ", marked successful directly");
            heraJobHistory.setStatusEnum(StatusEnum.SUCCESS);
            heraJobHistory.setEndTime(endTime);
            heraJobHistory.setStartTime(endTime);
            heraJobHistory.setExecuteHost(host);
            masterContext.getHeraJobHistoryService().update(BeanConvertUtils.convert(heraJobHistory));
            HeraJobSuccessEvent successEvent = new HeraJobSuccessEvent(actionId, heraJobHistory.getTriggerType(), heraJobHistory);
            masterContext.getDispatcher().forwardEvent(successEvent);
            return;
        }
        // 3. first set the values the execution needs in the database, then add the job to a queue
        heraAction.setLastResult(heraAction.getStatus());
        heraAction.setStatus(StatusEnum.RUNNING.toString());
        heraAction.setHistoryId(heraJobHistory.getId());
        heraAction.setStatisticStartTime(new Date());
        heraAction.setStatisticEndTime(null);
        masterContext.getHeraJobActionService().update(heraAction);
        heraJobHistory.getLog().append(ActionUtil.getTodayString() + " entered the job queue");
        masterContext.getHeraJobHistoryService().update(BeanConvertUtils.convert(heraJobHistory));
        boolean isFixed;
        int priorityLevel = 3;
        Map<String, String> configs = StringUtil.convertStringToMap(heraAction.getConfigs());
        String priorityLevelValue = configs.get("run.priority.level");
        if (priorityLevelValue != null) {
            priorityLevel = Integer.parseInt(priorityLevelValue);
        }
        String areaFixed = HeraGlobalEnv.getArea() + Constants.POINT + Constants.HERA_EMR_FIXED;
        if (configs.containsKey(Constants.HERA_EMR_FIXED) || configs.containsKey(areaFixed)) {
            isFixed = Boolean.parseBoolean(configs.get(areaFixed)) || Boolean.parseBoolean(configs.get(Constants.HERA_EMR_FIXED));
        } else {
            isFixed = Boolean.parseBoolean(getInheritVal(heraAction.getGroupId(), areaFixed, Constants.HERA_EMR_FIXED));
        }
        Integer endMinute = masterContext.getHeraJobService().findMustEndMinute(heraAction.getJobId());
        // 4. build the JobElement
        JobElement element = JobElement.builder()
                .jobId(heraJobHistory.getActionId())
                .hostGroupId(heraJobHistory.getHostGroupId())
                .priorityLevel(priorityLevel)
                .historyId(heraJobHistory.getId())
                .fixedEmr(isFixed)
                .owner(heraJob.getOwner())
                .costMinute(endMinute)
                .build();
        try {
            element.setTriggerType(heraJobHistory.getTriggerType());
            HeraAction cacheAction = heraActionMap.get(element.getJobId());
            if (cacheAction != null) {
                cacheAction.setStatus(StatusEnum.RUNNING.toString());
            }
            // 5. put the element into a scan queue
            switch (heraJobHistory.getTriggerType()) {
                case MANUAL:
                    masterContext.getManualQueue().put(element);
                    break;
                case AUTO_RERUN:
                    masterContext.getRerunQueue().put(element);
                    break;
                case MANUAL_RECOVER:
                case SCHEDULE:
                    masterContext.getScheduleQueue().put(element);
                    break;
                case SUPER_RECOVER:
                    masterContext.getSuperRecovery().put(element);
                    break;
                default:
                    ErrorLog.error("Unsupported trigger type: {}, id: {}", heraJobHistory.getTriggerType().toName(), heraJobHistory.getActionId());
                    break;
            }
        } catch (InterruptedException e) {
            ErrorLog.error("Failed to add job " + element.getJobId(), e);
        }
    }

The main job of run is to put the job into a queue chosen by its trigger type.
The method is long, so let's go through it piece by piece:

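  1. checkJobExists checks whether the job is already queued or running. Jobs that allow repeated execution, and jobs triggered by an automatic rerun, are not checked.
  2. A job that does not belong to the current execution area is marked successful immediately, and a success event is broadcast to notify downstream jobs. The current area is set by hera.area in application.yml; a job whose area is all can run in every area.
  3. Write the values the execution needs to the database first, then add the job to a queue.
  4. Build the JobElement, which is what finally goes into the queue. Its main fields are: costMinute (the maximum number of minutes the job is expected to run), jobId (the id of the job's execution instance, i.e. the action id), hostGroupId (the machine group that runs the job), priorityLevel (the job's priority, 3 by default and overridable with run.priority.level), historyId (the id of the matching execution record), fixedEmr (whether the job runs on a fixed cluster) and owner (the group of the job's creator).
  5. Put the job into one of the scan queues according to its trigger type, where it waits to be picked up by the master's scan thread.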
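Steps 2, 4 and 5 can be condensed into the sketch below, which keeps only the area check, the priority default and the queue selection. The enum mirrors the trigger types in the switch, while Element, the four LinkedBlockingQueues and the literal "all" (standing in for Constants.ALL_AREA) are assumptions made for illustration.

```java
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class RunRoutingSketch {

    // The trigger types handled by the switch in Master#run.
    public enum TriggerType { MANUAL, AUTO_RERUN, MANUAL_RECOVER, SCHEDULE, SUPER_RECOVER }

    // Simplified JobElement holding only what this sketch needs.
    public static final class Element {
        final long jobId;
        final int priorityLevel;

        Element(long jobId, int priorityLevel) {
            this.jobId = jobId;
            this.priorityLevel = priorityLevel;
        }
    }

    private final BlockingQueue<Element> manualQueue = new LinkedBlockingQueue<>();
    private final BlockingQueue<Element> rerunQueue = new LinkedBlockingQueue<>();
    private final BlockingQueue<Element> scheduleQueue = new LinkedBlockingQueue<>();
    private final BlockingQueue<Element> superRecoveryQueue = new LinkedBlockingQueue<>();

    // Step 2: a job runs here only if its areas contain the current area or "all".
    public static boolean runsInArea(Set<String> jobAreas, String currentArea) {
        return jobAreas.contains(currentArea) || jobAreas.contains("all");
    }

    // Step 4: run.priority.level overrides the default priority of 3.
    public static int priorityLevel(Map<String, String> configs) {
        String value = configs.get("run.priority.level");
        return value == null ? 3 : Integer.parseInt(value);
    }

    // Step 5: pick the scan queue; MANUAL_RECOVER shares the schedule queue.
    BlockingQueue<Element> queueFor(TriggerType type) {
        switch (type) {
            case MANUAL:
                return manualQueue;
            case AUTO_RERUN:
                return rerunQueue;
            case MANUAL_RECOVER:
            case SCHEDULE:
                return scheduleQueue;
            case SUPER_RECOVER:
                return superRecoveryQueue;
            default:
                throw new IllegalArgumentException("unsupported trigger type: " + type);
        }
    }

    // Returns false for jobs outside this area (the real code marks them SUCCESS instead of queuing).
    public boolean submit(long jobId, TriggerType type, Set<String> jobAreas, String currentArea,
                          Map<String, String> configs) {
        if (!runsInArea(jobAreas, currentArea)) {
            return false;
        }
        // offer() never blocks on an unbounded LinkedBlockingQueue; Master#run uses put().
        return queueFor(type).offer(new Element(jobId, priorityLevel(configs)));
    }

    public int scheduleQueueSize() {
        return scheduleQueue.size();
    }

    public int manualQueueSize() {
        return manualQueue.size();
    }
}
```

Note that manual recovery (MANUAL_RECOVER), the example used in this article, lands in the same queue as scheduled jobs, while manual execution has a queue of its own.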

