Spring boot整合RibbitMQ

1 、搭建SpringBoot环境

我们选择基于Spring-Rabbit去操作RabbitMQ

https://github.com/spring-projects/spring-amqp

使用spring-boot-starter-amqp会自动添加spring-rabbit依赖,如下:

org.springframework.bootspring‐boot‐starter‐amqporg.springframework.bootspring‐boot‐starter‐testorg.springframework.bootspring‐boot‐starter‐logging

2、配置

1、配置application.yml

配置连接rabbitmq的参数

server:port: 44000spring:application:name: test‐rabbitmq‐producerrabbitmq:host: 127.0.0.1port: 5672username: guestpassword: guestvirtualHost: /

2、定义RabbitConfifig类,配置ExchangeQueue、及绑定交换机。

本例配置Topic交换机。

package com.xuecheng.test.rabbitmq.config;import org.springframework.amqp.core.*;import org.springframework.beans.factory.annotation.Qualifier;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;@Configurationpublic class RabbitmqConfig {public static final String QUEUE_INFORM_EMAIL = "queue_inform_email";public static final String QUEUE_INFORM_SMS = "queue_inform_sms";public static final String EXCHANGE_TOPICS_INFORM="exchange_topics_inform";/*** 交换机配置* ExchangeBuilder提供了fanout、direct、topic、header交换机类型的配置* @return the exchange*/@Bean(EXCHANGE_TOPICS_INFORM)public Exchange EXCHANGE_TOPICS_INFORM() {//durable(true)持久化,消息队列重启后交换机仍然存在return ExchangeBuilder.topicExchange(EXCHANGE_TOPICS_INFORM).durable(true).build();}//声明队列@Bean(QUEUE_INFORM_SMS)public Queue QUEUE_INFORM_SMS() {Queue queue = new Queue(QUEUE_INFORM_SMS);return queue;}//声明队列@Bean(QUEUE_INFORM_EMAIL)public Queue QUEUE_INFORM_EMAIL() {Queue queue = new Queue(QUEUE_INFORM_EMAIL);return queue;}/** channel.queueBind(INFORM_QUEUE_SMS,"inform_exchange_topic","inform.#.sms.#");* 绑定队列到交换机 .** @param queue the queue* @param exchange the exchange* @return the binding*/@Beanpublic Binding BINDING_QUEUE_INFORM_SMS(@Qualifier(QUEUE_INFORM_SMS) Queue queue,@Qualifier(EXCHANGE_TOPICS_INFORM) Exchange exchange) {return BindingBuilder.bind(queue).to(exchange).with("inform.#.sms.#").noargs();}@Beanpublic Binding BINDING_QUEUE_INFORM_EMAIL(@Qualifier(QUEUE_INFORM_EMAIL) Queue queue,@Qualifier(EXCHANGE_TOPICS_INFORM) Exchange exchange) {return BindingBuilder.bind(queue).to(exchange).with("inform.#.email.#").noargs();}}

注意:以上的第一步搭建springboot环境和第二部的配置,不管在生产端还是在消费端都是必须且一样的。

3 、生产端

使用RarbbitTemplate发送消息

package com.xuecheng.test.rabbitmq;import com.xuecheng.test.rabbitmq.config.RabbitmqConfig;import org.junit.Test;import org.junit.runner.RunWith;import org.springframework.amqp.rabbit.core.RabbitTemplate;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.boot.test.context.SpringBootTest;import org.springframework.test.context.junit4.SpringRunner;@SpringBootTest@RunWith(SpringRunner.class)public class Producer05_topics_springboot {@AutowiredRabbitTemplate rabbitTemplate;@Testpublic void testSendByTopics(){for (int i=0;i<5;i++){String message = "sms email inform to user"+i;rabbitTemplate.convertAndSend(RabbitmqConfig.EXCHANGE_TOPICS_INFORM,"inform.sms.email",message);System.out.println("Send Message is:'" + message + "'");}}}

4、消费端

使用@RabbitListener注解监听队列。

package com.xuecheng.test.rabbitmq.mq;import com.rabbitmq.client.Channel;import com.xuecheng.test.rabbitmq.config.RabbitmqConfig;import org.springframework.amqp.core.Message;import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.springframework.stereotype.Component;@Componentpublic class ReceiveHandler {//监听email队列@RabbitListener(queues = {RabbitmqConfig.QUEUE_INFORM_EMAIL})public void receive_email(String msg,Message message,Channel channel){System.out.println(msg);}//监听sms队列@RabbitListener(queues = {RabbitmqConfig.QUEUE_INFORM_SMS})public void receive_sms(String msg,Message message,Channel channel){System.out.println(msg);}}

测试


本文来自互联网用户投稿,文章观点仅代表作者本人,不代表本站立场,不承担相关法律责任。如若转载,请注明出处。 如若内容造成侵权/违法违规/事实不符,请点击【内容举报】进行投诉反馈!

相关文章

立即
投稿

微信公众账号

微信扫一扫加关注

返回
顶部