hera源码剖析:项目启动之分布式锁
文章目录
- 前言
- 获取当前机器ip
- 分布式锁
- 知识点总结
- master服务
- work服务
- work连接master
前言
本文章主要是为了让使用者能够更加了解
hera的原理,并且能够在之基础上进行改进所进行。hera是一款分布式任务调度与开发平台,具体不再描述,开源地址:https://github.com/scxwhite/hera
获取当前机器ip
在 hera 中,有一些静态代码块,这里只说一个很重要的部分,WorkContext 类中有这样一部分代码
static {host = NetUtils.getLocalAddress();HeraLog.info("-----------------------------当前机器的IP为:{}-----------------------------", host);//省略部分代码}
该代码会获取当前机器的 ip 信息,由于多网卡的原因可能获取的ip不是很准确,此时你可以通过在 hera-admin/src/main/resources/config/hera.properties 文件,修改server.ip=127.0.0.1 配置为当前机器 ip 即可。具体的代码不再深入。
分布式锁
hera 是使用 spring boot 开发的,启动项目执行 AdminBootstrap.main 方法,由于 DistributeLock#init 方法使用了 @PostConstruct 注册,首先会进入该方法
@PostConstructpublic void init() {workClient.workSchedule.scheduleAtFixedRate(this::checkLock, 10, 60, TimeUnit.SECONDS);}
在该方法中会向 ScheduledThreadPoolExecutor 线程池提交一个每隔 60 秒执行的任务,具体任务内容在 DistributeLock#checkLock方法
public void checkLock() {try {String ON_LINE = "online";//1.从hera_lock表查询最新记录HeraLock heraLock = heraLockService.findBySubgroup(ON_LINE);//2.如果当前没有master,直接以当前机器ip插入hera_lock新记录if (heraLock == null) {Date date = new Date();heraLock = HeraLock.builder().id(1).host(WorkContext.host).serverUpdate(date).subgroup(ON_LINE).gmtCreate(date).gmtModified(date).build();Integer lock = heraLockService.insert(heraLock);if (lock == null || lock <= 0) {return;}}//3.master判断if (isMaster = WorkContext.host.equals(heraLock.getHost().trim())) {heraLock.setServerUpdate(new Date());heraLockService.update(heraLock);HeraLog.info("hold lock and update time");heraSchedule.startup();} else {long currentTime = System.currentTimeMillis();long lockTime = heraLock.getServerUpdate().getTime();long interval = currentTime - lockTime;long timeout = 1000 * 60 * 5L;if (interval > timeout && isPreemptionHost()) {Date date = new Date();Integer lock = heraLockService.changeLock(WorkContext.host, date, date, heraLock.getHost());if (lock != null && lock > 0) {ErrorLog.error("master 发生切换,{} 抢占成功", WorkContext.host);heraSchedule.startup();heraLock.setHost(WorkContext.host);//TODO 接入master切换通知} else {HeraLog.info("master抢占失败,由其它worker抢占成功");}} else {//非主节点,调度器不执行 主要是为了避免手动修改hera_lock表后出现多master问题heraSchedule.shutdown();}}//4.初始化work服务workClient.init();//5.连接masterworkClient.connect(heraLock.getHost().trim());} catch (Exception e) {//出现异常时,对master节点做failFast操作。避免出现双masterheraSchedule.shutdown();ErrorLog.error("检测锁异常", e);}}
在解释这些代码之前大家要知道 hera 系统有个 hera_lock 表,该表最多只会有一条记录,并且该记录保存着当前的 master ip。也就是说大家如果想切换 master,可以直接通过修改该条记录的 ip 来实现。

DistributeLock#checkLock每次被调用时第一步会从hera_lock表查询出最新的master信息,至于在这里使用了online进行过滤没有实际意义。如果当前没有master,即hera_lock等于null(一般在第一次部署启动hera时会有该情况),为了方便调试此时会自动把当前机器设置为master,当然如果插入当前hera_lock记录失败(被其它work插入了),会直接返回等待下次调用。- 在第
3部分,首先会判断当前机器的ip信息与hera_lock表的ip是否匹配,如果匹配则表示当前机器为master,然后更新数据库的server_update时间,所以一定要保证你的所有机器时钟一致哦,最后调用heraSchedule.startup()方法来启动master服务。 - 如果发现当前机器的
ip信息与hera_lock表的ip不匹配,则表示当前机器是work,此时会通过long interval = currentTime - lockTime;来计算master上次的心跳时间间隔,如果发现master已经超出5分钟未发送新的心跳,则通过isPreemptionHost方法判断当前机器是否允许抢占master,如果允许则通过Integer lock = heraLockService.changeLock(WorkContext.host, date, date, heraLock.getHost());方法来进行抢占。抢占sql为
# 乐观锁方式进行抢占,保证同一时间只有一台机器能够抢占成功
update hera_lock set gmt_modified = #{gmtModified},host = #{host},server_update = #{serverUpdate} where host = #{lastHost}
如果发现 work 抢占 master 成功,则调用 heraSchedule.startup();来启动master 服务。
private boolean isPreemptionHost() {List<String> preemptionHostList = hostGroupService.findPreemptionGroup(HeraGlobalEnv.preemptionMasterGroup);if (preemptionHostList.contains(WorkContext.host)) {return true;} else {HeraLog.info(WorkContext.host + " is not in master group " + preemptionHostList.toString());return false;}}
isPreemptionHost 方法主要是判断当前机器是否在允许抢占的机器组,该配置为:hera.preemptionMasterGroup
- 在第
4部分会进行work服务的初始化,此时需要注意的是:master会启动master服务和work服务,work只启动work服务。也就是说,你可以在本地idea启动hera来进行调试,你也可以在生产环境只有一台机器进行任务调度。 - 在第
5部分会进行work服务连接master服务的操作,即netty通信打开
看到这里,想必你已经了解了 DistributeLock 类是一个定时进行分布式锁检测的类,它决定着当前机器是启动 master 服务还是 work 服务
知识点总结
- 可以通过直接修改
hera_lock表的数据来切换master - 可以只启动一台机器来进行
hera的调度与开发 - 可以通过配置
hera.preemptionMasterGroup参数来让某些机器允许抢占master
master服务
在分布式锁中,如果当前机器抢到了 master,那么此时该机器会调用HeraSchedule#startup 启动 master 服务
public void startup() {if (!running.compareAndSet(false, true)) {return;}HeraLog.info("begin to start master context");masterContext.init();}
为了保证 master 服务只被启动一次,使用了原子类 AtomicBoolean
继续往下看
public void init() {//主要处理work的请求信息threadPool = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<>(), new NamedThreadFactory("master-wait-response"), new ThreadPoolExecutor.AbortPolicy());//主要管理master的一些延迟任务处理masterSchedule = new ScheduledThreadPoolExecutor(2, new NamedThreadFactory("master-schedule", false));masterSchedule.setKeepAliveTime(5, TimeUnit.MINUTES);masterSchedule.allowCoreThreadTimeOut(true);//开启quartz服务getQuartzSchedulerService().start();dispatcher = new Dispatcher();//初始化master端的netty消息handlerhandler = new MasterHandler(this);//初始化master servermasterServer = new MasterServer(handler);masterServer.start(HeraGlobalEnv.getConnectPort());master.init(this);//master的定时任务管理者choreService = new ChoreService(5, "chore-service");//重跑任务初始化rerunJobInit = new RerunJobInit(master);choreService.scheduledChore(rerunJobInit);//重跑任务启动rerunJobLaunch = new RerunJobLaunch(master);choreService.scheduledChore(rerunJobLaunch);//信号丢失检测lostJobCheck = new LostJobCheck(master, new DateTime().getMinuteOfHour());choreService.scheduledChore(lostJobCheck);//心跳检测heartCheck = new WorkHeartCheck(master);choreService.scheduledChore(heartCheck);//版本生成actionInit = new JobActionInit(master);choreService.scheduledChore(actionInit);//任务是否完成检测finishCheck = new JobFinishCheck(master);choreService.scheduledChore(finishCheck);//队列扫描queueScan = new JobQueueScan(master);choreService.scheduledChoreOnce(queueScan);HeraLog.info("end init master content success ");}
这部分代码主要功能是
- 初始化
master端的消息处理线程池threadPool - 初始化
master端的延迟任务线程池masterSchedule - 开启
quartz服务 - 初始化
master端的netty消息handler - 初始化
master的定时任务管理者,任务有:任务重跑初始化、重跑任务启动、信号丢失检测、心跳检测、版本生成、任务是否完成检测、队列扫描等
work服务
//保证只启动一次if (!clientSwitch.compareAndSet(false, true)) {return;}workContext.setWorkClient(this);//在这里目前是空的,公司内部初始化了一些公共数据源配置workContext.init();eventLoopGroup = new NioEventLoopGroup();bootstrap = new Bootstrap();bootstrap.group(eventLoopGroup).channel(NioSocketChannel.class).handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ch.pipeline().addLast(new IdleStateHandler(0, 0, 5, TimeUnit.SECONDS)).addLast("frameDecoder", new ProtobufVarint32FrameDecoder()).addLast("decoder", new ProtobufDecoder(RpcSocketMessage.SocketMessage.getDefaultInstance())).addLast("frameEncoder", new ProtobufVarint32LengthFieldPrepender()).addLast("encoder", new ProtobufEncoder()).addLast(new WorkHandler(workContext));}});HeraLog.info("init work client success ");workSchedule.schedule(new Runnable() {private WorkerHandlerHeartBeat workerHandlerHeartBeat = new WorkerHandlerHeartBeat();private int failCount = 0;@Overridepublic void run() {try {if (workContext.getServerChannel() != null) {boolean send = workerHandlerHeartBeat.send(workContext);if (!send) {failCount++;ErrorLog.error("send heart beat failed ,failCount :" + failCount);} else {failCount = 0;HeartLog.debug("send heart beat success:{}", workContext.getServerChannel().getRemoteAddress());}} else {ErrorLog.error("server channel can not find on " + WorkContext.host);}} catch (Exception e) {ErrorLog.error("heart beat error:", e);} finally {workSchedule.schedule(this, (failCount + 1) * HeraGlobalEnv.getHeartBeat(), TimeUnit.SECONDS);}}}, HeraGlobalEnv.getHeartBeat(), TimeUnit.SECONDS);//省略定时更新日志的代码
在 work 服务这边同样使用了原子类保证只启动一次,然后就是初始化 work的 netty 服务,再往下就是定时发送心跳信息给 master
work连接master
DistributeLock#checkLock 方法做的最后一件事情就是 work 连接 master,打开通信,调用的方法为:workClient.connect(heraLock.getHost().trim());
public synchronized void connect(String host) throws Exception {if (workContext.getServerChannel() != null) {if (workContext.getServerHost().equals(host)) {return;} else {workContext.getServerChannel().close();workContext.setServerChannel(null);}}workContext.setServerHost(host);CountDownLatch latch = new CountDownLatch(1);ChannelFutureListener futureListener = (future) -> {try {if (future.isSuccess()) {workContext.setServerChannel(FailFastCluster.wrap(future.channel()));SocketLog.info(workContext.getServerChannel().toString());}} catch (Exception e) {ErrorLog.error("连接master异常", e);} finally {latch.countDown();}};ChannelFuture connectFuture = bootstrap.connect(new InetSocketAddress(host, HeraGlobalEnv.getConnectPort()));connectFuture.addListener(futureListener);if (!latch.await(HeraGlobalEnv.getRequestTimeout(), TimeUnit.SECONDS)) {connectFuture.removeListener(futureListener);connectFuture.cancel(true);throw new ExecutionException(new TimeoutException("connect server consumption of 2 seconds"));}if (!connectFuture.isSuccess()) {throw new RuntimeException("connect server failed " + host,connectFuture.cause());}SocketLog.info("connect server success");}
连接 master 时会首先判断当前是否已经连接了 master,如果已经连接并且和当前 master ip 一致则直接返回,否则关闭 netty 通信,重置当前 work 的 master 信息。
然后通过 CountDownLatch 的 await(long timeout, TimeUnit unit)方法,来进行work 与 master 的超时连接判断,通过ChannelFutureListener在 master 与work 通信连接成功时设置 ServerChannel,并且容错方式为快速失败FailFastCluster.wrap(future.channel())
本文来自互联网用户投稿,文章观点仅代表作者本人,不代表本站立场,不承担相关法律责任。如若转载,请注明出处。 如若内容造成侵权/违法违规/事实不符,请点击【内容举报】进行投诉反馈!
