Kafka——》消费组管理kafka-consumer-groups
推荐链接:
总结——》【Java】
总结——》【Mysql】
总结——》【Redis】
总结——》【Kafka】
总结——》【Spring】
总结——》【SpringBoot】
总结——》【MyBatis、MyBatis-Plus】
Kafka——》消费组管理kafka-consumer-groups
- list:查看消费者组列表
- describe:查看消费者组详情
- state:查看消费者组状态
- members:查看消费者组成员
- delete:删除消费者组
- reset-offsets:重置消费组的偏移量
- delete-offsets:删除消费组的偏移量
KAFKA_HOME=/opt/app/install/kafka
| 参数 | 描述 |
|---|---|
--bootstrap-server | kafka服务地址 |
--list | 查看消费者组列表 |
--group | 指定消费者组 |
--all-groups | 所有消费者组 |
--describe | 查看消费者组详情 |
--state | 查看消费者组状态 |
--members | 查看消费者组成员 |
--delete | 删除消费者组 |
--reset-offsets | 重置消费组的偏移量 |
--delete-offsets | 删除消费组的偏移量 |
--dry-run | 预先执行重置偏移量 |
--excute | 真正执行重置偏移量 |
--to-earliest | 将offset重置到最早 |
--to-latest | 将offset重置到最近 |
list:查看消费者组列表
# 查看消费者组列表
bin/kafka-consumer-groups.sh --bootstrap-server x.x.x.x:9092 --list
describe:查看消费者组详情
# 查看所有消费者组详情
bin/kafka-consumer-groups.sh --bootstrap-server 10.116.192.27:9092 --describe --all-groups# 查看指定消费者组详情
bin/kafka-consumer-groups.sh --bootstrap-server 10.116.192.27:9092 --describe --group listenerForSyncEsfCommunity1
| 结果列 | GROUP | TOPIC | PARTITION | CURRENT-OFFSET | LOG-END-OFFSET | LAG | CONSUMER-ID | HOST | CLIENT-ID |
|---|---|---|---|---|---|---|---|---|---|
| 描述 | 消费组 | 主题 | 分区编号 | 当前offset | 最新offset | 消息滞后(未消费)数量 | 消费者ID | 主机 | 客户端ID |
state:查看消费者组状态
# 查看所有消费者组状态
bin/kafka-consumer-groups.sh --bootstrap-server 10.116.192.27:9092 --describe --all-groups --state# 查看指定消费者组状态
bin/kafka-consumer-groups.sh --bootstrap-server 10.116.192.27:9092 --describe --group listenerForSyncEsfCommunity1 --state
| 结果列 | GROUP | COORDINATOR (ID) | ASSIGNMENT-STRATEGY | STATE | #MEMBERS |
|---|---|---|---|---|---|
| 描述 | 消费组 | 协调者ID | 分配策略 | 状态 - Stable:有消费者成员 - Empty:没有消费者成员 - Dead - PreparingRebalance - CompletingRebalance | 成员数量 |
members:查看消费者组成员
# 查看所有消费者组成员
bin/kafka-consumer-groups.sh --bootstrap-server 10.116.192.27:9092 --describe --all-groups --members# 查看指定消费者组成员
bin/kafka-consumer-groups.sh --bootstrap-server 10.116.192.27:9092 --describe --group listenerForSyncEsfCommunity1 --members
| 结果列 | GROUP | CONSUMER-ID | HOST | CLIENT-ID | #PARTITION |
|---|---|---|---|---|---|
| 描述 | 消费组 | 消费者ID | 主机 | 客户端ID | 分区 |
delete:删除消费者组
# 删除所有消费者组
bin/kafka-consumer-groups.sh --bootstrap-server 10.122.139.40:9092 --delete --all-groups# 删除指定消费者组
bin/kafka-consumer-groups.sh --bootstrap-server 10.122.139.40:9092 --delete --group listenerForSyncEsfCommunity1# 只有这个消费组的所有客户端都停止消费/不在线才能够成功删除,否则会报下面异常
Error: Deletion of some consumer groups failed:
* Group 'listenerForSyncEsfCommunity1' could not be deleted due to: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.GroupNotEmptyException: The group is not empty.
reset-offsets:重置消费组的偏移量
| 重置Offset的模式 | 描述 |
|---|---|
--to-earliest | 重置到最早offset |
--to-latest | 重置到最近offset |
--to-current | 重置到当前offset |
--to-datetime | 重置到指定时间offset 格式:YYYY-MM-DDTHH:mm:SS.sss 示例:2021-6-26T00:00:00.000 |
--to-offset | 重置到指定offset(多个分区都重置,一般不用这个) |
--shift-by | 按照偏移量增加或者减少offset - 正数:往前增加 - 负数:往后减少 |
--from-file | 根据CVS文档来重置 |
--dry-run | 预先执行重置偏移量 |
--excute | 真正执行重置偏移量 |
# 重置指定消费组的所有Topic的偏移量
bin/kafka-consumer-groups.sh --bootstrap-server 10.122.139.40:9092 --reset-offsets --to-earliest --group listenerForSyncEsfCommunity1 --all-topic --dry-run# 重置指定消费组的指定Topic的偏移量
bin/kafka-consumer-groups.sh --bootstrap-server 10.122.139.40:9092 --reset-offsets --to-earliest --group listenerForSyncEsfCommunity1 --topic test_topic --dry-run# 重置所有消费组的所有Topic的偏移量
bin/kafka-consumer-groups.sh --bootstrap-server 10.122.139.40:9092 --reset-offsets --to-earliest --all-group --all-topic --dry-run # 重置所有消费组的指定Topic的偏移量
bin/kafka-consumer-groups.sh --bootstrap-server 10.122.139.40:9092 --reset-offsets --to-earliest --all-group --topic test_topic --dry-run# 只有这个消费组不可用状态才能重置成功,否则会报下面异常:
Error: Assignments can only be reset if the group 'listenerForSyncEsfCommunity1' is inactive, but the current state is Stable.TOPIC PARTITION NEW-OFFSET
delete-offsets:删除消费组的偏移量
# 删除指定消费组的指定Topic的偏移量(下一次从头消费)
bin/kafka-consumer-groups.sh --bootstrap-server 10.122.139.40:9092 --delete-offsets --group listenerForSyncEsfCommunity1 --topic test_topic# 只有这个消费组不可用状态才能删除成功,否则会报异常
本文来自互联网用户投稿,文章观点仅代表作者本人,不代表本站立场,不承担相关法律责任。如若转载,请注明出处。 如若内容造成侵权/违法违规/事实不符,请点击【内容举报】进行投诉反馈!
