SpringBoot集成Kafka实现消息的生产和消费

SpringBoot集成Kafka实现消息的生产和消费

    • Kafka
    • SpringBoot实现Kafka生产者

Kafka

Kafka作为一款优异的消息中间件以高吞吐而著名。我们在消息处理中往往也会用到

SpringBoot实现Kafka生产者

首先在pom文件中加入对Kafka支持的依赖

		org.springframework.kafkaspring-kafka

在相关application.yml文件中对kafka进行相关配置


```yaml
spring:kafka:#=============== provider  =======================# 写入失败时,重试次数。当leader节点失效,一个repli节点会替代成为leader节点,此时可能出现写入失败,# 当retris为0时,produce不会重复。retirs重发,此时repli节点完全成为leader节点,不会产生消息丢失。producer:retries: 0# 每次批量发送消息的数量,produce积累到一定数据,一次发送batch-size: 16384# produce积累数据一次发送,缓存大小达到buffer.memory就发送数据buffer-memory: 33554432acks: 1consumer:enable-auto-commit: truegroup-id: qingpu-smart-city-testauto-offset-reset: latest#key的解码方式key-deserializer: org.apache.kafka.common.serialization.StringDeserializer#value的解码方式value-deserializer: org.apache.kafka.common.serialization.StringDeserializerauto-commit-interval: 1000# kafka可以搭载多个,地址用逗号隔开bootstrap-servers: 127.0.0.1:9092

生产者

import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;@Component
public class KafkaProducer {@Autowiredprivate KafkaTemplate<String,String> kafkaTemplate;public void send(String message){String topic = "kafka-test";// 在这里消息的发送有kafka有很多种发送方法,可以进行参考,选择合适的发送方式ProducerRecord<String,String> producerRecord = new ProducerRecord<String, String>(topic, message);kafkaTemplate.send(producerRecord);System.out.println("Kafka发送消息:" + producerRecord.value());}
}

消费者

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;@Component
public class KafkaTaskListener {@Autowiredprivate EventService eventService;@Autowiredprivate WorkOrderService workOrderService;@KafkaListener(topics = {"kafka-test"})public void receive(ConsumerRecord<String, String> message) {System.out.println("Kafka接收消息:" + message.value());}
}

以上就是Springboot实现kafka生产者和消费者的配置方式。另外如果需要在本地调试,需要在本地装配zookeeper和kafka环境,具体细节后续会继续做补充。


本文来自互联网用户投稿,文章观点仅代表作者本人,不代表本站立场,不承担相关法律责任。如若转载,请注明出处。 如若内容造成侵权/违法违规/事实不符,请点击【内容举报】进行投诉反馈!

相关文章

立即
投稿

微信公众账号

微信扫一扫加关注

返回
顶部