Kafka数据可靠性和ACK应答机制
Kafka生产者发送数据流程
!](https://img-blog.csdnimg.cn/85d5d31e5bab470ba5abdabd930f458b.png)
一、ACK应答级别
一、0
-
介绍:生产者发送过来的数据,不需要等数据落盘应答
-
数据可靠性分析:容易丢数据
-
丢失数据原因:生产者发送完成后,Leader没有接收到数据,但是生产者认为已经发送成功了
二、1
-
介绍:生产者发送过来的数据,Leader收到数据后应答
-
数据可靠性分析:容易丢数据
-
丢失数据原因:应答完成后,还没开始同步副本,Leader挂了,新的Leader不会收到同步的消息,因为生产者已经认为发送成功了
三、-1(all)
- 介绍:生产者发送过来的数据,Leader和ISR队列里面的所有节点收齐数据后应答
- 数据可靠性分析:可靠
二、Leader收到数据后,有一个Follower因为某种故障,迟迟不能与Leader进行同步,如何解决
Leader维护了一个动态的in-sync replica set(ISR),意为和Leader保持同步的Follower+Leader集合。如果Follower长时间未向Leader发送通信请求或同步数据,则该Follower将被踢出ISR。该时间阈值由replica.lag.time.max.ms参数设定,默认30s。这样就不用等长期联系不上或者已经故障的节点
三、数据完全可靠条件
数据完全可靠条件=ACK级别设置为-1+分区副本大于等于2+ISR里应答的最小副本数量大于等于2
四、可靠性分析
- acks=0,生产者发送过来数据就不管了,可靠性差,效率高
- acks=1,生产者发送过来数据Leader应答,可靠性中等,效率中等
- acks=-1,生产者发送过来数据Leader和ISR队列里面所有Follower应答,可靠性高,效率低
- 在生产环境中,acks=0很少使用,acks=1,一般用于传输普通日志,允许丢个别数据;acks=-1,一般用于传输和钱相关的数据,对可靠性要求比较高的场景
五、Java代码实现
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;import java.util.Properties;public class CustomProducerAck {public static void main(String[] args) {// 1.创建kafka配置对象Properties properties = new Properties();// 2.配置对应参数properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"node1:9092");properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());// 设置ackproperties.put(ProducerConfig.ACKS_CONFIG,"all");// 重试次数retries,默认是int最大值2147483647properties.put(ProducerConfig.RETRIES_CONFIG,3);// 3.创建kafka生产者对象KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);// 4.调用send方法,发送消息for(int i=0;i<5;i++){kafkaProducer.send(new ProducerRecord<>("first","testMessage"+1));}// 5.关闭资源kafkaProducer.close();}
}
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;import java.util.Properties;public class CustomProducerAck {public static void main(String[] args) {// 1.创建kafka配置对象Properties properties = new Properties();// 2.配置对应参数properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"node1:9092");properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());// 设置ackproperties.put(ProducerConfig.ACKS_CONFIG,"all");// 重试次数retries,默认是int最大值2147483647properties.put(ProducerConfig.RETRIES_CONFIG,3);// 3.创建kafka生产者对象KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);// 4.调用send方法,发送消息for(int i=0;i<5;i++){kafkaProducer.send(new ProducerRecord<>("first","testMessage"+1));}// 5.关闭资源kafkaProducer.close();}
}
本文来自互联网用户投稿,文章观点仅代表作者本人,不代表本站立场,不承担相关法律责任。如若转载,请注明出处。 如若内容造成侵权/违法违规/事实不符,请点击【内容举报】进行投诉反馈!
