二、Hera调度系统初始化、生成和清理版本

先了解Hera代码里一些基本数据结构

一、Event事件、Listener、Dispatcher

一、初始化 Listener、Dispatcher.jobHandlers
  1. 初始化 不同任务类型的Listener
  2. 获取前2天到当前的 所有action_id的详细信息 (默认配置2天)
  3. 将所有的 action_id 添加进 Dispatcher.jobHandlers
  4. 将action_id列表 放入内存中 heraActionMap
// Master
public void init(MasterContext masterContext) {...executeJobPool.execute(() -> {// 初始化 不同任务类型的ListenermasterContext.getDispatcher().addDispatcherListener(new HeraAddJobListener(this, masterContext));masterContext.getDispatcher().addDispatcherListener(new HeraJobFailListener());masterContext.getDispatcher().addDispatcherListener(new HeraDebugListener(masterContext));masterContext.getDispatcher().addDispatcherListener(new HeraJobSuccessListener(masterContext));// 获取前2天到当前的 所有action_id的详细信息 (默认配置2天)List<HeraAction> allJobList = masterContext.getHeraJobActionService().getAfterAction(getBeforeDayAction());heraActionMap = new HashMap<>(allJobList.size());allJobList.forEach(heraAction -> {// 将所有的 action_id 添加进 Dispatcher.jobHandlersmasterContext.getDispatcher().addJobHandler(new JobHandler(heraAction.getId().toString(), this, masterContext));// 将action_id列表 放入内存中 heraActionMapheraActionMap.put(heraAction.getId(), heraAction);});});
二、生成版本 generateBatchAction
  1. 获取hera_job所有的job信息
  2. actionMap 初始化为 heraActionMap (两天内的所有action_id)
  3. 将定时任务生成版本 generateScheduleJobAction (解析quartz表达式)
    1. 如果 hera_job列表里调度类型是 定时调度的job,则解析quartz表达式,生成action_id版本
    2. 生成的action_id版本,如果存在数据表hera_action中,去update操作,否则insert
    3. 生成的action_id版本,往actionMap里 添加上新增的版本
  4. 遍历所有依赖任务 单个生成版本 generateDependJobAction
    1. 如果该任务的依赖任务之前没有处理过,那么递归调用generateDependJobAction
    2. 直到该任务的所有依赖任务都处理过,优先找到该任务的所有依赖任务中版本数最少的
    3. 如果依赖任务中,版本数相同,找去时间较大的那个依赖 (action_id更大的)
    4. 该依赖任务的action_id, 是根据 所有依赖任务中 版本数最少 且 时间最大的 那个生成的
    5. 该依赖任务的 依赖action_id列表,是依赖任务中的所有版本 选一个时间最接近 step4生成的 action_id
    6. 生成action_id版本,往actionMap里 添加上新增的版本
  5. 如果执行的时间 在23点之前, heraActionMap = actionMap (更新缓存在内存中的 action_id列表)
  6. Dispatcher.jobHandlers 添加上 新增的 action_id对应的 JobHandler
  7. 如果生成的版本 大于当前时间,发送更新版本事件 HeraJobMaintenanceEvent
// Master
private boolean generateAction(boolean isSingle, Integer jobId) {try {//凌晨0点生成版本,早上7点以后开始再次生成版本boolean execute = executeHour == 0 || (executeHour > ActionUtil.ACTION_CREATE_MIN_HOUR && executeHour <= ActionUtil.ACTION_CREATE_MAX_HOUR);if (execute || isSingle) {String currString = ActionUtil.getCurrHourVersion();// 如果当前小时是 23点,生成版本时间更改成明天if (executeHour == ActionUtil.ACTION_CREATE_MAX_HOUR) {Tuple<String, Date> nextDayString = ActionUtil.getNextDayString();//例如:今天 2018.07.17 23:50  currString = 201807180000000000 now = 2018.07.18 23:50currString = nextDayString.getSource();now = nextDayString.getTarget();}Long nowAction = Long.parseLong(currString);Map<Long, HeraAction> actionMap = new HashMap<>(heraActionMap.size());List<HeraJob> jobList = new ArrayList<>();//批量 生成版本//step 1. 获取hera_job所有的job信息jobList = masterContext.getHeraJobService().getAll();String cronDate = ActionUtil.getActionVersionPrefix(now);Map<Integer, List<HeraAction>> idMap = new HashMap<>(jobList.size());Map<Integer, HeraJob> jobMap = new HashMap<>(jobList.size());//step 2. 定时任务生成版本 (解析quartz表达式)generateScheduleJobAction(jobList, cronDate, actionMap, nowAction, idMap, jobMap);for (Map.Entry<Integer, HeraJob> entry : jobMap.entrySet()) {//step 3. 依赖任务生成版本 (递归生成action_id的依赖关系)generateDependJobAction(jobMap, entry.getValue(), actionMap, nowAction, idMap);}if (executeHour < ActionUtil.ACTION_CREATE_MAX_HOUR) {heraActionMap = actionMap;}Dispatcher dispatcher = masterContext.getDispatcher();if (dispatcher != null) {if (actionMap.size() > 0) {for (Long id : actionMap.keySet()) {dispatcher.addJobHandler(new JobHandler(id.toString(), masterContext.getMaster(), masterContext));if (id >= Long.parseLong(currString)) {dispatcher.forwardEvent(new HeraJobMaintenanceEvent(Events.UpdateActions, id.toString()));}}}}ScheduleLog.info("[单个任务:{},任务id:{}]generate action success", isSingle, jobId);return true;}} catch (Exception e) {e.printStackTrace();} finally {isGenerateActioning = false;}return false;}
三、清理版本 clearInvalidAction
  1. 移除15分钟前的 定时任务版本
  2. 移除大于当前时间,并且不在heraActionMap中的版本(数据表hera_action删除、Dispatcher.jobHandlers也删除对应的JobHandler)
// Master
private void clearInvalidAction() {...Map<Long, HeraAction> actionMapNew = heraActionMap;List<AbstractHandler> handlers = dispatcher.getJobHandlers();List<JobHandler> shouldRemove = new ArrayList<>();if (handlers != null && handlers.size() > 0) {handlers.forEach(handler -> {JobHandler jobHandler = (JobHandler) handler;// 移除15分钟前的 定时任务版本if (Long.parseLong(actionId) < preCheckTime) {masterContext.getQuartzSchedulerService().deleteJob(actionId);} else if (aid >= currDate && aid < nextDay) {// 移除大于当前时间,并且不在heraActionMap中的版本if (!actionMapNew.containsKey(aid)) {masterContext.getQuartzSchedulerService().deleteJob(actionId);masterContext.getHeraJobActionService().delete(actionId);shouldRemove.add(jobHandler);}}});}}
下一篇 三、Hera调度系统 待执行队列 入队和出队时机


本文来自互联网用户投稿,文章观点仅代表作者本人,不代表本站立场,不承担相关法律责任。如若转载,请注明出处。 如若内容造成侵权/违法违规/事实不符,请点击【内容举报】进行投诉反馈!

相关文章

立即
投稿

微信公众账号

微信扫一扫加关注

返回
顶部