Hadoop 2.7.3 Source Code Analysis: DataNode Registration and Heartbeat Mechanism

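Contents

  • DataNode registration and heartbeat
    • DataNode registration
    • DataNode heartbeat
  • NameNode handling of registration and heartbeat
    • A brief introduction to DatanodeManager
    • NameNode handling of registration
    • NameNode handling of heartbeats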

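DataNode registration and heartbeat

When Hadoop starts, the normal sequence is to start the NameNode first and then the DataNodes, because the NameNode has to accept the DataNode registrations. A DataNode begins registering and sending heartbeats as soon as it starts, so the natural entry point is the main method of the DataNode class.

Tracing the code shows that the DataNode constructor (through startDataNode) initializes a BlockPoolManager object. Its call blockPoolManager.refreshNamenodes(conf); reads from the configuration the NameNodes this DataNode reports to, and the DataNode then sends registration and heartbeat messages to each of them.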
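To make the "read the NameNodes from the configuration" step more concrete, here is a minimal sketch of how nameservices can be mapped to NameNode RPC addresses. The configuration keys (dfs.nameservices, dfs.ha.namenodes.*, dfs.namenode.rpc-address.*) are standard HDFS keys, but the class NamenodeAddressSketch, its methods, and the host names are made up for illustration, and the lookup is much simpler than what Hadoop itself does.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper, not Hadoop code: resolves nameservice -> NameNode RPC
// addresses from a plain key/value view of the configuration.
class NamenodeAddressSketch {

    static Map<String, List<String>> resolve(Map<String, String> conf) {
        Map<String, List<String>> result = new LinkedHashMap<String, List<String>>();
        String nameservices = conf.get("dfs.nameservices");
        if (nameservices == null) {
            return result; // a single unnamed NameNode is not covered by this sketch
        }
        for (String rawNs : nameservices.split(",")) {
            String ns = rawNs.trim();
            List<String> addrs = new ArrayList<String>();
            String nnIds = conf.get("dfs.ha.namenodes." + ns);
            if (nnIds == null) {
                // Federation without HA: one NameNode per nameservice.
                addIfPresent(addrs, conf.get("dfs.namenode.rpc-address." + ns));
            } else {
                // HA: one address per NameNode id inside the nameservice.
                for (String nnId : nnIds.split(",")) {
                    addIfPresent(addrs,
                        conf.get("dfs.namenode.rpc-address." + ns + "." + nnId.trim()));
                }
            }
            result.put(ns, addrs);
        }
        return result;
    }

    private static void addIfPresent(List<String> out, String value) {
        if (value != null) {
            out.add(value);
        }
    }

    // Example configuration with made-up host names.
    static Map<String, String> sampleConf() {
        Map<String, String> conf = new HashMap<String, String>();
        conf.put("dfs.nameservices", "ns1, ns2");
        conf.put("dfs.ha.namenodes.ns1", "nn1,nn2");
        conf.put("dfs.namenode.rpc-address.ns1.nn1", "host1:8020");
        conf.put("dfs.namenode.rpc-address.ns1.nn2", "host2:8020");
        conf.put("dfs.namenode.rpc-address.ns2", "host3:8020");
        return conf;
    }

    public static void main(String[] args) {
        System.out.println(resolve(sampleConf()));
    }
}
```

Roughly speaking, each key of the resulting map would correspond to one BPOfferService and each address to one BPServiceActor.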

Concretely, this happens in the startAll() method of BlockPoolManager, which starts every BPOfferService on the DataNode.

    synchronized void startAll() throws IOException {
      try {
        UserGroupInformation.getLoginUser().doAs(
            new PrivilegedExceptionAction<Object>() {
              @Override
              public Object run() throws Exception {
                for (BPOfferService bpos : offerServices) {
                  bpos.start();
                }
                return null;
              }
            });
      } catch (InterruptedException ex) {
        IOException ioe = new IOException();
        ioe.initCause(ex.getCause());
        throw ioe;
      }
    }

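The start method of BPOfferService then loops over its BPServiceActor threads and starts each one, so that every BPServiceActor can send registration and heartbeat messages to its own NameNode.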

    // This must be called only by blockPoolManager
    void start() {
      for (BPServiceActor actor : bpServices) {
        actor.start();
      }
    }

The actual work is done in the run method of BPServiceActor.

    /**
     * No matter what kind of exception we get, keep retrying to offerService().
     * That's the loop that connects to the NameNode and provides basic DataNode
     * functionality.
     *
     * Only stop when "shouldRun" or "shouldServiceRun" is turned off, which can
     * happen either at shutdown or due to refreshNamenodes.
     */
    @Override
    public void run() {
      LOG.info(this + " starting to offer service");
      try {
        while (true) {
          // init stuff
          try {
            // setup storage
            // connect to the NameNode and register the DataNode
            connectToNNAndHandshake();
            break;
          } catch (IOException ioe) {
            .......
          }
        }
        runningState = RunningState.RUNNING;
        while (shouldRun()) {
          try {
            // heartbeats and reports
            offerService();
          } catch (Exception ex) {
            .....
          }
        }
        runningState = RunningState.EXITED;
      } catch (Throwable ex) {
        ...
      } finally {
        ....
      }
    }
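The two-phase shape of run() (retry the handshake until it succeeds, then keep offering service no matter what exception is thrown) can be tried out in isolation. The following is only a sketch under simplifying assumptions: ActorLoopSketch, NameNodeLink and the retry sleep are hypothetical stand-ins, not the Hadoop classes, and the real method also tracks a running state and can give up on initialization.

```java
import java.io.IOException;

// Hypothetical stand-in for the BPServiceActor loop structure.
class ActorLoopSketch implements Runnable {

    // What the actor needs from the NameNode connection (invented interface).
    interface NameNodeLink {
        void handshake() throws IOException;
        void offerService() throws Exception;
    }

    private final NameNodeLink link;
    private final long retrySleepMs;
    private volatile boolean shouldRun = true;
    int handshakeFailures = 0;
    int serviceFailures = 0;

    ActorLoopSketch(NameNodeLink link, long retrySleepMs) {
        this.link = link;
        this.retrySleepMs = retrySleepMs;
    }

    void stop() {
        shouldRun = false;
    }

    @Override
    public void run() {
        // Phase 1: keep trying to connect and register.
        while (shouldRun) {
            try {
                link.handshake();
                break;
            } catch (IOException ioe) {
                handshakeFailures++;
                sleepQuietly(retrySleepMs);
            }
        }
        // Phase 2: an exception never ends the loop, only the stop flag does.
        while (shouldRun) {
            try {
                link.offerService();
            } catch (Exception ex) {
                serviceFailures++;
                sleepQuietly(retrySleepMs);
            }
        }
    }

    private static void sleepQuietly(long ms) {
        try {
            Thread.sleep(ms);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // Runs the loop against a flaky fake link and reports
    // {handshake failures, service failures, offerService calls}.
    static int[] demo() {
        final int[] calls = new int[2];
        final ActorLoopSketch[] self = new ActorLoopSketch[1];
        NameNodeLink link = new NameNodeLink() {
            public void handshake() throws IOException {
                if (++calls[0] <= 2) {
                    throw new IOException("NameNode not reachable yet");
                }
            }
            public void offerService() throws Exception {
                int n = ++calls[1];
                if (n == 1) {
                    throw new IllegalStateException("transient failure");
                }
                if (n == 3) {
                    self[0].stop();
                }
            }
        };
        self[0] = new ActorLoopSketch(link, 1L);
        self[0].run();
        return new int[] { self[0].handshakeFailures, self[0].serviceFailures, calls[1] };
    }

    public static void main(String[] args) {
        int[] r = demo();
        System.out.println(r[0] + " " + r[1] + " " + r[2]);
    }
}
```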

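DataNode registration

Registration on the DataNode side is fairly simple. Following connectToNNAndHandshake leads to the register method of BPServiceActor, which builds a DatanodeRegistration object as the argument; it carries the basic information the NameNode needs in order to validate the DataNode.

The DataNode then sends an RPC request through the registerDatanode method of DatanodeProtocol, the interface that DataNodes and NameNodes use to talk to each other directly. On the DataNode the call goes out through the client-side translator (DatanodeProtocolClientSideTranslatorPB); on the NameNode it arrives at DatanodeProtocolServerSideTranslatorPB.registerDatanode(RpcController, RegisterDatanodeRequestProto), shown below.

That method hands the registration to the NameNode implementation, converts the result, and returns it to the DataNode.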

    @Override
    public RegisterDatanodeResponseProto registerDatanode(
        RpcController controller, RegisterDatanodeRequestProto request)
        throws ServiceException {
      DatanodeRegistration registration =
          PBHelper.convert(request.getRegistration());
      DatanodeRegistration registrationResp;
      try {
        registrationResp = impl.registerDatanode(registration);
      } catch (IOException e) {
        throw new ServiceException(e);
      }
      return RegisterDatanodeResponseProto.newBuilder()
          .setRegistration(PBHelper.convert(registrationResp)).build();
    }
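The translator pattern used here can be illustrated with a toy version: the client side turns a Java call into a wire message, and the server side turns the message back into a Java call on the implementation. Everything in this sketch (TranslatorSketch, Registration, the "uuid|ip" string encoding, the address the fake NameNode fills in) is invented for illustration; Hadoop uses protobuf messages and PBHelper instead.

```java
import java.io.IOException;

// Toy model of the client-side / server-side translator pair.
class TranslatorSketch {

    // Stand-in for DatanodeRegistration, reduced to two fields.
    static final class Registration {
        final String datanodeUuid;
        final String ipAddr;
        Registration(String datanodeUuid, String ipAddr) {
            this.datanodeUuid = datanodeUuid;
            this.ipAddr = ipAddr;
        }
    }

    // Java-level protocol, playing the role of DatanodeProtocol.
    interface Protocol {
        Registration registerDatanode(Registration reg) throws IOException;
    }

    // Wire-level protocol: plain strings here, protobuf messages in Hadoop.
    interface WireProtocol {
        String registerDatanode(String request) throws IOException;
    }

    static String encode(Registration r) {
        return r.datanodeUuid + "|" + r.ipAddr;
    }

    static Registration decode(String s) {
        int sep = s.indexOf('|');
        return new Registration(s.substring(0, sep), s.substring(sep + 1));
    }

    // DataNode side: Java call -> wire message -> Java result.
    static final class ClientSideTranslator implements Protocol {
        private final WireProtocol rpcProxy;
        ClientSideTranslator(WireProtocol rpcProxy) {
            this.rpcProxy = rpcProxy;
        }
        public Registration registerDatanode(Registration reg) throws IOException {
            return decode(rpcProxy.registerDatanode(encode(reg)));
        }
    }

    // NameNode side: wire message -> call on the implementation -> wire message.
    static final class ServerSideTranslator implements WireProtocol {
        private final Protocol impl;
        ServerSideTranslator(Protocol impl) {
            this.impl = impl;
        }
        public String registerDatanode(String request) throws IOException {
            return encode(impl.registerDatanode(decode(request)));
        }
    }

    // Wires both translators together in one process, with no real network.
    static Registration demo() {
        Protocol nameNodeImpl = new Protocol() {
            public Registration registerDatanode(Registration reg) {
                // Pretend the NameNode records the address it saw the call come from.
                return new Registration(reg.datanodeUuid, "10.0.0.7");
            }
        };
        Protocol proxy = new ClientSideTranslator(new ServerSideTranslator(nameNodeImpl));
        try {
            return proxy.registerDatanode(new Registration("dn-uuid-1", "0.0.0.0"));
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        Registration r = demo();
        System.out.println(r.datanodeUuid + " registered as " + r.ipAddr);
    }
}
```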

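DataNode heartbeat

The DataNode heartbeat logic lives mainly in the offerService method, which keeps running until shouldRun returns false.
Each round first sends a heartbeat request to the NameNode, then updates some local state from the response, and finally processes the commands the NameNode sent back (an array of DatanodeCommand).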
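One round of that loop can be sketched as follows: send a heartbeat when the interval has elapsed, then process whatever commands came back. This is a simplified model, not the Hadoop implementation; HeartbeatRoundSketch, its NameNode interface and the two Action values are hypothetical, and the 3000 ms interval in the demo simply mirrors the default of dfs.heartbeat.interval (3 seconds).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Simplified model of one heartbeat round on the DataNode side.
class HeartbeatRoundSketch {

    enum Action { TRANSFER, INVALIDATE }

    // Stand-in for DatanodeCommand.
    static final class Command {
        final Action action;
        final String blockId;
        Command(Action action, String blockId) {
            this.action = action;
            this.blockId = blockId;
        }
    }

    // Invented interface for the RPC proxy to the NameNode.
    interface NameNode {
        List<Command> sendHeartbeat(String datanodeUuid);
    }

    private final NameNode nameNode;
    private final String datanodeUuid;
    private final long intervalMs;
    private long nextHeartbeatMs = 0L;
    final List<String> processed = new ArrayList<String>();

    HeartbeatRoundSketch(NameNode nameNode, String datanodeUuid, long intervalMs) {
        this.nameNode = nameNode;
        this.datanodeUuid = datanodeUuid;
        this.intervalMs = intervalMs;
    }

    /** Runs one pass of the loop; returns true if a heartbeat was sent. */
    boolean tick(long nowMs) {
        if (nowMs < nextHeartbeatMs) {
            return false; // not due yet
        }
        List<Command> commands = nameNode.sendHeartbeat(datanodeUuid);
        nextHeartbeatMs = nowMs + intervalMs;
        for (Command c : commands) {
            process(c);
        }
        return true;
    }

    private void process(Command c) {
        switch (c.action) {
            case TRANSFER:
                processed.add("copy " + c.blockId + " to another DataNode");
                break;
            case INVALIDATE:
                processed.add("delete local " + c.blockId);
                break;
            default:
                throw new IllegalArgumentException("unknown action " + c.action);
        }
    }

    // A fake NameNode that asks for one deletion on the first heartbeat only.
    static HeartbeatRoundSketch demo() {
        NameNode nn = new NameNode() {
            private int calls = 0;
            public List<Command> sendHeartbeat(String uuid) {
                calls++;
                if (calls == 1) {
                    return Arrays.asList(new Command(Action.INVALIDATE, "blk_1"));
                }
                return new ArrayList<Command>();
            }
        };
        return new HeartbeatRoundSketch(nn, "dn-uuid-1", 3000L);
    }

    public static void main(String[] args) {
        HeartbeatRoundSketch d = demo();
        d.tick(0L);
        System.out.println(d.processed);
    }
}
```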

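The heartbeat is sent by the sendHeartBeat method. The request travels over RPC (on the DataNode through DatanodeProtocolClientSideTranslatorPB) and, on the NameNode, reaches sendHeartbeat(RpcController, HeartbeatRequestProto) of DatanodeProtocolServerSideTranslatorPB, which passes it on to the NameNode implementation.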

    @Override
    public HeartbeatResponseProto sendHeartbeat(RpcController controller,
        HeartbeatRequestProto request) throws ServiceException {
      HeartbeatResponse response;
      try {
        ......
        // pass the heartbeat on to the implementation
        response = impl.sendHeartbeat(
            PBHelper.convert(request.getRegistration()),
            report, request.getCacheCapacity(), request.getCacheUsed(),
            request.getXmitsInProgress(),
            request.getXceiverCount(), request.getFailedVolumes(),
            volumeFailureSummary);
      } catch (IOException e) {
        throw new ServiceException(e);
      }
      // return the commands
      HeartbeatResponseProto.Builder builder =
          HeartbeatResponseProto.newBuilder();
      DatanodeCommand[] cmds = response.getCommands();
      .............
    }

For the heartbeat protocol itself, look at the sendHeartbeat method of the DatanodeProtocol interface.

    /**
     * sendHeartbeat() tells the NameNode that the DataNode is still
     * alive and well.  Includes some status info, too.
     * It also gives the NameNode a chance to return
     * an array of "DatanodeCommand" objects in HeartbeatResponse.
     * A DatanodeCommand tells the DataNode to invalidate local block(s),
     * or to copy them to other DataNodes, etc.
     * @param registration datanode registration information
     * @param reports utilization report per storage (a DataNode can be
     *        configured with several storage directories, and they may be
     *        heterogeneous: memory, disk, SSD, etc.)
     * @param xmitsInProgress number of transfers from this datanode to others
     * @param xceiverCount number of active transceiver threads
     * @param failedVolumes number of failed volumes
     * @param volumeFailureSummary info about volume failures
     * @throws IOException on error
     */
    @Idempotent
    public HeartbeatResponse sendHeartbeat(DatanodeRegistration registration,
        StorageReport[] reports,
        long dnCacheCapacity,
        long dnCacheUsed,
        int xmitsInProgress,
        int xceiverCount,
        int failedVolumes,
        VolumeFailureSummary volumeFailureSummary)
        throws IOException;
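Because the reports parameter carries one entry per storage, the receiver has to combine the entries to get node-level figures such as used and remaining space. A minimal sketch of that aggregation is shown below; the Report class is a hypothetical, reduced stand-in for StorageReport, and the numbers are made up.

```java
// Hypothetical aggregation of per-storage reports into node-level totals.
class StorageTotalsSketch {

    // Reduced stand-in for StorageReport.
    static final class Report {
        final String storageId;
        final long capacity;
        final long dfsUsed;
        final long remaining;
        Report(String storageId, long capacity, long dfsUsed, long remaining) {
            this.storageId = storageId;
            this.capacity = capacity;
            this.dfsUsed = dfsUsed;
            this.remaining = remaining;
        }
    }

    /** Returns {total capacity, total dfsUsed, total remaining}. */
    static long[] totals(Report[] reports) {
        long capacity = 0L;
        long dfsUsed = 0L;
        long remaining = 0L;
        for (Report r : reports) {
            capacity += r.capacity;
            dfsUsed += r.dfsUsed;
            remaining += r.remaining;
        }
        return new long[] { capacity, dfsUsed, remaining };
    }

    public static void main(String[] args) {
        Report[] reports = {
            new Report("disk-1", 1000L, 400L, 600L),
            new Report("ssd-1", 500L, 100L, 400L)
        };
        long[] t = totals(reports);
        System.out.println("capacity=" + t[0] + " used=" + t[1] + " remaining=" + t[2]);
    }
}
```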

NameNode handling of registration and heartbeat

A brief introduction to DatanodeManager

First, a look at several important fields in DatanodeManager.

    /**
     * Author's note: datanodeMap mainly stores the mapping from StorageID to
     * DatanodeDescriptor. As the Javadoc below says, a DataNode registering
     * with the NameNode falls into one of three cases:
     *   1. it registers with a new storage id: put it straight into the map;
     *   2. an existing node registers again: replace the old entry with the new one;
     *   3. an existing DataNode registers with a different storage id.
     *
     * Stores the datanode -> block map.
     * <p>
     * Done by storing a set of {@link DatanodeDescriptor} objects, sorted by
     * storage id. In order to keep the storage map consistent it tracks
     * all storages ever registered with the namenode.
     * A descriptor corresponding to a specific storage id can be
     * <ul>
     * <li>added to the map if it is a new storage id;</li>
     * <li>updated with a new datanode started as a replacement for the old one
     * with the same storage id; and</li>
     * <li>removed if and only if an existing datanode is restarted to serve a
     * different storage id.</li>
     * </ul>
     * <p>
     * Mapping: StorageID -> DatanodeDescriptor
     */
    private final NavigableMap<String, DatanodeDescriptor> datanodeMap
        = new TreeMap<String, DatanodeDescriptor>();

    /**
     * Cluster network topology
     */
    private final NetworkTopology networktopology;

    /**
     * Author's note: mapping from host to DatanodeDescriptor. One host may run
     * several DataNodes, so internally Host2NodesMap maps a String to a
     * DatanodeDescriptor[]. A plain map cannot add, remove, update or look up
     * a single DataNode on such a host, so Host2NodesMap mainly wraps those
     * operations.
     *
     * Host names to datanode descriptors mapping.
     */
    private final Host2NodesMap host2DatanodeMap = new Host2NodesMap();
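The idea of a host-to-array map can be sketched in a few lines. This is an illustration only, assuming a much smaller interface than the real Host2NodesMap: HostToNodesSketch and its Node class are hypothetical, and locking is reduced to synchronized methods.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

// Hypothetical, reduced version of a host -> DataNode[] map.
class HostToNodesSketch {

    // Stand-in for DatanodeDescriptor.
    static final class Node {
        final String uuid;
        final String ip;
        final int xferPort;
        Node(String uuid, String ip, int xferPort) {
            this.uuid = uuid;
            this.ip = ip;
            this.xferPort = xferPort;
        }
    }

    private final Map<String, Node[]> map = new HashMap<String, Node[]>();

    /** Adds a node; returns false if this exact node is already present. */
    synchronized boolean add(Node node) {
        Node[] old = map.get(node.ip);
        if (old == null) {
            map.put(node.ip, new Node[] { node });
            return true;
        }
        for (Node n : old) {
            if (n == node) {
                return false;
            }
        }
        Node[] grown = Arrays.copyOf(old, old.length + 1);
        grown[old.length] = node;
        map.put(node.ip, grown);
        return true;
    }

    /** Removes a node; returns false if it was not present (or is null). */
    synchronized boolean remove(Node node) {
        if (node == null) {
            return false;
        }
        Node[] old = map.get(node.ip);
        if (old == null) {
            return false;
        }
        int idx = -1;
        for (int i = 0; i < old.length; i++) {
            if (old[i] == node) {
                idx = i;
                break;
            }
        }
        if (idx < 0) {
            return false;
        }
        if (old.length == 1) {
            map.remove(node.ip);
            return true;
        }
        Node[] shrunk = new Node[old.length - 1];
        System.arraycopy(old, 0, shrunk, 0, idx);
        System.arraycopy(old, idx + 1, shrunk, idx, old.length - idx - 1);
        map.put(node.ip, shrunk);
        return true;
    }

    /** Finds the node on the given host that listens on the given transfer port. */
    synchronized Node getByXferAddr(String ip, int xferPort) {
        Node[] nodes = map.get(ip);
        if (nodes == null) {
            return null;
        }
        for (Node n : nodes) {
            if (n.xferPort == xferPort) {
                return n;
            }
        }
        return null;
    }

    public static void main(String[] args) {
        HostToNodesSketch m = new HostToNodesSketch();
        m.add(new Node("u1", "10.0.0.1", 50010));
        m.add(new Node("u2", "10.0.0.1", 50011));
        System.out.println(m.getByXferAddr("10.0.0.1", 50011).uuid);
    }
}
```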

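NameNode handling of registration

For both registration and heartbeat, the DataNode uses RPC to call the method of the same name on the NameNode; the implementation is in NameNodeRpcServer.

The registerDatanode method there calls registerDatanode of FSNamesystem, and the request is finally handled in DatanodeManager.registerDatanode(DatanodeRegistration).

The method first uses the following two lines to look up the registering DataNode in the two DatanodeManager maps.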

 
    // nodeS is the DataNode entry found in datanodeMap
    DatanodeDescriptor nodeS = getDatanode(nodeReg.getDatanodeUuid());
    // nodeN is the entry found in host2DatanodeMap
    DatanodeDescriptor nodeN = host2DatanodeMap.getDatanodeByXferAddr(
        nodeReg.getIpAddr(), nodeReg.getXferPort());

It then handles, one by one, the three cases described in the Javadoc of datanodeMap in DatanodeManager.

    // Case: the DataNode already exists but registers with a new storage ID
    if (nodeN != null && nodeN != nodeS) {
      NameNode.LOG.info("BLOCK* registerDatanode: " + nodeN);
      // nodeN previously served a different data storage,
      // which is not served by anybody anymore.
      removeDatanode(nodeN);
      // physically remove node from datanodeMap,
      // along with related per-node bookkeeping
      wipeDatanode(nodeN);
      nodeN = null;
    }

    // Case: repeated registration; mostly a matter of updating the stored info
    if (nodeS != null) {
      ................
      nodeS.updateRegInfo(nodeReg);
      nodeS.setSoftwareVersion(nodeReg.getSoftwareVersion());
      nodeS.setDisallowed(false); // Node is in the include list

      // resolve network location again
      if(this.rejectUnresolvedTopologyDN) {
        nodeS.setNetworkLocation(resolveNetworkLocation(nodeS));
        nodeS.setDependentHostNames(getNetworkDependencies(nodeS));
      } else {
        nodeS.setNetworkLocation(
            resolveNetworkLocationWithFallBackToDefaultLocation(nodeS));
        nodeS.setDependentHostNames(
            getNetworkDependenciesWithDefault(nodeS));
      }
      getNetworkTopology().add(nodeS);

      // also treat the registration message as a heartbeat
      heartbeatManager.register(nodeS);
      incrementVersionCount(nodeS.getSoftwareVersion());
      startDecommissioningIfExcluded(nodeS);
      success = true;
      ...........................
    }

    // Case: a new node that has never registered before
    // resolve network location and add the node to the cluster topology
    if(this.rejectUnresolvedTopologyDN) {
      nodeDescr.setNetworkLocation(resolveNetworkLocation(nodeDescr));
      nodeDescr.setDependentHostNames(getNetworkDependencies(nodeDescr));
    } else {
      nodeDescr.setNetworkLocation(
          resolveNetworkLocationWithFallBackToDefaultLocation(nodeDescr));
      nodeDescr.setDependentHostNames(
          getNetworkDependenciesWithDefault(nodeDescr));
    }
    networktopology.add(nodeDescr);
    nodeDescr.setSoftwareVersion(nodeReg.getSoftwareVersion());

    // register new datanode
    addDatanode(nodeDescr);
    // also treat the registration message as a heartbeat
    // no need to update its timestamp
    // because it is done when the descriptor is created
    heartbeatManager.addDatanode(nodeDescr);
    incrementVersionCount(nodeReg.getSoftwareVersion());
    startDecommissioningIfExcluded(nodeDescr);
    success = true;

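The code above handles each of the three registration cases. For a brand-new node, addDatanode is finally called to register it; it mainly adds the DataNode to the two maps and to the network topology.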

    /** Add a datanode. */
    void addDatanode(final DatanodeDescriptor node) {
      // To keep host2DatanodeMap consistent with datanodeMap,
      // remove  from host2DatanodeMap the datanodeDescriptor removed
      // from datanodeMap before adding node to host2DatanodeMap.
      // (adds the node to datanodeMap and host2DatanodeMap)
      synchronized(datanodeMap) {
        host2DatanodeMap.remove(datanodeMap.put(node.getDatanodeUuid(), node));
      }

      // add the node to the network topology
      networktopology.add(node); // may throw InvalidTopologyException
      host2DatanodeMap.add(node);
      checkIfClusterIsNowMultiRack(node);

      if (LOG.isDebugEnabled()) {
        LOG.debug(getClass().getSimpleName() + ".addDatanode: "
            + "node " + node + " is added to datanodeMap.");
      }
    }
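The interplay of the two lookups (by UUID and by transfer address) across the three registration cases is easier to follow in a stripped-down model. The sketch below is an assumption-laden simplification: RegistrationSketch, Node and the outcome strings are hypothetical, the address map holds one node per ip:port instead of using Host2NodesMap, and topology, heartbeat and decommissioning handling are left out.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical model of the three registration cases with two maps.
class RegistrationSketch {

    static final class Node {
        final String uuid;
        String ip;
        int xferPort;
        Node(String uuid, String ip, int xferPort) {
            this.uuid = uuid;
            this.ip = ip;
            this.xferPort = xferPort;
        }
        String addrKey() {
            return ip + ":" + xferPort;
        }
    }

    private final Map<String, Node> byUuid = new TreeMap<String, Node>();
    private final Map<String, Node> byAddr = new HashMap<String, Node>();

    /** Registers a node and returns a short description of what happened. */
    String register(String uuid, String ip, int xferPort) {
        Node nodeS = byUuid.get(uuid);                // found by storage/UUID key
        Node nodeN = byAddr.get(ip + ":" + xferPort); // found by transfer address
        StringBuilder outcome = new StringBuilder();

        // The address was served by a different node before: drop that old entry.
        if (nodeN != null && nodeN != nodeS) {
            byUuid.remove(nodeN.uuid);
            byAddr.remove(nodeN.addrKey());
            outcome.append("REMOVED_OLD+");
        }

        // Known UUID: refresh the stored registration info.
        if (nodeS != null) {
            byAddr.remove(nodeS.addrKey());
            nodeS.ip = ip;
            nodeS.xferPort = xferPort;
            byAddr.put(nodeS.addrKey(), nodeS);
            return outcome.append("UPDATED").toString();
        }

        // Unknown UUID: add a new entry to both maps.
        Node fresh = new Node(uuid, ip, xferPort);
        byUuid.put(uuid, fresh);
        byAddr.put(fresh.addrKey(), fresh);
        return outcome.append("ADDED").toString();
    }

    int size() {
        return byUuid.size();
    }

    boolean knows(String uuid) {
        return byUuid.containsKey(uuid);
    }

    public static void main(String[] args) {
        RegistrationSketch r = new RegistrationSketch();
        System.out.println(r.register("u1", "10.0.0.1", 50010));
        System.out.println(r.register("u1", "10.0.0.1", 50010));
        System.out.println(r.register("u2", "10.0.0.1", 50010));
    }
}
```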

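NameNode handling of heartbeats

The NameNode handles heartbeats in sendHeartbeat, the method with the same name as on the DataNode side; the request is finally processed in DatanodeManager.handleHeartbeat.

A heartbeat is processed as follows: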

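1. Look up the DataNode and check whether it is allowed to connect (for example, it may be in the exclude list); if not, throw an exception.
2. Check whether it has registered; if not, return a register command right away.
3. Update the DataNode information, mainly the fields of DatanodeDescriptor such as used and remaining space.
4. Check whether the NameNode is in safe mode; if so, return no commands.
5. Check for pending lease recovery; if there are blocks to recover, return only the block recovery command.
6. Generate replication commands.
7. Generate deletion commands.
8. Generate caching commands.
9. Generate balancer bandwidth commands.
10. Return all the commands.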

The relevant code:

    /** Handle heartbeat from datanodes. */
    public DatanodeCommand[] handleHeartbeat(DatanodeRegistration nodeReg,
        StorageReport[] reports, final String blockPoolId,
        long cacheCapacity, long cacheUsed, int xceiverCount,
        int maxTransfers, int failedVolumes,
        VolumeFailureSummary volumeFailureSummary) throws IOException {
      synchronized (heartbeatManager) {
        synchronized (datanodeMap) {
          DatanodeDescriptor nodeinfo = null;
          try {
            // look up the DataNode
            nodeinfo = getDatanode(nodeReg);
          } catch(UnregisteredNodeException e) {
            return new DatanodeCommand[]{RegisterCommand.REGISTER};
          }

          // is the node allowed to connect?
          // Check if this datanode should actually be shutdown instead.
          if (nodeinfo != null && nodeinfo.isDisallowed()) {
            setDatanodeDead(nodeinfo);
            throw new DisallowedDatanodeException(nodeinfo);
          }

          // has the node registered?
          if (nodeinfo == null || !nodeinfo.isRegistered()) {
            return new DatanodeCommand[]{RegisterCommand.REGISTER};
          }

          // update the DataNode info, e.g. used and remaining space
          heartbeatManager.updateHeartbeat(nodeinfo, reports,
              cacheCapacity, cacheUsed,
              xceiverCount, failedVolumes,
              volumeFailureSummary);

          // safe mode check
          // If we are in safemode, do not send back any recovery / replication
          // requests. Don't even drain the existing queue of work.
          if(namesystem.isInSafeMode()) {
            return new DatanodeCommand[0];
          }

          // check lease recovery
          BlockInfoContiguousUnderConstruction[] blocks =
              nodeinfo.getLeaseRecoveryCommand(Integer.MAX_VALUE);
          if (blocks != null) {
            BlockRecoveryCommand brCommand =
                new BlockRecoveryCommand(blocks.length);
            .................................
            return new DatanodeCommand[] { brCommand };
          }

          final List<DatanodeCommand> cmds = new ArrayList<DatanodeCommand>();

          // generate replication commands
          // check pending replication
          List<BlockTargetPair> pendingList =
              nodeinfo.getReplicationCommand(maxTransfers);
          if (pendingList != null) {
            cmds.add(new BlockCommand(DatanodeProtocol.DNA_TRANSFER,
                blockPoolId, pendingList));
          }

          // find invalid blocks and generate deletion commands
          // check block invalidation
          Block[] blks = nodeinfo.getInvalidateBlocks(blockInvalidateLimit);
          if (blks != null) {
            cmds.add(new BlockCommand(DatanodeProtocol.DNA_INVALIDATE,
                blockPoolId, blks));
          }

          // generate caching commands
          boolean sendingCachingCommands = false;
          long nowMs = monotonicNow();
          if (shouldSendCachingCommands &&
              ((nowMs - nodeinfo.getLastCachingDirectiveSentTimeMs()) >=
                  timeBetweenResendingCachingDirectivesMs)) {
            DatanodeCommand pendingCacheCommand =
                getCacheCommand(nodeinfo.getPendingCached(), nodeinfo,
                    DatanodeProtocol.DNA_CACHE, blockPoolId);
            if (pendingCacheCommand != null) {
              cmds.add(pendingCacheCommand);
              sendingCachingCommands = true;
            }
            DatanodeCommand pendingUncacheCommand =
                getCacheCommand(nodeinfo.getPendingUncached(), nodeinfo,
                    DatanodeProtocol.DNA_UNCACHE, blockPoolId);
            if (pendingUncacheCommand != null) {
              cmds.add(pendingUncacheCommand);
              sendingCachingCommands = true;
            }
            if (sendingCachingCommands) {
              nodeinfo.setLastCachingDirectiveSentTimeMs(nowMs);
            }
          }

          blockManager.addKeyUpdateCommand(cmds, nodeinfo);

          // generate balancer bandwidth commands
          // check for balancer bandwidth update
          if (nodeinfo.getBalancerBandwidth() > 0) {
            cmds.add(new BalancerBandwidthCommand(
                nodeinfo.getBalancerBandwidth()));
            // set back to 0 to indicate that datanode has been sent the new value
            nodeinfo.setBalancerBandwidth(0);
          }

          // return all the commands
          if (!cmds.isEmpty()) {
            return cmds.toArray(new DatanodeCommand[cmds.size()]);
          }
        }
      }

      return new DatanodeCommand[0];
    }
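The order of the early returns is the part of handleHeartbeat that is easiest to miss, so here is a condensed sketch of just the decision flow. It is a simplification under stated assumptions: HeartbeatDecisionSketch and NodeState are hypothetical, commands are plain strings, the pending lists are not drained, and the usage update, caching commands and key update command are left out.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical, condensed model of the heartbeat decision flow.
class HeartbeatDecisionSketch {

    // Stand-in for the per-node state kept by the NameNode.
    static final class NodeState {
        boolean registered = true;
        boolean disallowed = false;
        final List<String> blocksToRecover = new ArrayList<String>();
        final List<String> blocksToReplicate = new ArrayList<String>();
        final List<String> blocksToInvalidate = new ArrayList<String>();
        long balancerBandwidth = 0L;
    }

    static List<String> handle(NodeState node, boolean inSafeMode) {
        // A disallowed node is rejected with an exception.
        if (node != null && node.disallowed) {
            throw new IllegalStateException("datanode is not allowed to connect");
        }
        // An unknown or unregistered node is told to register first.
        if (node == null || !node.registered) {
            return Collections.singletonList("REGISTER");
        }
        // In safe mode no work is handed out at all.
        if (inSafeMode) {
            return Collections.emptyList();
        }
        // Pending block recovery is returned on its own.
        if (!node.blocksToRecover.isEmpty()) {
            return Collections.singletonList("RECOVER " + node.blocksToRecover);
        }
        // Otherwise collect everything that is pending for this node.
        List<String> cmds = new ArrayList<String>();
        if (!node.blocksToReplicate.isEmpty()) {
            cmds.add("TRANSFER " + node.blocksToReplicate);
        }
        if (!node.blocksToInvalidate.isEmpty()) {
            cmds.add("INVALIDATE " + node.blocksToInvalidate);
        }
        if (node.balancerBandwidth > 0) {
            cmds.add("BANDWIDTH " + node.balancerBandwidth);
            node.balancerBandwidth = 0L; // value has been delivered
        }
        return cmds;
    }

    public static void main(String[] args) {
        NodeState node = new NodeState();
        node.blocksToReplicate.add("blk_1");
        node.blocksToInvalidate.add("blk_2");
        node.balancerBandwidth = 1024L;
        System.out.println(handle(node, false));
    }
}
```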


