Kafka提交偏移量的五种方式

讨论偏移量我们首先要知道如何查看偏移量及消费者目前消费的偏移量

./kafka-consumer-groups.sh --describe --bootstrap-server 192.168.153.128:9092 --group ConsumerGroup3

Consumer group 'ConsumerGroup3' has no active members:可以看到我们这个消费者组现在是没有消费者的

test_topic这个topic其他分区没有消息,只有分区2有消费记录:

CURRENT-OFFSET:目前已被消费者消费的消息偏移量

LOG-END-OFFSET:消息最大偏移量

LAG:消息堆积的条数

1、自动提交方式

这种提交方式有两个很重要的参数:

enable.auto.commit=true(是否开启自动提交,true or false)

auto.commit.interval.ms=5000(提交偏移量的时间间隔,默认5000ms)

每隔5秒,消费者会自动把从poll方法接收到的最大偏移量提交上去。自动提交是在轮询中进行,消费者每次轮询时都会检查是否提交该偏移量。可是这种情况会发生重复消费和丢失消息的情况。

重复消费:如果我们设auto.commit.interval.ms=60000,16:34首次提交偏移量62,此时又拉取了2条消息,此时分区2对应的消费者宕机,发生了分区再均衡,(分区的所有权从一个消费者转到另一个消费者被称为再均衡。一般新增消费者,消费者关闭或改变分区数都会发生再均衡)分区2的消息由另一个消费者消费,新的消费者会读取16:34提交的那个偏移量,这样就会发生重复消费了,我们来实践一下:

我们开两个消费端consumer1和consumer2:

private static final String TOPIC_NAME = "test_topic";public static void main(String[] args) {Properties props = new Properties();props.put("bootstrap.servers","192.168.153.128:9092,192.168.153.128:9093,192.168.153.128:9094");props.put("group.id", "ConsumerGroup3");/* 是否自动确认offset */props.put("enable.auto.commit", "true");/* 自动确认offset的时间间隔 */props.put("auto.commit.interval.ms", "60000");props.put("session.timeout.ms", "30000");
//        props.put("auto.offset.reset", "earliest");props.put("auto.offset.reset", "latest");// 序列化类props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");KafkaConsumer consumer = new KafkaConsumer(props);consumer.subscribe(Collections.singletonList(TOPIC_NAME));try {for (; ; ) {ConsumerRecords records = consumer.poll(Duration.ofMillis(100));for (ConsumerRecord record : records)System.out.printf("消费消息:topic=%s, partition=%d, offset=%d, key=%s, value=%s\n",record.topic(), record.partition(), record.offset(), record.key(), record.value());


本文来自互联网用户投稿,文章观点仅代表作者本人,不代表本站立场,不承担相关法律责任。如若转载,请注明出处。 如若内容造成侵权/违法违规/事实不符,请点击【内容举报】进行投诉反馈!

相关文章

立即
投稿

微信公众账号

微信扫一扫加关注

返回
顶部