Kafka——》消费组管理kafka-consumer-groups

推荐链接:
    总结——》【Java】
    总结——》【Mysql】
    总结——》【Redis】
    总结——》【Kafka】
    总结——》【Spring】
    总结——》【SpringBoot】
    总结——》【MyBatis、MyBatis-Plus】

Kafka——》消费组管理kafka-consumer-groups

  • list:查看消费者组列表
  • describe:查看消费者组详情
  • state:查看消费者组状态
  • members:查看消费者组成员
  • delete:删除消费者组
  • reset-offsets:重置消费组的偏移量
  • delete-offsets:删除消费组的偏移量

KAFKA_HOME=/opt/app/install/kafka

参数描述
--bootstrap-serverkafka服务地址
--list查看消费者组列表
--group指定消费者组
--all-groups所有消费者组
--describe查看消费者组详情
--state查看消费者组状态
--members查看消费者组成员
--delete删除消费者组
--reset-offsets重置消费组的偏移量
--delete-offsets删除消费组的偏移量
--dry-run预先执行重置偏移量
--excute真正执行重置偏移量
--to-earliest将offset重置到最早
--to-latest将offset重置到最近

list:查看消费者组列表

# 查看消费者组列表
bin/kafka-consumer-groups.sh --bootstrap-server x.x.x.x:9092 --list

describe:查看消费者组详情

# 查看所有消费者组详情
bin/kafka-consumer-groups.sh --bootstrap-server 10.116.192.27:9092 --describe --all-groups# 查看指定消费者组详情
bin/kafka-consumer-groups.sh --bootstrap-server 10.116.192.27:9092 --describe --group listenerForSyncEsfCommunity1
结果列GROUPTOPICPARTITIONCURRENT-OFFSETLOG-END-OFFSETLAGCONSUMER-IDHOSTCLIENT-ID
描述消费组主题分区编号当前offset最新offset消息滞后(未消费)数量消费者ID主机客户端ID

state:查看消费者组状态

# 查看所有消费者组状态 
bin/kafka-consumer-groups.sh --bootstrap-server 10.116.192.27:9092 --describe --all-groups --state# 查看指定消费者组状态
bin/kafka-consumer-groups.sh --bootstrap-server 10.116.192.27:9092 --describe --group listenerForSyncEsfCommunity1 --state
结果列GROUPCOORDINATOR (ID)ASSIGNMENT-STRATEGYSTATE#MEMBERS
描述消费组协调者ID分配策略状态
- Stable:有消费者成员
- Empty:没有消费者成员
- Dead
- PreparingRebalance
- CompletingRebalance
成员数量

members:查看消费者组成员

# 查看所有消费者组成员
bin/kafka-consumer-groups.sh --bootstrap-server 10.116.192.27:9092 --describe --all-groups --members# 查看指定消费者组成员    
bin/kafka-consumer-groups.sh --bootstrap-server 10.116.192.27:9092 --describe --group listenerForSyncEsfCommunity1 --members
结果列GROUPCONSUMER-IDHOSTCLIENT-ID#PARTITION
描述消费组消费者ID主机客户端ID分区

delete:删除消费者组

# 删除所有消费者组
bin/kafka-consumer-groups.sh --bootstrap-server 10.122.139.40:9092 --delete --all-groups# 删除指定消费者组
bin/kafka-consumer-groups.sh --bootstrap-server 10.122.139.40:9092 --delete --group listenerForSyncEsfCommunity1# 只有这个消费组的所有客户端都停止消费/不在线才能够成功删除,否则会报下面异常
Error: Deletion of some consumer groups failed:
* Group 'listenerForSyncEsfCommunity1' could not be deleted due to: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.GroupNotEmptyException: The group is not empty.

reset-offsets:重置消费组的偏移量

重置Offset的模式描述
--to-earliest重置到最早offset
--to-latest重置到最近offset
--to-current重置到当前offset
--to-datetime重置到指定时间offset
格式:YYYY-MM-DDTHH:mm:SS.sss
示例:2021-6-26T00:00:00.000
--to-offset重置到指定offset(多个分区都重置,一般不用这个)
--shift-by按照偏移量增加或者减少offset
- 正数:往前增加
- 负数:往后减少
--from-file根据CVS文档来重置
--dry-run预先执行重置偏移量
--excute真正执行重置偏移量
# 重置指定消费组的所有Topic的偏移量
bin/kafka-consumer-groups.sh --bootstrap-server 10.122.139.40:9092 --reset-offsets --to-earliest --group listenerForSyncEsfCommunity1 --all-topic --dry-run# 重置指定消费组的指定Topic的偏移量
bin/kafka-consumer-groups.sh --bootstrap-server 10.122.139.40:9092 --reset-offsets --to-earliest --group listenerForSyncEsfCommunity1 --topic test_topic --dry-run# 重置所有消费组的所有Topic的偏移量
bin/kafka-consumer-groups.sh --bootstrap-server 10.122.139.40:9092 --reset-offsets --to-earliest --all-group --all-topic --dry-run # 重置所有消费组的指定Topic的偏移量
bin/kafka-consumer-groups.sh --bootstrap-server 10.122.139.40:9092 --reset-offsets --to-earliest --all-group --topic test_topic --dry-run# 只有这个消费组不可用状态才能重置成功,否则会报下面异常:
Error: Assignments can only be reset if the group 'listenerForSyncEsfCommunity1' is inactive, but the current state is Stable.TOPIC                          PARTITION  NEW-OFFSET     

delete-offsets:删除消费组的偏移量

# 删除指定消费组的指定Topic的偏移量(下一次从头消费)
bin/kafka-consumer-groups.sh --bootstrap-server 10.122.139.40:9092 --delete-offsets --group listenerForSyncEsfCommunity1 --topic test_topic# 只有这个消费组不可用状态才能删除成功,否则会报异常


本文来自互联网用户投稿,文章观点仅代表作者本人,不代表本站立场,不承担相关法律责任。如若转载,请注明出处。 如若内容造成侵权/违法违规/事实不符,请点击【内容举报】进行投诉反馈!

相关文章

立即
投稿

微信公众账号

微信扫一扫加关注

返回
顶部