RabbitMQ路由模式RoutingKey模式
RabbitMQ路由模式RoutingKey模式
生产者发送消息到交换机并指定一个路由key,消费者队列绑定到交换机时要制定路由key(key匹配就能接受消息,key不匹配就不能接受消息)

例如:我们可以把路由key设置为insert ,那么消费者队列key指定包含insert才可以接收消息,消费者队列key定义为update或者delete就不能接收消息。很好的控制了更新,插入和删除的操作。
采用交换机direct模式
注:
前面做的demo中RoutingKey设置的空
RoutingKey有值的时候,那么 经过消息队列之后,需要在经过 RoutingKey进行判断决定 消费者
pom.xml
4.0.0 com.toov5.rabibitMQScribe rabibitMQScribe 0.0.1-SNAPSHOT com.rabbitmq amqp-client 3.6.5
producer
package com.toov5.routing;import java.io.IOException;
import java.util.concurrent.TimeoutException;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.toov5.utils.MQConnectionUtils;//生产者 交换机类型 producerFanout类型
public class RoutingProducer {//交换机名称private static final String EXCHANGE_NAME = "my_routing"; public static void main(String[] args) throws IOException, TimeoutException {//建立MQ连接Connection connection = MQConnectionUtils.newConnection();//创建通道Channel channel = connection.createChannel();//生产者绑定交换机channel.exchangeDeclare(EXCHANGE_NAME, "direct"); //交换机名称 交换机类型String routingKey="email";//创建对应的消息 String msString = "my_Routing_destination_msg"+routingKey;//通过频道 发送消息System.out.println("生产者投递消息:"+msString);channel.basicPublish(EXCHANGE_NAME, routingKey, null, msString.getBytes());//关闭通道 和 连接channel.close();connection.close();}}
consumer:
package com.toov5.routing;import java.io.IOException;
import java.util.concurrent.TimeoutException;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.AMQP.BasicProperties;
import com.toov5.utils.MQConnectionUtils;//邮件消费者
public class ConsumerEmailRouting {private static final String EMAIL_QUEUE ="email_queue_routing";//交换机名称private static final String EXCHANGE_NAME = "my_routing"; public static void main(String[] args) throws IOException, TimeoutException {System.out.println("邮件消费者启动");//建立MQ连接Connection connection = MQConnectionUtils.newConnection(); //创建通道Channel channel = connection.createChannel();//消费者声明队列channel.queueDeclare(EMAIL_QUEUE, false, false, false, null);//消费者队列绑定 路由channel.queueBind(EMAIL_QUEUE, EXCHANGE_NAME, "email");//消费者监听消息DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {//重写监听方法@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)throws IOException { String msg = new String(body,"UTF-8");System.out.println("邮件消费者获取生产者消息"+msg);}};channel.basicConsume(EMAIL_QUEUE,true, defaultConsumer); //绑定队列 事件监听}
}
package com.toov5.routing;import java.io.IOException;
import java.util.concurrent.TimeoutException;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.AMQP.BasicProperties;
import com.toov5.utils.MQConnectionUtils;//邮件消费者
public class ConsumerSMSRouting {private static final String SMS_QUEUE ="sms_queue_routing";//交换机名称private static final String EXCHANGE_NAME = "my_routing"; public static void main(String[] args) throws IOException, TimeoutException {System.out.println("短信消费者启动");//建立MQ连接Connection connection = MQConnectionUtils.newConnection(); //创建通道Channel channel = connection.createChannel();//消费者声明队列channel.queueDeclare(SMS_QUEUE, false, false, false, null);//消费者队列绑定 路由channel.queueBind(SMS_QUEUE, EXCHANGE_NAME, "email");channel.queueBind(SMS_QUEUE, EXCHANGE_NAME, "sms");//消费者监听消息DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {//重写监听方法@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)throws IOException { String msg = new String(body,"UTF-8");System.out.println("邮件消费者获取生产者消息"+msg);}};channel.basicConsume(SMS_QUEUE,true, defaultConsumer); //绑定队列 事件监听}
}
运行结果:

绑定两个:



本文来自互联网用户投稿,文章观点仅代表作者本人,不代表本站立场,不承担相关法律责任。如若转载,请注明出处。 如若内容造成侵权/违法违规/事实不符,请点击【内容举报】进行投诉反馈!
