【RocketMQ】Broker服务注册

Broker注册

在Broker的启动函数中,添加了定时向NameServer进行注册的任务,在启动后延迟10秒向NameServer进行注册,之后定时发送心跳包,关于发送周期,首先从Broker配置的周期与60000毫秒中选出最小的那个值,然后再与10000毫秒对比,选出最大的那个,所以最长10秒钟执行一次心跳发送

public class BrokerController {public void start() throws Exception {// ...// 定时向NameServer进行注册this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {try {// 向NameServer注册BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister());} catch (Throwable e) {log.error("registerBrokerAll Exception", e);}}}, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS);// ...}
}

具体的注册逻辑是在BrokerOuterAPIregisterBrokerAll方法中实现的:

public class BrokerController {public synchronized void registerBrokerAll(final boolean checkOrderConfig, boolean oneway, boolean forceRegister) {   // ...if (forceRegister || needRegister(this.brokerConfig.getBrokerClusterName(),this.getBrokerAddr(),this.brokerConfig.getBrokerName(),this.brokerConfig.getBrokerId(),this.brokerConfig.getRegisterBrokerTimeoutMills())) {// 调用doRegisterBrokerAlldoRegisterBrokerAll(checkOrderConfig, oneway, topicConfigWrapper);}}// 注册Brokerprivate void doRegisterBrokerAll(boolean checkOrderConfig, boolean oneway,TopicConfigSerializeWrapper topicConfigWrapper) {// 调用BrokerOuterAPI的registerBrokerAll进行注册List<RegisterBrokerResult> registerBrokerResultList = this.brokerOuterAPI.registerBrokerAll(this.brokerConfig.getBrokerClusterName(),this.getBrokerAddr(),this.brokerConfig.getBrokerName(),this.brokerConfig.getBrokerId(),this.getHAServerAddr(),topicConfigWrapper,this.filterServerManager.buildNewFilterServerList(),oneway,this.brokerConfig.getRegisterBrokerTimeoutMills(),this.brokerConfig.isCompressedRegister());// ...}
}

发送注册请求

registerBrokerAll方法的处理逻辑如下:

  1. 封装请求头,设置当前Broker的IP、Name等信息
  2. 封装请求体,主要是设置主题配置信息和消息过滤服务器列表并对请求体的内容计算CRC32校验和,NameServer收到请求时会对数据进行校验
  3. 遍历所有的NameServer服务列表,对每一个NameServer进行注册,为了提升效率注册任务是放在线程池中开启多线程执行的,所以使用了CountDownLatch,调用await方法等待所有的NameServer都注册完毕
  4. 底层使用Netty进行网络通信,向NameServer发送注册请求,请求对应的类型为REGISTER_BROKER
  5. 处理请求响应数据,将结果封装到RegisterBrokerResult中返回
public class BrokerOuterAPI {public List<RegisterBrokerResult> registerBrokerAll(final String clusterName,final String brokerAddr,final String brokerName,final long brokerId,final String haServerAddr,final TopicConfigSerializeWrapper topicConfigWrapper,final List<String> filterServerList,final boolean oneway,final int timeoutMills,final boolean compressed) {// 创建list,保存注册结果final List<RegisterBrokerResult> registerBrokerResultList = new CopyOnWriteArrayList<>();List<String> nameServerAddressList = this.remotingClient.getNameServerAddressList();if (nameServerAddressList != null && nameServerAddressList.size() > 0) {// 封装请求头final RegisterBrokerRequestHeader requestHeader = new RegisterBrokerRequestHeader();requestHeader.setBrokerAddr(brokerAddr);// 设置Broker地址requestHeader.setBrokerId(brokerId); // 设置Broker IdrequestHeader.setBrokerName(brokerName); // 设置Broker NamerequestHeader.setClusterName(clusterName);// 设置集群名称requestHeader.setHaServerAddr(haServerAddr);requestHeader.setCompressed(compressed);// 设置请求体RegisterBrokerBody requestBody = new RegisterBrokerBody();requestBody.setTopicConfigSerializeWrapper(topicConfigWrapper); // 设置主题配置requestBody.setFilterServerList(filterServerList);// 设置消息过滤服务器列表final byte[] body = requestBody.encode(compressed);// 计算CRC32final int bodyCrc32 = UtilAll.crc32(body);requestHeader.setBodyCrc32(bodyCrc32);final CountDownLatch countDownLatch = new CountDownLatch(nameServerAddressList.size());// 遍历NameServer服务列表for (final String namesrvAddr : nameServerAddressList) {brokerOuterExecutor.execute(new Runnable() {@Overridepublic void run() {try {// 进行注册,底层通过Netty进行网络通信RegisterBrokerResult result = registerBroker(namesrvAddr, oneway, timeoutMills, requestHeader, body);if (result != null) {// 将注册结果加入到列表registerBrokerResultList.add(result);}log.info("register broker[{}]to name server {} OK", brokerId, namesrvAddr);} catch (Exception e) {log.warn("registerBroker Exception, {}", namesrvAddr, e);} finally {countDownLatch.countDown();}}});}try {// 等待所有的NameServer都注册完毕countDownLatch.await(timeoutMills, TimeUnit.MILLISECONDS);} catch (InterruptedException e) {}}return registerBrokerResultList;}// 通过Netty发送注册请求private RegisterBrokerResult registerBroker(final String namesrvAddr,final boolean oneway,final int timeoutMills,final RegisterBrokerRequestHeader requestHeader,final byte[] body) throws RemotingCommandException, MQBrokerException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException,InterruptedException {// 创建请求命令,这里发送的请求是REGISTER_BROKER类型RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.REGISTER_BROKER, requestHeader);// 设置请求体request.setBody(body);if (oneway) {try {this.remotingClient.invokeOneway(namesrvAddr, request, timeoutMills);} catch (RemotingTooMuchRequestException e) {// Ignore}return null;}// 发送请求RemotingCommand response = this.remotingClient.invokeSync(namesrvAddr, request, timeoutMills);assert response != null;switch (response.getCode()) {case ResponseCode.SUCCESS: {// 获取注册结果RegisterBrokerResponseHeader responseHeader =(RegisterBrokerResponseHeader) response.decodeCommandCustomHeader(RegisterBrokerResponseHeader.class);RegisterBrokerResult result = new RegisterBrokerResult();// 设置Master Broker的地址result.setMasterAddr(responseHeader.getMasterAddr());result.setHaServerAddr(responseHeader.getHaServerAddr());if (response.getBody() != null) {result.setKvTable(KVTable.decode(response.getBody(), KVTable.class));}return result;}default:break;}throw new MQBrokerException(response.getCode(), response.getRemark(), requestHeader == null ? null : requestHeader.getBrokerAddr());}
}

NameServer处理

NameServer在启动时注册了一个请求处理器DefaultRequestProcessor,当收到其他服务发送的请求时,会进入到processRequest方法中,通过Switch CASE对请求类型判断,进行不同的处理。

上文可知Broker注册时发送的请求类型为REGISTER_BROKER,对应的处理方法为registerBroker

public class DefaultRequestProcessor extends AsyncNettyRequestProcessor implements NettyRequestProcessor {// 处理请求@Overridepublic RemotingCommand processRequest(ChannelHandlerContext ctx,RemotingCommand request) throws RemotingCommandException {if (ctx != null) {log.debug("receive request, {} {} {}",request.getCode(),RemotingHelper.parseChannelRemoteAddr(ctx.channel()),request);}// 判断请求类型switch (request.getCode()) {case RequestCode.PUT_KV_CONFIG:return this.putKVConfig(ctx, request);case RequestCode.GET_KV_CONFIG:return this.getKVConfig(ctx, request);case RequestCode.DELETE_KV_CONFIG:return this.deleteKVConfig(ctx, request);case RequestCode.QUERY_DATA_VERSION:return queryBrokerTopicConfig(ctx, request);case RequestCode.REGISTER_BROKER: // 如果是Broker注册请求Version brokerVersion = MQVersion.value2Version(request.getVersion());if (brokerVersion.ordinal() >= MQVersion.Version.V3_0_11.ordinal()) {return this.registerBrokerWithFilterServer(ctx, request);} else {// 处理Broker的注册请求return this.registerBroker(ctx, request);}case RequestCode.UNREGISTER_BROKER:return this.unregisterBroker(ctx, request);// ...default:break;}return null;}
}

Broker注册请求处理

对Broker请求注册的处理在registerBroker方法中:

  1. 对请求数据进行CRC32校验,检查数据的合法性
  2. 通过RouteInfoManager的registerBroker方法对Broker进行注册
  3. 将注册结果设置到响应数据中,返回给Broker
  public RemotingCommand registerBroker(ChannelHandlerContext ctx,RemotingCommand request) throws RemotingCommandException {// 创建响应命令对象final RemotingCommand response = RemotingCommand.createResponseCommand(RegisterBrokerResponseHeader.class);final RegisterBrokerResponseHeader responseHeader = (RegisterBrokerResponseHeader) response.readCustomHeader();// 获取请求头final RegisterBrokerRequestHeader requestHeader =(RegisterBrokerRequestHeader) request.decodeCommandCustomHeader(RegisterBrokerRequestHeader.class);// 进行crc32校验if (!checksum(ctx, request, requestHeader)) {response.setCode(ResponseCode.SYSTEM_ERROR);response.setRemark("crc32 not match");return response;}TopicConfigSerializeWrapper topicConfigWrapper;// 如果请求体不为空if (request.getBody() != null) {// 获取请求中的主题配置topicConfigWrapper = TopicConfigSerializeWrapper.decode(request.getBody(), TopicConfigSerializeWrapper.class);} else {topicConfigWrapper = new TopicConfigSerializeWrapper();topicConfigWrapper.getDataVersion().setCounter(new AtomicLong(0));topicConfigWrapper.getDataVersion().setTimestamp(0);}// 进行注册RegisterBrokerResult result = this.namesrvController.getRouteInfoManager().registerBroker(requestHeader.getClusterName(),requestHeader.getBrokerAddr(),requestHeader.getBrokerName(),requestHeader.getBrokerId(),requestHeader.getHaServerAddr(),topicConfigWrapper,null,ctx.channel());responseHeader.setHaServerAddr(result.getHaServerAddr());responseHeader.setMasterAddr(result.getMasterAddr());byte[] jsonValue = this.namesrvController.getKvConfigManager().getKVListByNamespace(NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG);// 设置响应内容response.setBody(jsonValue);response.setCode(ResponseCode.SUCCESS);response.setRemark(null);// 返回响应return response;}

注册处理

注册处理的主要逻辑在RouteInfoManager中,首先来看三个与Broker信息相关的Map集合。

brokerAddrTable

brokerAddrTable是一个Map集合,主要存储Broker相关信息,key为Broker名称,value为BrokerData:

private final HashMap<String, BrokerData> brokerAddrTable; // key为Broker名称,value为BrokerData

BrokerData对象中记录了Broker所属集群名称、Broker名称以及Broker地址集合,其中KEY为Broker ID, value为Broker地址,因为可以搭建主从模式的Broker集群,此时BrokerName一致但是Broker ID和地址不同,所以使用Map记录Broker ID和地址的对应关系:

public class BrokerData implements Comparable<BrokerData> {private String cluster; // 集群名称private String brokerName; // Broker名称private HashMap<Long, String> brokerAddrs; // Broker地址集合,KEY为Broker ID, value为Broker 地址
}

clusterAddrTable

clusterAddrTable主要存储集群与Broker的对应关系,key为集群名称,value为该集群下的所有Broker Name集合:

private final HashMap<String, Set<String> clusterAddrTable; // key为集群名称,value为该集群下的所有Broker Name集合

brokerLiveTable

brokerLiveTable主要存储Broker的心跳发送信息,key为Broker地址,value为Broker发送心跳信息记录对象BrokerLiveInfo:

private final HashMap<String, BrokerLiveInfo> brokerLiveTable; // key为Broker地址,value为Broker发送心跳信息记录BrokerLiveInfo

BrokerLiveInfo中记录了NameServer收到Broker发送心跳包的时间、数据版本、与Broker网络通信的Channel以及HA Server地址:

class BrokerLiveInfo {private long lastUpdateTimestamp; // 收到心跳发送的时间private DataVersion dataVersion; // 版本private Channel channel; // 通信Channelprivate String haServerAddr; // HA Server地址,也就是所属Master Broker的地址
}

注册Broker

由于一个NameServer可能同时收到多个Broker的注册请求,所以在处理注册请求时使用了读写锁,在进行修改的时候添加写锁,处理逻辑如下:

  1. 根据集群名称从clusterAddrTable中查找对应的BrokerName集合,如果查找到,将当前的BrokerName加入到集合中,如果未查找到,新建集合将BrokerName加入到集合中,并添加到clusterAddrTable中。
  2. 根据Broker名称从brokerAddrTable获取BrokerData对象,如果获取为空,新建BrokerData对象并加入到brokerAddrTable
  3. 从BrokerData中获取同一BrokerName的所有地址信息,它是一个map集合,key为Broker ID, value为Broker地址
  4. 遍历Broker的地址信息集合,如果地址一致但是Broker ID不一致,删除旧的信息,保证同一个地址在map集合中只能有一条数据
  5. 将Broker加入到brokerLiveTable中,并记录收到注册请求的时间戳,在进行心跳检测的时候需要根据这个时间戳来判断是否在规定时间内未收到Broker的请求
  6. 如果发送请求的Broker不是Master,需要获取其所属的Master地址设置到注册结果中返回给Broker
public class RouteInfoManager {// 读写锁private final ReadWriteLock lock = new ReentrantReadWriteLock();private final HashMap<String/* topic */, List<QueueData>> topicQueueTable;private final HashMap<String, BrokerData> brokerAddrTable; // key为Broker名称 value为对应的Broker相关数据对象BrokerDataprivate final HashMap<String, Set<String> clusterAddrTable; // key为集群名称,value为该集群下的所有Broker Name集合private final HashMap<String, BrokerLiveInfo> brokerLiveTable; // key为Broker地址,value为Broker发送心跳信息记录BrokerLiveInfoprivate final HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;/*** Broker注册* @param clusterName 集群名称* @param brokerAddr Broker地址* @param brokerName Broker名称* @param brokerId Broker ID* @param haServerAddr 所属Master Broker的地址* @param topicConfigWrapper 主题配置* @param filterServerList 服务过滤列表* @param channel 通信Channel* @return*/public RegisterBrokerResult registerBroker(final String clusterName,final String brokerAddr,final String brokerName,final long brokerId,final String haServerAddr,final TopicConfigSerializeWrapper topicConfigWrapper,final List<String> filterServerList,final Channel channel) {RegisterBrokerResult result = new RegisterBrokerResult();try {try {// 添加写锁this.lock.writeLock().lockInterruptibly();// 根据集群名称获取Broker Name集合Set<String> brokerNames = this.clusterAddrTable.get(clusterName);// 如果集合为空if (null == brokerNames) {// 新建Broker集合brokerNames = new HashSet<String>();// 加入clusterAddrTable中this.clusterAddrTable.put(clusterName, brokerNames);}// 加入到Broker Name集合brokerNames.add(brokerName);boolean registerFirst = false;// 根据Broker名称从brokerAddrTable获取Broker信息BrokerData brokerData = this.brokerAddrTable.get(brokerName);// 如果获取为空if (null == brokerData) {registerFirst = true;// 新建BrokerData对象brokerData = new BrokerData(clusterName, brokerName, new HashMap<Long, String>());// 加入到brokerAddrTablethis.brokerAddrTable.put(brokerName, brokerData);}// 获取Broker地址集合Map<Long, String> brokerAddrsMap = brokerData.getBrokerAddrs();// 遍历Broker地址集合Iterator<Entry<Long, String>> it = brokerAddrsMap.entrySet().iterator();while (it.hasNext()) {// 获取Broker地址Entry<Long, String> item = it.next();// 如果地址一致但是Broker ID不一致,删除旧的信息,保证同一个地址在map集合中只能有一条数据if (null != brokerAddr && brokerAddr.equals(item.getValue()) && brokerId != item.getKey()) {it.remove();}}// 将新的地址信息添加到Broker地址集合中,key为Broker ID, value为Broker地址String oldAddr = brokerData.getBrokerAddrs().put(brokerId, brokerAddr);// 是否首次注册registerFirst = registerFirst || (null == oldAddr);// 处理主题配置if (null != topicConfigWrapper&& MixAll.MASTER_ID == brokerId) {if (this.isBrokerTopicConfigChanged(brokerAddr, topicConfigWrapper.getDataVersion())|| registerFirst) {ConcurrentMap<String, TopicConfig> tcTable =topicConfigWrapper.getTopicConfigTable();if (tcTable != null) {for (Map.Entry<String, TopicConfig> entry : tcTable.entrySet()) {this.createAndUpdateQueueData(brokerName, entry.getValue());}}}}// 将Broker加入到brokerLiveTable中,key为Broker地址BrokerLiveInfo prevBrokerLiveInfo = this.brokerLiveTable.put(brokerAddr,new BrokerLiveInfo(System.currentTimeMillis(), // 记录收到心跳的时间topicConfigWrapper.getDataVersion(),channel,haServerAddr));if (null == prevBrokerLiveInfo) {log.info("new broker registered, {} HAServer: {}", brokerAddr, haServerAddr);}// 处理服务过滤列表if (filterServerList != null) {if (filterServerList.isEmpty()) {this.filterServerTable.remove(brokerAddr);} else {this.filterServerTable.put(brokerAddr, filterServerList);}}// 如果发送请求的broker不是Masterif (MixAll.MASTER_ID != brokerId) {// 获取Master Broker地址String masterAddr = brokerData.getBrokerAddrs().get(MixAll.MASTER_ID);if (masterAddr != null) {BrokerLiveInfo brokerLiveInfo = this.brokerLiveTable.get(masterAddr);if (brokerLiveInfo != null) {// 设置HA Server地址result.setHaServerAddr(brokerLiveInfo.getHaServerAddr());// 将Broker集群中的Master地址设置到注册结果result.setMasterAddr(masterAddr);}}}} finally {this.lock.writeLock().unlock();}} catch (Exception e) {log.error("registerBroker Exception", e);}return result;}
}

心跳检测

NameServer在启动时注册了定时检查处于不活跃状态Broker的任务:

   public boolean initialize() {// 定时扫描下线的Brokerthis.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {// 扫描下线的BrokerNamesrvController.this.routeInfoManager.scanNotActiveBroker();}}, 5, 10, TimeUnit.SECONDS);}

brokerLiveTable保存了当前NameServer收到的心跳数据,遍历brokerLiveTable,获取每一个Broker最近一次发送心跳的时间,如果上一次发送心跳的时间 + 过期时间(120s) 小于 当前时间,也就是超过120s没有收到某个Broker的心跳包,则认为此Broker已下线,需要关闭该Broker的Channel,将Broker移除:

public class RouteInfoManager {// Broker Channel过期时间:120sprivate final static long BROKER_CHANNEL_EXPIRED_TIME = 1000 * 60 * 2;public void scanNotActiveBroker() {Iterator<Entry<String, BrokerLiveInfo>> it = this.brokerLiveTable.entrySet().iterator();// 遍历所有的Brokerwhile (it.hasNext()) {Entry<String, BrokerLiveInfo> next = it.next();// 获取上一次发送心跳的时间long last = next.getValue().getLastUpdateTimestamp();// 如果上一次发送心跳的时间 + 过期时间 小于 当前时间if ((last + BROKER_CHANNEL_EXPIRED_TIME) < System.currentTimeMillis()) {// 关闭ChannelRemotingUtil.closeChannel(next.getValue().getChannel());// 从brokerLiveTable中移除Brokerit.remove();log.warn("The broker channel expired, {} {}ms", next.getKey(), BROKER_CHANNEL_EXPIRED_TIME);// 清除Broker的相关信息this.onChannelDestroy(next.getKey(), next.getValue().getChannel());}}}
}

路由剔除

onChannelDestroy中主要是做一些清除处理:

  1. 如果channel不为空,从brokerLiveTable中根据channel查找Broker的地址信息,如果查找到,记录Broker的地址信息到brokerAddrFound,如果未查找到,就使用参数中传入的remoteAddr
  2. 根据Broker地址从brokerLiveTable中移除相关信息
  3. 根据Broker地址从filterServerTable中移除相关信息
  4. 遍历Broker地址记录表brokerAddrTable,根据地址查找BrokerData中记录的地址信息集合,如果地址和需要查找的一致,从BrokerData中删除,并记录Broker名称,这一步主要是清理brokerAddrTable中的数据
  5. 遍历clusterAddrTable,根据上一步记录的Broker名称从集群对应的BrokerName集合中查找,并清除相关数据,这一步主要是清理clusterAddrTable中的数据
  6. 遍历topicQueueTable,清除当前Broker的主题信息
   /*** 销毁处理* @param remoteAddr Broker地址* @param channel 通信Channel*/public void onChannelDestroy(String remoteAddr, Channel channel) {String brokerAddrFound = null;// 如果channel不为空if (channel != null) {try {try {// 添加读锁,因为这里是查找不做修改所以添加了读锁this.lock.readLock().lockInterruptibly();Iterator<Entry<String, BrokerLiveInfo>> itBrokerLiveTable =this.brokerLiveTable.entrySet().iterator();// 从brokerLiveTable中查找while (itBrokerLiveTable.hasNext()) {Entry<String, BrokerLiveInfo> entry = itBrokerLiveTable.next();// 根据Channel查找Brokerif (entry.getValue().getChannel() == channel) {// 如果查找到,记录Broker的地址brokerAddrFound = entry.getKey();break;}}} finally {this.lock.readLock().unlock();}} catch (Exception e) {log.error("onChannelDestroy Exception", e);}}// 如果未查找到,就使用参数中传入的Brokerif (null == brokerAddrFound) {brokerAddrFound = remoteAddr;} else {log.info("the broker's channel destroyed, {}, clean it's data structure at once", brokerAddrFound);}if (brokerAddrFound != null && brokerAddrFound.length() > 0) {try {try {// 添加写锁this.lock.writeLock().lockInterruptibly();// 从brokerLiveTable中移除Brokerthis.brokerLiveTable.remove(brokerAddrFound);// 从filterServerTable中移除Brokerthis.filterServerTable.remove(brokerAddrFound);String brokerNameFound = null;boolean removeBrokerName = false;Iterator<Entry<String, BrokerData>> itBrokerAddrTable =this.brokerAddrTable.entrySet().iterator();// 遍历Broker地址记录表brokerAddrTablewhile (itBrokerAddrTable.hasNext() && (null == brokerNameFound)) {// BrokerDataBrokerData brokerData = itBrokerAddrTable.next().getValue();// 从BrokerData中获取同一BrokerName的所有地址集合Iterator<Entry<Long, String>> it = brokerData.getBrokerAddrs().entrySet().iterator();while (it.hasNext()) {Entry<Long, String> entry = it.next();// 获取Broker IDLong brokerId = entry.getKey();// 获取Broker地址String brokerAddr = entry.getValue();// 如果地址和需要查找的一致if (brokerAddr.equals(brokerAddrFound)) {// 记录BrokerNamebrokerNameFound = brokerData.getBrokerName();// 从brokerData中移除it.remove();log.info("remove brokerAddr[{}, {}] from brokerAddrTable, because channel destroyed",brokerId, brokerAddr);break;}}// 如果brokerData记录的地址集合为空if (brokerData.getBrokerAddrs().isEmpty()) {removeBrokerName = true;// 从brokerAddrTable中移除,因为brokerData中未记录任何Broker的地址,需要清除itBrokerAddrTable.remove();log.info("remove brokerName[{}] from brokerAddrTable, because channel destroyed",brokerData.getBrokerName());}}// 从clusterAddrTable清除对应的Broker信息if (brokerNameFound != null && removeBrokerName) {// 遍历clusterAddrTableIterator<Entry<String, Set<String>>> it = this.clusterAddrTable.entrySet().iterator();while (it.hasNext()) {Entry<String, Set<String>> entry = it.next();String clusterName = entry.getKey();Set<String> brokerNames = entry.getValue();// 从brokerNames集合中移除boolean removed = brokerNames.remove(brokerNameFound);if (removed) {log.info("remove brokerName[{}], clusterName[{}] from clusterAddrTable, because channel destroyed",brokerNameFound, clusterName);// 如果移除后集合为空if (brokerNames.isEmpty()) {log.info("remove the clusterName[{}] from clusterAddrTable, because channel destroyed and no broker in this cluster",clusterName);// 从clusterAddrTable中移除当前集群名称it.remove();}break;}}}// 如果移除的BrokerName不为空if (removeBrokerName) {// 遍历主题队列,从topicQueueTable中清除当前Broker的主题信息Iterator<Entry<String, List<QueueData>>> itTopicQueueTable =this.topicQueueTable.entrySet().iterator();while (itTopicQueueTable.hasNext()) {Entry<String, List<QueueData>> entry = itTopicQueueTable.next();String topic = entry.getKey();List<QueueData> queueDataList = entry.getValue();Iterator<QueueData> itQueueData = queueDataList.iterator();while (itQueueData.hasNext()) {QueueData queueData = itQueueData.next();// 根据BrokerName查找if (queueData.getBrokerName().equals(brokerNameFound)) {// 移除itQueueData.remove();log.info("remove topic[{} {}], from topicQueueTable, because channel destroyed",topic, queueData);}}// 如果数据为空if (queueDataList.isEmpty()) {// 从itTopicQueueTable中移除itTopicQueueTable.remove();log.info("remove topic[{}] all queue, from topicQueueTable, because channel destroyed",topic);}}}} finally {// 释放锁this.lock.writeLock().unlock();}} catch (Exception e) {log.error("onChannelDestroy Exception", e);}}}

参考
丁威、周继锋《RocketMQ技术内幕》

RocketMQ版本:4.9.3


本文来自互联网用户投稿,文章观点仅代表作者本人,不代表本站立场,不承担相关法律责任。如若转载,请注明出处。 如若内容造成侵权/违法违规/事实不符,请点击【内容举报】进行投诉反馈!

相关文章

立即
投稿

微信公众账号

微信扫一扫加关注

返回
顶部