三、Hera调度系统 待执行队列 入队和出队时机

简单介绍

待运行队列(MasterContext.scheduleQueue)
生产者:定时时间到、或者依赖任务都执行完,往队列里添加待执行的任务
消费者:worker 定时去扫描队列里 是否有任务 需要执行

一、定时任务如何往队列里 添加任务
1、在Hera启动初始化时

1、发送一条初始化的事件 Events.Initialize

// Master.init
executeJobPool.execute(() -> {...allJobList.forEach(heraAction -> {masterContext.getDispatcher().addJobHandler(new JobHandler(heraAction.getId().toString(), this, masterContext));heraActionMap.put(heraAction.getId(), heraAction);});// 发送一条初始化的事件masterContext.getDispatcher().forwardEvent(Events.Initialize);
});

2、Dispatcher 收到这条事件后,会对每一个JobHandler 去执行初始化的handle操作,执行createScheduleJob函数(等定时时间到后,往队列添加任务)

// JobHandler
public void handleInitialEvent() {/*** 如果是定时任务,启动定时程序,独立调度任务,创建quartz调度**/HeraActionVo heraActionVo = cache.getHeraActionVo();boolean isSchedule = heraActionVo.getAuto() && Objects.equals(heraActionVo.getScheduleType(), JobScheduleTypeEnum.Independent);if (isSchedule) {try {createScheduleJob(masterContext.getDispatcher(), heraActionVo);} catch (Exception e) {if (e instanceof SchedulerException) {heraActionVo.setAuto(false);ErrorLog.error("create job quartz schedule error");}throw new RuntimeException(e);}}}
2、在生成版本时

1、如果生成的版本大于当前时间,发送一条任务更新事件

// Master
private boolean generateAction(boolean isSingle, Integer jobId) {try {if (execute || isSingle) {Dispatcher dispatcher = masterContext.getDispatcher();if (dispatcher != null) {if (actionMap.size() > 0) {for (Long id : actionMap.keySet()) {dispatcher.addJobHandler(new JobHandler(id.toString(), masterContext.getMaster(), masterContext));if (id >= Long.parseLong(currString)) {// 如果生成的版本大于当前时间,发送一条任务更新事件dispatcher.forwardEvent(new HeraJobMaintenanceEvent(Events.UpdateActions, id.toString()));}}}}}}
}

2、Dispatcher 收到这条事件后,判断如果是定时任务的话,执行createScheduleJob函数(等定时时间到后,往队列添加任务)

// JobHandler
private void autoRecovery() {/*** 如果是独立任务,则重新创建quartz调度**/if (heraActionVo.getScheduleType() == JobScheduleTypeEnum.Independent) {try {createScheduleJob(masterContext.getDispatcher(), heraActionVo);} catch (SchedulerException e) {e.printStackTrace();}}
}
二、依赖任务如何往队列里 添加任务

1、当任务执行成功后,发送一条HeraJobSuccessEvent事件。所有的JobHandler去处理这条事件。
2、如果该任务 对应的依赖任务 正好是这条已经执行成功的任务,将其依赖任务的这条改成已经执行状态。
3、如果该任务 所有的依赖任务都处于执行成功状态,那么将该任务添加进执行队列

// JobHandler
private void handleSuccessEvent(HeraJobSuccessEvent event) {JobStatus jobStatus;synchronized (this) {jobStatus = heraJobActionService.findJobStatus(actionId);ScheduleLog.info(actionId + "received a success dependency job with actionId = " + jobId);// 将任务 对应的依赖任务状态 改成已经执行状态jobStatus.getReadyDependency().put(jobId, String.valueOf(System.currentTimeMillis()));heraJobActionService.updateStatus(jobStatus);}boolean allComplete = true;for (String key : heraActionVo.getDependencies()) {if (jobStatus.getReadyDependency().get(key) == null) {allComplete = false;break;}}// 如果该任务 所有的依赖任务都处于执行成功状态if (allComplete) {ScheduleLog.info("JobId:" + jobId + " all dependency jobs is ready,run!");// 那么将该任务添加进执行队列startNewJob(event.getTriggerType(), heraActionVo);} else {ScheduleLog.info(actionId + "some of dependency is not ready, waiting" + JSONObject.toJSONString(jobStatus.getReadyDependency().keySet()));}
}


本文来自互联网用户投稿,文章观点仅代表作者本人,不代表本站立场,不承担相关法律责任。如若转载,请注明出处。 如若内容造成侵权/违法违规/事实不符,请点击【内容举报】进行投诉反馈!

相关文章

立即
投稿

微信公众账号

微信扫一扫加关注

返回
顶部