SpringBoot连接多个RabbitMQ
目 录
- 1. 前 言
- 2. 重 写
- 2.1 重写与关联方one的连接工厂
- 2.2 重写与关联方two的连接工厂
- 2.3 创建队列及交换机并绑定
- 2.4 配置信息
- 2.5 注意点
- 3. 使 用
- 3.1 作为消费者
- 3.1 作为生产者
1. 前 言
在 SpringBoot 中整合单个 RabbitMQ 使用,是很简单的,只需要引入依赖,然后在配置里面配置好 MQ 的连接地址、账号、密码等信息,然后使用即可。但如果 MQ 的连接地址是多个,那这种连接方式就不奏效了。
前段时间,我开发的一个项目就遇到了这样的问题。那个项目,好几个关联方,每个关联方用的 MQ 的地址都不相同,也就意味着我这边要连接几个 RabbbitMQ 地址。SpringBoot 连接多个 RabbitMQ,怎么搞?
使用默认的连接方式是行不通的,我已经试过,而要实现 SpringBoot 连接多个 RabbitMQ,只能自定义重写一些东西,分别配置才可以,下面一起来走一下试试。
2. 重 写
首先要明确的是,下面的两个类是需要重写的:
- RabbitTemplate:往队列里面丢消息时,需要用到
- RabbitAdmin:声明队列、声明交换机、绑定队列和交换机用到
这里,我定义两个关联方,一个是 one,一个是 two,分别重写与它们的连接工厂。
2.1 重写与关联方one的连接工厂
package com.yuhuofei.mq.config;import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;/*** @author yuhuofei* @version 1.0* @description 重写与关联方one的连接工厂* @date 2022/10/3 16:57*/
@Slf4j
@Configuration
public class OneMQConfig {@Value("${one.spring.rabbitmq.host}")private String host;@Value("${one.spring.rabbitmq.port}")private int port;@Value("${one.spring.rabbitmq.username}")private String username;@Value("${one.spring.rabbitmq.password}")private String password;@Value("${one.spring.rabbitmq.virtual-host}")private String virtualHost;/*** 定义与one的连接工厂*/@Bean(name = "oneConnectionFactory")@Primarypublic ConnectionFactory oneConnectionFactory() {CachingConnectionFactory connectionFactory = new CachingConnectionFactory();connectionFactory.setHost(host);connectionFactory.setPort(port);connectionFactory.setUsername(username);connectionFactory.setPassword(password);connectionFactory.setVirtualHost(virtualHost);connectionFactory.setPublisherConfirms(true);connectionFactory.setPublisherReturns(true);return connectionFactory;}@Bean(name = "oneRabbitTemplate")@Primarypublic RabbitTemplate oneRabbitTemplate(@Qualifier("oneConnectionFactory") ConnectionFactory connectionFactory) {RabbitTemplate oneRabbitTemplate = new RabbitTemplate(connectionFactory);oneRabbitTemplate.setMandatory(true);oneRabbitTemplate.setConnectionFactory(connectionFactory);oneRabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {/*** 确认消息送到交换机(Exchange)回调* @param correlationData* @param ack* @param cause*/@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {log.info("确认消息送到交换机(Exchange)结果:");log.info("相关数据:{}", correlationData);boolean ret = false;if (ack) {log.info("消息发送到交换机成功, 消息 = {}", correlationData.getId());//下面可自定义业务逻辑处理,如入库保存信息等} else {log.error("消息发送到交换机失败! 消息: {}}; 错误原因:cause: {}", correlationData.getId(), cause);//下面可自定义业务逻辑处理,如入库保存信息等}}});oneRabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {/*** 只要消息没有投递给指定的队列 就触发这个失败回调* @param message 投递失败的消息详细信息* @param replyCode 回复的状态码* @param replyText 回复的文本内容* @param exchange 当时这个消息发给那个交换机* @param routingKey 当时这个消息用那个路由键*/@Overridepublic void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {//获取消息idString messageId = message.getMessageProperties().getMessageId();// 内容String result = null;try {result = new String(message.getBody(), "UTF-8");} catch (Exception e) {log.error("消息发送失败{}", e);}log.error("消息发送失败, 消息ID = {}; 消息内容 = {}", messageId, result);//下面可自定义业务逻辑处理,如入库保存信息等}});return oneRabbitTemplate;}@Bean(name = "oneFactory")@Primarypublic SimpleRabbitListenerContainerFactory oneFactory(@Qualifier("oneConnectionFactory") ConnectionFactory connectionFactory,SimpleRabbitListenerContainerFactoryConfigurer configurer) {SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);configurer.configure(factory, connectionFactory);return factory;}@Bean(name = "oneRabbitAdmin")@Primarypublic RabbitAdmin oneRabbitAdmin(@Qualifier("oneConnectionFactory") ConnectionFactory connectionFactory) {RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);rabbitAdmin.setAutoStartup(true);return rabbitAdmin;}}
2.2 重写与关联方two的连接工厂
package com.yuhuofei.mq.config;import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** @author yuhuofei* @version 1.0* @description 重写与关联方two的连接工厂* @date 2022/10/3 17:52*/
@Slf4j
@Configuration
public class TwoMQConfig {@Value("${two.spring.rabbitmq.host}")private String host;@Value("${two.spring.rabbitmq.port}")private int port;@Value("${two.spring.rabbitmq.username}")private String username;@Value("${two.spring.rabbitmq.password}")private String password;@Value("${two.spring.rabbitmq.virtualHost}")private String virtualHost;/*** 定义与two的连接工厂*/@Bean(name = "twoConnectionFactory")public ConnectionFactory twoConnectionFactory() {CachingConnectionFactory connectionFactory = new CachingConnectionFactory();connectionFactory.setHost(host);connectionFactory.setPort(port);connectionFactory.setUsername(username);connectionFactory.setPassword(password);connectionFactory.setVirtualHost(virtualHost);connectionFactory.setPublisherConfirms(true);connectionFactory.setPublisherReturns(true);return connectionFactory;}@Bean(name = "twoRabbitTemplate")public RabbitTemplate twoRabbitTemplate(@Qualifier("twoConnectionFactory") ConnectionFactory connectionFactory) {RabbitTemplate twoRabbitTemplate = new RabbitTemplate(connectionFactory);twoRabbitTemplate.setMandatory(true);twoRabbitTemplate.setConnectionFactory(connectionFactory);twoRabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {/*** 确认消息送到交换机(Exchange)回调* @param correlationData* @param ack* @param cause*/@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {log.info("确认消息送到交换机(Exchange)结果:");log.info("相关数据:{}", correlationData);boolean ret = false;if (ack) {log.info("消息发送到交换机成功, 消息 = {}", correlationData.getId());//下面可自定义业务逻辑处理,如入库保存信息等} else {log.error("消息发送到交换机失败! 消息: {}}; 错误原因:cause: {}", correlationData.getId(), cause);//下面可自定义业务逻辑处理,如入库保存信息等}}});twoRabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {/*** 只要消息没有投递给指定的队列 就触发这个失败回调* @param message 投递失败的消息详细信息* @param replyCode 回复的状态码* @param replyText 回复的文本内容* @param exchange 当时这个消息发给那个交换机* @param routingKey 当时这个消息用那个路由键*/@Overridepublic void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {//获取消息idString messageId = message.getMessageProperties().getMessageId();// 内容String result = null;try {result = new String(message.getBody(), "UTF-8");} catch (Exception e) {log.error("消息发送失败{}", e);}log.error("消息发送失败, 消息ID = {}; 消息内容 = {}", messageId, result);//下面可自定义业务逻辑处理,如入库保存信息等}});return twoRabbitTemplate;}@Bean(name = "twoFactory")public SimpleRabbitListenerContainerFactory twoFactory(@Qualifier("twoConnectionFactory") ConnectionFactory connectionFactory,SimpleRabbitListenerContainerFactoryConfigurer configurer) {SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);configurer.configure(factory, connectionFactory);return factory;}@Bean(name = "twoRabbitAdmin")public RabbitAdmin twoRabbitAdmin(@Qualifier("twoConnectionFactory") ConnectionFactory connectionFactory) {RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);rabbitAdmin.setAutoStartup(true);return rabbitAdmin;}
}
2.3 创建队列及交换机并绑定
package com.yuhuofei.mq.config;import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;import javax.annotation.PostConstruct;
import javax.annotation.Resource;/*** @author yuhuofei* @version 1.0* @description 创建队列、交换机并绑定* @date 2022/10/3 18:15*/
public class QueueConfig {@Resource(name = "oneRabbitAdmin")private RabbitAdmin oneRabbitAdmin;@Resource(name = "twoRabbitAdmin")private RabbitAdmin twoRabbitAdmin;@Value("${one.out.queue}")private String oneOutQueue;@Value("${one.out.queue}")private String oneRoutingKey;@Value("${two.output.queue}")private String twoOutQueue;@Value("${two.output.queue}")private String twoRoutingKey;@Value("${one.topic.exchange.name}")private String oneTopicExchange;@Value("${two.topic.exchange.name}")private String twoTopicExchange;@PostConstructpublic void oneRabbitInit() {//声明交换机oneRabbitAdmin.declareExchange(new TopicExchange(oneTopicExchange, true, false));//声明队列oneRabbitAdmin.declareQueue(new Queue(oneOutQueue, true, false, false));//绑定队列及交换机oneRabbitAdmin.declareBinding(BindingBuilder.bind(new Queue(oneOutQueue, true, false, false)).to(new TopicExchange(oneTopicExchange, true, false)).with(oneRoutingKey));}@PostConstructpublic void twoRabbitInit() {//声明交换机twoRabbitAdmin.declareExchange(new TopicExchange(twoTopicExchange, true, false));//声明队列twoRabbitAdmin.declareQueue(new Queue(twoOutQueue, true));//绑定队列及交换机twoRabbitAdmin.declareBinding(BindingBuilder.bind(new Queue(twoOutQueue, true, false, false)).to(new TopicExchange(twoTopicExchange, true, false)).with(twoRoutingKey));}
}
2.4 配置信息
这里的配置信息,需要与各自的关联方约定好再配置
# 与关联方one的MQ配置
one.spring.rabbitmq.host=one.mq.com
one.spring.rabbitmq.port=5672
one.spring.rabbitmq.username=xxxxx
one.spring.rabbitmq.password=xxxxx
one.spring.rabbitmq.virtual-host=/xxxxx
one.out.queue=xxxaa.ssssd.cffs.xxxx
one.topic.exchange.name=oneTopExchange# 与关联方two的MQ配置
two.spring.rabbitmq.host=two.mq.com
two.spring.rabbitmq.port=5672
two.spring.rabbitmq.username=aaaaaaa
two.spring.rabbitmq.password=aaaaaaa
two.spring.rabbitmq.virtualHost=/aaaaaaa
two.out.queue=ddddd.sssss.hhhhh.eeee
two.topic.exchange.name=twoTopExchange
2.5 注意点
在连接多个 MQ 的情况下,需要在某个连接加上 @Primary 注解(见 2.1 中的代码),表示主连接,默认使用这个连接,如果不加,服务会起不来
3. 使 用
3.1 作为消费者
由于在前面的 2.3 中,声明了队列及交换机,并进行了绑定,那么作为消费者,监听相应的队列,获取关联方发送的消息进行处理即可。这里用监听关联方 one 的出队列做展示,two 的类似。
需要注意的地方是,在监听队列时,需要指定 ContainerFactory。
package com.yuhuofei.mq.service;import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;
import java.nio.charset.StandardCharsets;/*** @author yuhuofei* @version 1.0* @description 监听关联方one的消息* @date 2022/10/3 18:38*/
@Slf4j
@Service
public class OneReceive {@RabbitListener(queues = "${one.out.queue}", containerFactory = "oneFactory")public void listenOne(Message message, Channel channel) {//获取MQ返回的数据channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);String data = new String(message.getBody(), StandardCharsets.UTF_8);log.info("MQ返回的数据:{}", data);//下面进行业务逻辑处理}
}
3.1 作为生产者
使用之前重写的 RabbitTemplate ,向各个关联方指定的队列发送消息。
package com.yuhuofei.mq.service;import com.google.gson.JsonObject;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.stereotype.Service;import javax.annotation.Resource;/*** @author yuhuofei* @version 1.0* @description 向关联方的队列发送消息* @date 2022/10/3 18:47*/
@Slf4j
@Service
public class SendMessage {@Resource(name = "oneRabbitTemplate")private RabbitTemplate oneRabbitTemplate;@Resource(name = "twoRabbitTemplate")private RabbitTemplate twoRabbitTemplate;public void sendToOneMessage(String messageId, OneMessageConverter message) {String exchange = message.getExchange();String routingKey = message.getRoutingKey();JsonObject data = message.getData();MessageProperties messageProperties = new MessageProperties();messageProperties.setContentType("application/json");Message info = new Message(data.toString().getBytes(), messageProperties);info.getMessageProperties().setMessageId(messageId);oneRabbitTemplate.convertAndSend(exchange, routingKey, info, new CorrelationData(messageId));}public void sendToTwoMessage(String messageId, TwoMessageConverter message) {String exchange = message.getExchange();String routingKey = message.getRoutingKey();JsonObject data = message.getData();MessageProperties messageProperties = new MessageProperties();messageProperties.setContentType("application/json");Message info = new Message(data.toString().getBytes(), messageProperties);info.getMessageProperties().setMessageId(messageId);twoRabbitTemplate.convertAndSend(exchange, routingKey, info, new CorrelationData(messageId));}
}
本文来自互联网用户投稿,文章观点仅代表作者本人,不代表本站立场,不承担相关法律责任。如若转载,请注明出处。 如若内容造成侵权/违法违规/事实不符,请点击【内容举报】进行投诉反馈!
