RocketMQ 负载均衡时机和影响
全文转载自我的博客,更好的阅读体验和更多内容欢迎移步SSpiritsの秘密基地
本文综合 RocketMQ client 与 broker 的源码介绍负载均衡机制发生的时间、客户端发生负载对消费的影响(消息堆积/消费毛刺等)并且给出一些最佳实践的推荐
写在前面
网上大多数讲 RocketMQ 负载均衡的文章只介绍几种分配 MessageQueue 的策略或是长篇大论分析客户端 RebalanceService 的代码。但是其实负载均衡是客户端与服务端互相配合的过程,本文综合服务端和客户端代码回答如下三个问题:
- 何时会发生负载均衡
- 负载均衡对消费有何影响
- 如何减少负载均衡对消费的影响
如果不想看详细分析,这里直接给出结论:
负载均衡时机:
- 主动负载均衡
- 启动时立即进行负载均衡
- 定时(默认 20s)负载均衡一次
- 被动负载均衡(收到 broker 通知)
- 客户端上下线
- 上线
- 新客户端发送心跳到 broker
- 下线
- 客户端发送下线请求到 broker
- 底层连接异常:响应 netty channel 的 IDLE/CLOSE/EXCEPTION 事件
- 订阅关系变化:订阅新 topic 或有旧的 topic 不再订阅
负载均衡对消费的影响:
- 对于新分配的队列可能会重复消费,这也是官方要求消费要做好幂等的原因
- 对于不再负责的队列会短时间消费停止,如果原本的消费 TPS 很高或者正好出现生产高峰就会造成消费毛刺

源码分析
首先明确下上下文:源码分析基于 RocketMQ release-4.9.3 分支的代码以及集群消费模式的 push 消费者。广播模式不在本文的讨论范围内
Client 主动触发
在同一个 JVM 中不管创建多少 consumer,它们总是共享同一个 MQClientInstance,这个 MQClientInstance 接管和所有 consumer 和 broker 的交互以及协调负载均衡

MQClientInstance 有两个负载均衡相关的方法:rebalanceImmediately 和 doRebalance
前者在消费者启动和收到 Broker 通知时唤醒 RebalanceService 进行负载均衡,而 RebalanceService 调用后者执行负载均衡逻辑
跟踪 doRebalance 方法,我们发现实际的负载均衡逻辑在 RebalanceImpl#rebalanceByTopic 中实现:
private void rebalanceByTopic(final String topic, final boolean isOrder) {// 获取所有 MessageQueueSet<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);// 获取当前 group 所有在线的消费者List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);...// 排序Collections.sort(mqAll);Collections.sort(cidAll);// 获取当前客户端配置的负载均衡策略AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;List<MessageQueue> allocateResult = null;...// 根据指定的负载均衡策略计算自己要负责的队列allocateResult = strategy.allocate(this.consumerGroup,this.mQClientFactory.getClientId(),mqAll,...// 根据计算结果创建 ProcessQueue (用于拉取、消费消息的数据结构)boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder);if (changed) {
本文来自互联网用户投稿,文章观点仅代表作者本人,不代表本站立场,不承担相关法律责任。如若转载,请注明出处。 如若内容造成侵权/违法违规/事实不符,请点击【内容举报】进行投诉反馈!
