Hands-on ZooKeeper source code walkthrough: how a ZooKeeper cluster implements its ping liveness check
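The previous two articles,
Hands-on ZooKeeper source code walkthrough: how ZooKeeper recovers data when restarting after a failure (Part 1)
Hands-on ZooKeeper source code walkthrough: how ZooKeeper recovers data when restarting after a failure (Part 2)
covered in detail how ZooKeeper recovers its data when it restarts after a failure. This article continues from there: what else does the leader do once data synchronization between the leader and its followers has finished, and how is the ping mechanism implemented?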
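As described in the previous article, once the leader has finished synchronizing data it sends an UPTODATE packet to the follower, which tells the follower that synchronization is complete and that it can start serving clients.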
queuedPackets.add(new QuorumPacket(Leader.UPTODATE, -1, null, null));
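The code below comes right after that line. For every follower that connects, the leader creates a LearnerHandler thread to talk to that follower. After this thread has finished synchronizing data it enters a while(true) loop and waits for the write requests forwarded by the follower, the ACKs sent by the follower, the ping liveness replies, and so on.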
    // 2PC + majority-write mechanism: wait for write requests
    while (true) {
        qp = new QuorumPacket();
        ia.readRecord(qp, "packet");

        long traceMask = ZooTrace.SERVER_PACKET_TRACE_MASK;
        if (qp.getType() == Leader.PING) {
            traceMask = ZooTrace.SERVER_PING_TRACE_MASK;
        }
        if (LOG.isTraceEnabled()) {
            ZooTrace.logQuorumPacket(LOG, traceMask, 'i', qp);
        }
        tickOfNextAckDeadline = leader.self.tick.get() + leader.self.syncLimit;

        ByteBuffer bb;
        long sessionId;
        int cxid;
        int type;

        switch (qp.getType()) {
        case Leader.ACK:
            if (this.learnerType == LearnerType.OBSERVER) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Received ACK from Observer " + this.sid);
                }
            }
            syncLimitCheck.updateAck(qp.getZxid());
            leader.processAck(this.sid, qp.getZxid(), sock.getLocalSocketAddress());
            break;
        case Leader.PING:
            // Process the touches
            ByteArrayInputStream bis = new ByteArrayInputStream(qp.getData());
            DataInputStream dis = new DataInputStream(bis);
            while (dis.available() > 0) {
                long sess = dis.readLong();
                int to = dis.readInt();
                leader.zk.touch(sess, to);
            }
            break;
        case Leader.REVALIDATE:
            bis = new ByteArrayInputStream(qp.getData());
            dis = new DataInputStream(bis);
            long id = dis.readLong();
            int to = dis.readInt();
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            DataOutputStream dos = new DataOutputStream(bos);
            dos.writeLong(id);
            boolean valid = leader.zk.touch(id, to);
            if (valid) {
                try {
                    // set the session owner
                    // as the follower that
                    // owns the session
                    leader.zk.setOwner(id, this);
                } catch (SessionExpiredException e) {
                    LOG.error("Somehow session " + Long.toHexString(id)
                            + " expired right after being renewed! (impossible)", e);
                }
            }
            if (LOG.isTraceEnabled()) {
                ZooTrace.logTraceMessage(LOG, ZooTrace.SESSION_TRACE_MASK,
                        "Session 0x" + Long.toHexString(id) + " is valid: " + valid);
            }
            dos.writeBoolean(valid);
            qp.setData(bos.toByteArray());
            queuedPackets.add(qp);
            break;
        case Leader.REQUEST:
            bb = ByteBuffer.wrap(qp.getData());
            sessionId = bb.getLong();
            cxid = bb.getInt();
            type = bb.getInt();
            bb = bb.slice();
            Request si;
            if (type == OpCode.sync) {
                si = new LearnerSyncRequest(this, sessionId, cxid, type, bb, qp.getAuthinfo());
            } else {
                si = new Request(null, sessionId, cxid, type, bb, qp.getAuthinfo());
            }
            si.setOwner(this);
            leader.zk.submitRequest(si);
            break;
        default:
            LOG.warn("unexpected quorum packet, type: {}", packetToString(qp));
            break;
        }
    }
That completes the analysis of what the LearnerHandler thread does after it starts. Next we go back to Leader.lead() and keep reading from where we left off.
    readyToStart = true;
    // obtain the epoch
    long epoch = getEpochToPropose(self.getId(), self.getAcceptedEpoch());

    zk.setZxid(ZxidUtils.makeZxid(epoch, 0));

    synchronized (this) {
        lastProposed = zk.getZxid();
    }

    newLeaderProposal.packet = new QuorumPacket(NEWLEADER, zk.getZxid(), null, null);

    // wait for the epoch ACKs
    waitForEpochAck(self.getId(), leaderStateSummary);
    self.setCurrentEpoch(epoch);

    try {
        // wait for the NEWLEADER ACKs
        waitForNewLeaderAck(self.getId(), zk.getZxid());
    } catch (InterruptedException e) {
        shutdown("Waiting for a quorum of followers, only synced with sids: [ "
                + getSidSetString(newLeaderProposal.ackSet) + " ]");
        HashSet<Long> followerSet = new HashSet<Long>();
        for (LearnerHandler f : learners) {
            followerSet.add(f.getSid());
        }
        if (self.getQuorumVerifier().containsQuorum(followerSet)) {
            LOG.warn("Enough followers present. "
                    + "Perhaps the initTicks need to be increased.");
        }
        Thread.sleep(self.tickTime);
        self.tick.incrementAndGet();
        return;
    }
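This code was analyzed in the previous two articles, so we will not go through it again here.
What we mainly want to look at next is this single line: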
startZkServer();
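This starts the leader's ZooKeeper server. When we analyzed follower startup earlier, we saw that the follower calls zk.startup() to initialize its processor chain. If anything about that is unclear, see the earlier article: Hands-on ZooKeeper source code walkthrough: ZooKeeper's sessionId generation strategy and follower processor chain initialization
So we can guess that this method also initializes the leader's processor chain, and that an incoming write request is then passed step by step through the processors in that chain.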
    private synchronized void startZkServer() {
        // Update lastCommitted and Db's zxid to a value representing the new epoch
        lastCommitted = zk.getZxid();

        // look familiar?
        zk.startup();

        self.updateElectionVote(getEpoch());
        zk.getZKDatabase().setlastProcessedZxid(zk.getZxid());
    }
Does zk.startup() look familiar? It also ends up in the startup() method of the parent class, ZooKeeperServer.
    public synchronized void startup() {
        if (sessionTracker == null) {
            createSessionTracker();
        }
        // start the periodic task that cleans up sessions
        startSessionTracker();
        // initialize the processor chain of the leader, follower or observer
        setupRequestProcessors();

        registerJMX();

        setState(State.RUNNING);
        notifyAll();
    }
The only difference is which implementation of setupRequestProcessors() gets called: the leader uses LeaderZooKeeperServer, a follower uses FollowerZooKeeperServer, and an observer uses ObserverZooKeeperServer. Earlier articles covered this; if you are interested, see: Hands-on ZooKeeper source code walkthrough: what does a follower do when it starts?
Let's step into LeaderZooKeeperServer.setupRequestProcessors().
    @Override
    protected void setupRequestProcessors() {
        RequestProcessor finalProcessor = new FinalRequestProcessor(this);
        RequestProcessor toBeAppliedProcessor = new Leader.ToBeAppliedRequestProcessor(
                finalProcessor, getLeader().toBeApplied);
        commitProcessor = new CommitProcessor(toBeAppliedProcessor,
                Long.toString(getServerId()), false,
                getZooKeeperServerListener());
        commitProcessor.start();
        ProposalRequestProcessor proposalProcessor = new ProposalRequestProcessor(this,
                commitProcessor);
        proposalProcessor.initialize();
        firstProcessor = new PrepRequestProcessor(this, proposalProcessor);
        ((PrepRequestProcessor) firstProcessor).start();
    }
Let's draw a diagram to analyze it.

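Roughly speaking, the diagram above shows the series of initializations and thread starts that take place. How the processors actually call each other will be analyzed in detail when we look at how the ZooKeeper cluster handles a request sent by a client.
Next comes this code: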
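The wiring order in setupRequestProcessors() can also be written out as a small, runnable sketch. The types below (Stage, NamedStage, LeaderChainSketch) are hypothetical stand-ins, not ZooKeeper classes, and the synchronous hand-off is a simplification: the real code starts threads for some processors (commitProcessor.start(), firstProcessor.start()). Only the order of the stages is taken from the code above.

```java
import java.util.ArrayList;
import java.util.List;

/** Illustrative only: none of these types are ZooKeeper classes. */
final class LeaderChainSketch {

    /** Hypothetical stand-in for a request processor. */
    interface Stage {
        void process(String request, List<String> trace);
    }

    /** Records its own name, then hands the request to the next stage. */
    static final class NamedStage implements Stage {
        private final String name;
        private final Stage next; // null marks the end of the chain

        NamedStage(String name, Stage next) {
            this.name = name;
            this.next = next;
        }

        @Override
        public void process(String request, List<String> trace) {
            trace.add(name);
            if (next != null) {
                next.process(request, trace);
            }
        }
    }

    /** Builds the chain back to front, in the same order as setupRequestProcessors(). */
    static Stage build() {
        Stage finalProcessor = new NamedStage("FinalRequestProcessor", null);
        Stage toBeApplied = new NamedStage("ToBeAppliedRequestProcessor", finalProcessor);
        Stage commit = new NamedStage("CommitProcessor", toBeApplied);
        Stage proposal = new NamedStage("ProposalRequestProcessor", commit);
        return new NamedStage("PrepRequestProcessor", proposal);
    }

    /** Sends one request through the chain and returns the stages it visited. */
    static List<String> run(String request) {
        List<String> trace = new ArrayList<>();
        build().process(request, trace);
        return trace;
    }

    public static void main(String[] args) {
        System.out.println(String.join(" -> ", run("create /demo")));
    }
}
```

The chain is constructed from the last stage to the first because each stage needs a reference to its successor at construction time, which is the same reason the original method creates FinalRequestProcessor first and PrepRequestProcessor last.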
    String initialZxid = System.getProperty("zookeeper.testingonly.initialZxid");
    if (initialZxid != null) {
        long zxid = Long.parseLong(initialZxid);
        zk.setZxid((zk.getZxid() & 0xffffffff00000000L) | zxid);
    }

    if (!System.getProperty("zookeeper.leaderServes", "yes").equals("no")) {
        self.cnxnFactory.setZooKeeperServer(zk);
    }
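There is not much to analyze here: the first part is a testing-only override of the initial zxid, and the second part decides, based on zookeeper.leaderServes, whether the leader itself accepts client connections. Neither matters for the main flow, so we skip it.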
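One detail in that snippet is still worth a quick illustration: the mask 0xffffffff00000000L keeps the upper 32 bits of the zxid (the epoch) and replaces only the lower 32 bits (the counter). The sketch below shows that bit layout. ZxidSketch and its helper names are hypothetical; makeZxid here is written from the layout implied by ZxidUtils.makeZxid(epoch, 0) and the mask above, not copied from ZooKeeper.

```java
/** Illustrative only: hypothetical helpers showing the epoch/counter layout of a zxid. */
final class ZxidSketch {

    /** Packs an epoch into the high 32 bits and a counter into the low 32 bits. */
    static long makeZxid(long epoch, long counter) {
        return (epoch << 32) | (counter & 0xffffffffL);
    }

    static long epochOf(long zxid) {
        return zxid >> 32;
    }

    static long counterOf(long zxid) {
        return zxid & 0xffffffffL;
    }

    /** Same expression as the testing-only override: keep the epoch bits, OR in the new value. */
    static long overrideCounter(long zxid, long newCounter) {
        return (zxid & 0xffffffff00000000L) | newCounter;
    }

    public static void main(String[] args) {
        long fresh = makeZxid(5, 0);              // what a new leader starts with in epoch 5
        long overridden = overrideCounter(fresh, 42);
        System.out.println("epoch=" + epochOf(overridden)
                + " counter=" + counterOf(overridden));
    }
}
```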
    boolean tickSkip = true;

    while (true) {
        Thread.sleep(self.tickTime / 2);
        if (!tickSkip) {
            self.tick.incrementAndGet();
        }
        HashSet<Long> syncedSet = new HashSet<Long>();

        // lock on the followers when we use it.
        syncedSet.add(self.getId());

        for (LearnerHandler f : getLearners()) {
            if (f.synced() && f.getLearnerType() == LearnerType.PARTICIPANT) {
                syncedSet.add(f.getSid());
            }
            f.ping();
        }

        // check leader running status
        if (!this.isRunning()) {
            shutdown("Unexpected internal error");
            return;
        }

        if (!tickSkip && !self.getQuorumVerifier().containsQuorum(syncedSet)) {
            //if (!tickSkip && syncedCount < self.quorumPeers.size() / 2) {
            return;
        }
        tickSkip = !tickSkip;
    }
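The effect of the tickSkip toggle in this loop is easy to miss, so here is a minimal simulation of it. TickLoopSketch is a hypothetical class that only counts events; it does not sleep or talk to any learner. Each iteration stands for one pass of the loop, that is, half a tickTime.

```java
/** Illustrative only: counts what the leader loop does per half-tick iteration. */
final class TickLoopSketch {

    /**
     * Returns {pingRounds, ticksAdvanced, quorumChecks} after the given
     * number of loop iterations.
     */
    static int[] simulate(int iterations) {
        boolean tickSkip = true;
        int pingRounds = 0;
        int ticks = 0;
        int quorumChecks = 0;
        for (int i = 0; i < iterations; i++) {
            // the real loop sleeps tickTime / 2 here
            if (!tickSkip) {
                ticks++;            // self.tick.incrementAndGet()
            }
            pingRounds++;           // every learner is pinged on every pass
            if (!tickSkip) {
                quorumChecks++;     // containsQuorum(syncedSet) is only consulted here
            }
            tickSkip = !tickSkip;
        }
        return new int[] {pingRounds, ticks, quorumChecks};
    }

    public static void main(String[] args) {
        int[] r = simulate(6);
        System.out.println("pings=" + r[0] + " ticks=" + r[1] + " quorumChecks=" + r[2]);
    }
}
```

In other words, pings go out twice per tickTime, while the tick counter and the quorum check advance once per tickTime.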
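The leader then stays in this while loop, which runs once every tickTime / 2, that is, twice per tickTime; the tick counter is only advanced on every second pass. The key part is the following: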
    for (LearnerHandler f : getLearners()) {
        // Synced set is used to check we have a supporting quorum, so only
        // PARTICIPANT, not OBSERVER, learners should be used
        if (f.synced() && f.getLearnerType() == LearnerType.PARTICIPANT) {
            syncedSet.add(f.getSid());
        }
        f.ping();
    }
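How syncedSet feeds the quorum check can be sketched as follows. This assumes a plain majority rule (more than half of the voting members); the real check goes through self.getQuorumVerifier(), whose implementation is configurable, so treat containsQuorum below as an assumption. SyncedQuorumSketch and Learner are hypothetical types.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Illustrative only: builds a synced set and applies a simple majority rule. */
final class SyncedQuorumSketch {

    /** Hypothetical stand-in for the bits of LearnerHandler the loop looks at. */
    static final class Learner {
        final long sid;
        final boolean synced;
        final boolean participant; // false for an observer

        Learner(long sid, boolean synced, boolean participant) {
            this.sid = sid;
            this.synced = synced;
            this.participant = participant;
        }
    }

    /** The leader always counts itself; learners count only if synced and voting. */
    static Set<Long> syncedSet(long leaderSid, List<Learner> learners) {
        Set<Long> set = new HashSet<>();
        set.add(leaderSid);
        for (Learner l : learners) {
            if (l.synced && l.participant) {
                set.add(l.sid);
            }
            // the real loop also calls f.ping() here, for every learner
        }
        return set;
    }

    /** Assumed majority rule: strictly more than half of the voting members. */
    static boolean containsQuorum(Set<Long> set, int votingMembers) {
        return set.size() > votingMembers / 2;
    }

    public static void main(String[] args) {
        List<Learner> learners = Arrays.asList(
                new Learner(2, true, true),
                new Learner(3, false, true),   // voting follower that has fallen behind
                new Learner(4, true, true),
                new Learner(6, true, false));  // observer: pinged, but never counted
        Set<Long> synced = syncedSet(1, learners);
        System.out.println(synced + " is a quorum of 5 voters: " + containsQuorum(synced, 5));
    }
}
```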
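getLearners() returns the collection of LearnerHandler objects, one for each learner currently connected to the leader. If this is unclear, see the previous article: when a follower connects to the leader, the leader creates and starts a LearnerHandler thread to handle reads and writes with that follower, and when the LearnerHandler thread starts it is added to the learners collection.
The loop walks over every LearnerHandler. If the corresponding learner is synced and is a PARTICIPANT (a voting member, not an observer), its sid is added to syncedSet; this set holds the voting members that are currently in sync and is what the quorum check uses. Then comes the most important part, f.ping(), which is called for every learner, observers included: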
    public void ping() {
        long id;
        if (syncLimitCheck.check(System.nanoTime())) {
            synchronized (leader) {
                id = leader.lastProposed;
            }
            QuorumPacket ping = new QuorumPacket(Leader.PING, id, null, null);
            queuePacket(ping);
        } else {
            LOG.warn("Closing connection to peer due to transaction timeout.");
            shutdown();
        }
    }
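In short, as long as syncLimitCheck passes, it wraps the leader's lastProposed zxid in a PING packet and queues it to be sent to the follower or observer; if the check fails, the connection to that peer is closed.
Next, let's see how the follower handles the PING packet.
As analyzed in the previous article, once the follower has finished synchronizing data with the leader, it runs the code below and waits for packets from the leader.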
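The code above only shows the call sites syncLimitCheck.check(System.nanoTime()) and, in the ACK branch earlier, syncLimitCheck.updateAck(zxid); the internals are not shown in this article. The sketch below is therefore an assumption about the idea, not ZooKeeper's implementation: remember when the oldest unacknowledged proposal was sent, and fail the check once the learner has taken longer than tickTime * syncLimit to acknowledge it. SyncLimitSketch and onProposal are hypothetical names.

```java
/** Illustrative only: a simplified "has this learner ACKed in time?" check. */
final class SyncLimitSketch {
    private final long limitMillis;
    private boolean pending = false;   // is there a proposal still waiting for an ACK?
    private long pendingZxid;
    private long pendingSinceNanos;

    SyncLimitSketch(int tickTimeMillis, int syncLimit) {
        this.limitMillis = (long) tickTimeMillis * syncLimit;
    }

    /** Hypothetical hook: called when a proposal is sent to the learner. */
    synchronized void onProposal(long zxid, long nowNanos) {
        if (!pending) {                // only the oldest outstanding proposal is tracked
            pending = true;
            pendingZxid = zxid;
            pendingSinceNanos = nowNanos;
        }
    }

    /** Called when the learner acknowledges a zxid. */
    synchronized void updateAck(long zxid) {
        if (pending && zxid == pendingZxid) {
            pending = false;
        }
    }

    /** True while nothing is outstanding or the learner is still within the limit. */
    synchronized boolean check(long nowNanos) {
        if (!pending) {
            return true;
        }
        long waitedMillis = (nowNanos - pendingSinceNanos) / 1_000_000L;
        return waitedMillis < limitMillis;
    }

    public static void main(String[] args) {
        SyncLimitSketch check = new SyncLimitSketch(2000, 5); // limit: 10 000 ms
        check.onProposal(1L, 0L);
        System.out.println(check.check(9_999L * 1_000_000L));   // still within the limit
        System.out.println(check.check(10_000L * 1_000_000L));  // limit reached
    }
}
```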
    while (this.isRunning()) {
        readPacket(qp);      // read a packet from the leader
        processPacket(qp);
    }
readPacket(qp) reads the data sent by the leader and deserializes it into the qp object.
    void readPacket(QuorumPacket pp) throws IOException {
        synchronized (leaderIs) {
            leaderIs.readRecord(pp, "packet");
        }
        long traceMask = ZooTrace.SERVER_PACKET_TRACE_MASK;
        if (pp.getType() == Leader.PING) {
            traceMask = ZooTrace.SERVER_PING_TRACE_MASK;
        }
        if (LOG.isTraceEnabled()) {
            ZooTrace.logQuorumPacket(LOG, traceMask, 'i', pp);
        }
    }
Right after that, processPacket(qp) is called with the packet that was just read.
I will quote only the small part of it that is relevant here.
    protected void processPacket(QuorumPacket qp) throws IOException {
        switch (qp.getType()) {
        case Leader.PING:
            ping(qp);
            break;
        }
    }
If qp.getType() == Leader.PING, it simply calls the ping method.
    protected void ping(QuorumPacket qp) throws IOException {
        // Send back the ping with our session data
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        DataOutputStream dos = new DataOutputStream(bos);
        HashMap<Long, Integer> touchTable = zk.getTouchSnapshot();
        for (Entry<Long, Integer> entry : touchTable.entrySet()) {
            dos.writeLong(entry.getKey());
            dos.writeInt(entry.getValue());
        }
        qp.setData(bos.toByteArray());
        writePacket(qp, true);
    }
Put simply, it packs some data into the packet and sends it back to the leader. The part worth a closer look is zk.getTouchSnapshot():
    protected HashMap<Long, Integer> getTouchSnapshot() {
        if (sessionTracker != null) {
            return ((LearnerSessionTracker) sessionTracker).snapshot();
        }
        return new HashMap<Long, Integer>();
    }
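This takes a snapshot of the sessions the follower is tracking locally, as a map from session id to timeout. On every ping, each follower sends this locally maintained session information back to the leader.
Now we can go back and see how the leader handles it. The code is the LearnerHandler loop at the top of this article, and the following branch is the one that runs: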
    case Leader.PING:
        // Process the touches
        ByteArrayInputStream bis = new ByteArrayInputStream(qp.getData());
        DataInputStream dis = new DataInputStream(bis);
        while (dis.available() > 0) {
            long sess = dis.readLong();
            int to = dis.readInt();
            leader.zk.touch(sess, to);
        }
        break;
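The payload format shared by the follower's ping() and this branch is just a sequence of (long sessionId, int timeout) pairs, 12 bytes each. A small round trip using the same java.io stream calls makes that concrete. PingPayloadSketch and its method names are hypothetical; checked IOExceptions are wrapped so the helpers are easy to call.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.LinkedHashMap;
import java.util.Map;

/** Illustrative only: encodes and decodes a touch table the way the PING payload does. */
final class PingPayloadSketch {

    /** Follower side: write each entry as an 8-byte session id plus a 4-byte timeout. */
    static byte[] encode(Map<Long, Integer> touchTable) {
        try {
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            DataOutputStream dos = new DataOutputStream(bos);
            for (Map.Entry<Long, Integer> entry : touchTable.entrySet()) {
                dos.writeLong(entry.getKey());
                dos.writeInt(entry.getValue());
            }
            dos.flush();
            return bos.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Leader side: read pairs until the buffer is exhausted. */
    static Map<Long, Integer> decode(byte[] data) {
        try {
            DataInputStream dis = new DataInputStream(new ByteArrayInputStream(data));
            Map<Long, Integer> touches = new LinkedHashMap<>();
            while (dis.available() > 0) {
                long sess = dis.readLong();
                int to = dis.readInt();
                touches.put(sess, to); // the real code calls leader.zk.touch(sess, to)
            }
            return touches;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        Map<Long, Integer> table = new LinkedHashMap<>();
        table.put(0x100000001L, 30000);
        table.put(0x100000002L, 40000);
        byte[] payload = encode(table);
        System.out.println(payload.length + " bytes -> " + decode(payload));
    }
}
```

An empty touch table produces an empty payload, in which case the while loop on the leader side simply does nothing.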
For each session in the payload this branch calls leader.zk.touch(), which ends up in the following method:
    synchronized public boolean touchSession(long sessionId, int timeout) {
        SessionImpl s = sessionsById.get(sessionId);
        // Return false, if the session doesn't exists or marked as closing
        if (s == null || s.isClosing()) {
            return false;
        }
        // compute the session's next expiration time
        long expireTime = roundToInterval(Time.currentElapsedTime() + timeout);
        if (s.tickTime >= expireTime) {
            // Nothing needs to be done
            return true;
        }
        SessionSet set = sessionSets.get(s.tickTime);
        if (set != null) {
            set.sessions.remove(s);
        }
        s.tickTime = expireTime;
        set = sessionSets.get(s.tickTime);
        if (set == null) {
            set = new SessionSet();
            sessionSets.put(expireTime, set);
        }
        set.sessions.add(s);
        return true;
    }
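The method shown above is org.apache.zookeeper.server.SessionTrackerImpl#touchSession.
The logic is that the leader uses the session data sent by each follower to refresh the sessions it maintains locally: a touched session gets a later expiration time and is moved into the matching expiration bucket, while a session that is unknown or already closing is not renewed. Sessions that do expire are removed by the session tracker, which we will analyze later; for now it is enough to know the overall flow.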
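The bucket-moving logic of touchSession can be tried out in isolation with the sketch below. ExpiryBucketSketch is a hypothetical class; it takes the current time as a parameter instead of reading a clock, and its roundToInterval rounds up to the next multiple of the interval, which is an assumption since the article does not show that method's body.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Illustrative only: sessions grouped into buckets keyed by rounded expiration time. */
final class ExpiryBucketSketch {
    private final long interval;
    private final Map<Long, Long> expiryBySession = new HashMap<>();
    private final Map<Long, Set<Long>> buckets = new HashMap<>();

    ExpiryBucketSketch(long intervalMillis) {
        this.interval = intervalMillis;
    }

    /** Assumed rounding: the next multiple of the interval strictly after 'time'. */
    long roundToInterval(long time) {
        return (time / interval + 1) * interval;
    }

    void register(long sessionId, long now, int timeout) {
        long expire = roundToInterval(now + timeout);
        expiryBySession.put(sessionId, expire);
        buckets.computeIfAbsent(expire, k -> new HashSet<>()).add(sessionId);
    }

    /** Mirrors the shape of touchSession: false if unknown, otherwise push expiry forward. */
    boolean touch(long sessionId, long now, int timeout) {
        Long current = expiryBySession.get(sessionId);
        if (current == null) {
            return false;
        }
        long expire = roundToInterval(now + timeout);
        if (current >= expire) {
            return true; // already in this bucket or a later one, nothing to do
        }
        Set<Long> old = buckets.get(current);
        if (old != null) {
            old.remove(sessionId);
        }
        expiryBySession.put(sessionId, expire);
        buckets.computeIfAbsent(expire, k -> new HashSet<>()).add(sessionId);
        return true;
    }

    Long expiryOf(long sessionId) {
        return expiryBySession.get(sessionId);
    }

    public static void main(String[] args) {
        ExpiryBucketSketch tracker = new ExpiryBucketSketch(2000);
        tracker.register(7L, 0L, 4000);
        tracker.touch(7L, 1500L, 4000);   // rounds to the same bucket, no move
        System.out.println(tracker.expiryOf(7L));
        tracker.touch(7L, 2500L, 4000);   // rounds to a later bucket, session is moved
        System.out.println(tracker.expiryOf(7L));
    }
}
```

Grouping sessions by rounded expiration time means the tracker only has to look at one bucket per interval when it checks for expired sessions, instead of scanning every session.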
