hera源码剖析:项目启动之分布式锁

文章目录

  • 前言
    • 获取当前机器ip
    • 分布式锁
    • 知识点总结
    • master服务
    • work服务
    • work连接master

前言

本文章主要是为了让使用者能够更加了解 hera 的原理,并且能够在之基础上进行改进所进行。hera 是一款分布式任务调度与开发平台,具体不再描述,开源地址:https://github.com/scxwhite/hera

获取当前机器ip

hera 中,有一些静态代码块,这里只说一个很重要的部分,WorkContext 类中有这样一部分代码

static {host = NetUtils.getLocalAddress();HeraLog.info("-----------------------------当前机器的IP为:{}-----------------------------", host);//省略部分代码}

该代码会获取当前机器的 ip 信息,由于多网卡的原因可能获取的ip不是很准确,此时你可以通过在 hera-admin/src/main/resources/config/hera.properties 文件,修改server.ip=127.0.0.1 配置为当前机器 ip 即可。具体的代码不再深入。

分布式锁

hera 是使用 spring boot 开发的,启动项目执行 AdminBootstrap.main 方法,由于 DistributeLock#init 方法使用了 @PostConstruct 注册,首先会进入该方法

  @PostConstructpublic void init() {workClient.workSchedule.scheduleAtFixedRate(this::checkLock, 10, 60, TimeUnit.SECONDS);}

在该方法中会向 ScheduledThreadPoolExecutor 线程池提交一个每隔 60 秒执行的任务,具体任务内容在 DistributeLock#checkLock方法

  public void checkLock() {try {String ON_LINE = "online";//1.从hera_lock表查询最新记录HeraLock heraLock = heraLockService.findBySubgroup(ON_LINE);//2.如果当前没有master,直接以当前机器ip插入hera_lock新记录if (heraLock == null) {Date date = new Date();heraLock = HeraLock.builder().id(1).host(WorkContext.host).serverUpdate(date).subgroup(ON_LINE).gmtCreate(date).gmtModified(date).build();Integer lock = heraLockService.insert(heraLock);if (lock == null || lock <= 0) {return;}}//3.master判断if (isMaster = WorkContext.host.equals(heraLock.getHost().trim())) {heraLock.setServerUpdate(new Date());heraLockService.update(heraLock);HeraLog.info("hold lock and update time");heraSchedule.startup();} else {long currentTime = System.currentTimeMillis();long lockTime = heraLock.getServerUpdate().getTime();long interval = currentTime - lockTime;long timeout = 1000 * 60 * 5L;if (interval > timeout && isPreemptionHost()) {Date date = new Date();Integer lock = heraLockService.changeLock(WorkContext.host, date, date, heraLock.getHost());if (lock != null && lock > 0) {ErrorLog.error("master 发生切换,{} 抢占成功", WorkContext.host);heraSchedule.startup();heraLock.setHost(WorkContext.host);//TODO  接入master切换通知} else {HeraLog.info("master抢占失败,由其它worker抢占成功");}} else {//非主节点,调度器不执行 主要是为了避免手动修改hera_lock表后出现多master问题heraSchedule.shutdown();}}//4.初始化work服务workClient.init();//5.连接masterworkClient.connect(heraLock.getHost().trim());} catch (Exception e) {//出现异常时,对master节点做failFast操作。避免出现双masterheraSchedule.shutdown();ErrorLog.error("检测锁异常", e);}}

在解释这些代码之前大家要知道 hera 系统有个 hera_lock 表,该表最多只会有一条记录,并且该记录保存着当前的 master ip。也就是说大家如果想切换 master,可以直接通过修改该条记录的 ip 来实现。

在这里插入图片描述

  • DistributeLock#checkLock每次被调用时第一步会从 hera_lock 表查询出最新的 master 信息,至于在这里使用了 online 进行过滤没有实际意义。如果当前没有 master,即 hera_lock 等于 null (一般在第一次部署启动 hera 时会有该情况),为了方便调试此时会自动把当前机器设置为 master,当然如果插入当前 hera_lock 记录失败(被其它 work 插入了),会直接返回等待下次调用。
  • 在第 3 部分,首先会判断当前机器的 ip 信息与 hera_lock 表的 ip 是否匹配,如果匹配则表示当前机器为 master ,然后更新数据库的 server_update 时间,所以一定要保证你的所有机器时钟一致哦,最后调用heraSchedule.startup() 方法来启动 master 服务。
  • 如果发现当前机器的 ip 信息与 hera_lock 表的 ip 不匹配,则表示当前机器是 work,此时会通过 long interval = currentTime - lockTime; 来计算 master上次的心跳时间间隔,如果发现 master 已经超出5分钟未发送新的心跳,则通过 isPreemptionHost 方法判断当前机器是否允许抢占 master,如果允许则通过Integer lock = heraLockService.changeLock(WorkContext.host, date, date, heraLock.getHost()); 方法来进行抢占。抢占 sql
# 乐观锁方式进行抢占,保证同一时间只有一台机器能够抢占成功
update hera_lock set gmt_modified = #{gmtModified},host = #{host},server_update = #{serverUpdate} where host = #{lastHost}

如果发现 work 抢占 master 成功,则调用 heraSchedule.startup();来启动master 服务。

  private boolean isPreemptionHost() {List<String> preemptionHostList = hostGroupService.findPreemptionGroup(HeraGlobalEnv.preemptionMasterGroup);if (preemptionHostList.contains(WorkContext.host)) {return true;} else {HeraLog.info(WorkContext.host + " is not in master group " + preemptionHostList.toString());return false;}}

isPreemptionHost 方法主要是判断当前机器是否在允许抢占的机器组,该配置为:hera.preemptionMasterGroup

  • 在第 4 部分会进行 work 服务的初始化,此时需要注意的是:master 会启动 master 服务和 work 服务,work 只启动 work 服务。也就是说,你可以在本地 idea 启动 hera 来进行调试,你也可以在生产环境只有一台机器进行任务调度。
  • 在第 5 部分会进行 work 服务连接 master 服务的操作,即 netty 通信打开

看到这里,想必你已经了解了 DistributeLock 类是一个定时进行分布式锁检测的类,它决定着当前机器是启动 master 服务还是 work 服务

知识点总结

  • 可以通过直接修改 hera_lock 表的数据来切换 master
  • 可以只启动一台机器来进行 hera 的调度与开发
  • 可以通过配置hera.preemptionMasterGroup参数来让某些机器允许抢占master

master服务

在分布式锁中,如果当前机器抢到了 master,那么此时该机器会调用HeraSchedule#startup 启动 master 服务

    public void startup() {if (!running.compareAndSet(false, true)) {return;}HeraLog.info("begin to start master context");masterContext.init();}

为了保证 master 服务只被启动一次,使用了原子类 AtomicBoolean
继续往下看

 public void init() {//主要处理work的请求信息threadPool = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<>(), new NamedThreadFactory("master-wait-response"), new ThreadPoolExecutor.AbortPolicy());//主要管理master的一些延迟任务处理masterSchedule = new ScheduledThreadPoolExecutor(2, new NamedThreadFactory("master-schedule", false));masterSchedule.setKeepAliveTime(5, TimeUnit.MINUTES);masterSchedule.allowCoreThreadTimeOut(true);//开启quartz服务getQuartzSchedulerService().start();dispatcher = new Dispatcher();//初始化master端的netty消息handlerhandler = new MasterHandler(this);//初始化master servermasterServer = new MasterServer(handler);masterServer.start(HeraGlobalEnv.getConnectPort());master.init(this);//master的定时任务管理者choreService = new ChoreService(5, "chore-service");//重跑任务初始化rerunJobInit = new RerunJobInit(master);choreService.scheduledChore(rerunJobInit);//重跑任务启动rerunJobLaunch = new RerunJobLaunch(master);choreService.scheduledChore(rerunJobLaunch);//信号丢失检测lostJobCheck = new LostJobCheck(master, new DateTime().getMinuteOfHour());choreService.scheduledChore(lostJobCheck);//心跳检测heartCheck = new WorkHeartCheck(master);choreService.scheduledChore(heartCheck);//版本生成actionInit = new JobActionInit(master);choreService.scheduledChore(actionInit);//任务是否完成检测finishCheck = new JobFinishCheck(master);choreService.scheduledChore(finishCheck);//队列扫描queueScan = new JobQueueScan(master);choreService.scheduledChoreOnce(queueScan);HeraLog.info("end init master content success ");}

这部分代码主要功能是

  • 初始化master 端的消息处理线程池threadPool
  • 初始化 master 端的延迟任务线程池 masterSchedule
  • 开启 quartz 服务
  • 初始化 master 端的 netty 消息 handler
  • 初始化 master 的定时任务管理者,任务有:任务重跑初始化、重跑任务启动、信号丢失检测、心跳检测、版本生成、任务是否完成检测、队列扫描等

work服务

		//保证只启动一次if (!clientSwitch.compareAndSet(false, true)) {return;}workContext.setWorkClient(this);//在这里目前是空的,公司内部初始化了一些公共数据源配置workContext.init();eventLoopGroup = new NioEventLoopGroup();bootstrap = new Bootstrap();bootstrap.group(eventLoopGroup).channel(NioSocketChannel.class).handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ch.pipeline().addLast(new IdleStateHandler(0, 0, 5, TimeUnit.SECONDS)).addLast("frameDecoder", new ProtobufVarint32FrameDecoder()).addLast("decoder", new ProtobufDecoder(RpcSocketMessage.SocketMessage.getDefaultInstance())).addLast("frameEncoder", new ProtobufVarint32LengthFieldPrepender()).addLast("encoder", new ProtobufEncoder()).addLast(new WorkHandler(workContext));}});HeraLog.info("init work client success ");workSchedule.schedule(new Runnable() {private WorkerHandlerHeartBeat workerHandlerHeartBeat = new WorkerHandlerHeartBeat();private int failCount = 0;@Overridepublic void run() {try {if (workContext.getServerChannel() != null) {boolean send = workerHandlerHeartBeat.send(workContext);if (!send) {failCount++;ErrorLog.error("send heart beat failed ,failCount :" + failCount);} else {failCount = 0;HeartLog.debug("send heart beat success:{}", workContext.getServerChannel().getRemoteAddress());}} else {ErrorLog.error("server channel can not find on " + WorkContext.host);}} catch (Exception e) {ErrorLog.error("heart beat error:", e);} finally {workSchedule.schedule(this, (failCount + 1) * HeraGlobalEnv.getHeartBeat(), TimeUnit.SECONDS);}}}, HeraGlobalEnv.getHeartBeat(), TimeUnit.SECONDS);//省略定时更新日志的代码

work 服务这边同样使用了原子类保证只启动一次,然后就是初始化 worknetty 服务,再往下就是定时发送心跳信息给 master

work连接master

DistributeLock#checkLock 方法做的最后一件事情就是 work 连接 master,打开通信,调用的方法为:workClient.connect(heraLock.getHost().trim());

 public synchronized void connect(String host) throws Exception {if (workContext.getServerChannel() != null) {if (workContext.getServerHost().equals(host)) {return;} else {workContext.getServerChannel().close();workContext.setServerChannel(null);}}workContext.setServerHost(host);CountDownLatch latch = new CountDownLatch(1);ChannelFutureListener futureListener = (future) -> {try {if (future.isSuccess()) {workContext.setServerChannel(FailFastCluster.wrap(future.channel()));SocketLog.info(workContext.getServerChannel().toString());}} catch (Exception e) {ErrorLog.error("连接master异常", e);} finally {latch.countDown();}};ChannelFuture connectFuture = bootstrap.connect(new InetSocketAddress(host, HeraGlobalEnv.getConnectPort()));connectFuture.addListener(futureListener);if (!latch.await(HeraGlobalEnv.getRequestTimeout(), TimeUnit.SECONDS)) {connectFuture.removeListener(futureListener);connectFuture.cancel(true);throw new ExecutionException(new TimeoutException("connect server consumption of 2 seconds"));}if (!connectFuture.isSuccess()) {throw new RuntimeException("connect server failed " + host,connectFuture.cause());}SocketLog.info("connect server success");}

连接 master 时会首先判断当前是否已经连接了 master,如果已经连接并且和当前 master ip 一致则直接返回,否则关闭 netty 通信,重置当前 workmaster 信息。

然后通过 CountDownLatchawait(long timeout, TimeUnit unit)方法,来进行workmaster 的超时连接判断,通过ChannelFutureListenermasterwork 通信连接成功时设置 ServerChannel,并且容错方式为快速失败FailFastCluster.wrap(future.channel())


本文来自互联网用户投稿,文章观点仅代表作者本人,不代表本站立场,不承担相关法律责任。如若转载,请注明出处。 如若内容造成侵权/违法违规/事实不符,请点击【内容举报】进行投诉反馈!

相关文章

立即
投稿

微信公众账号

微信扫一扫加关注

返回
顶部