Rabbitmq学习笔记(超详细)

RabbitMq

先了解一个概念

消息队列

存储消息的队列。

关键词:存储、消息、队列

存储:存数据

消息:某种数据结构,比如字符串、对象、二进制数据、json等等

队列:先进先出的数据结构

消息队列是特殊的数据库么?

可以这么理解。

应用场景(作用):在多个不同的系统、应用之间实现消息的传输(也可以存储)。不需要考虑传输应用的编程语言、系统、框架等等。

可以让java开发的应用发消息,让php开发的应用收消息,这样就不用把所有代码写到同一个项日里(应用解耦)。

消息队列的模型

生产者:Producer,类比为快递员,发送消息的人(客户端)

消费者:Consumer,类比为取快递的人,接受读取消息的人(客户端)

消息:Message,类比为快递,就是生产者要传输给消费者的数据

消息队列:Queue

为什么不接传输,要用消息队列?生产者不用关心你的消费者要不要消费、什么时候消费,我只需要把东西给消息

队列,我的工作就算完成了。

生产者和消费者实现了解耦,互不影响。

三种应用场景

RabbitMq下载安装

先下载erlang

Otp 25.3.2 - Erlang/OTPThe official home of the Erlang Programming Languagehttps://www.erlang.org/patches/otp-25.3.2

再下载

RabbitMQ: easy to use, flexible messaging and streaming — RabbitMQhttps://www.rabbitmq.com/#getstarted

Installing on Windows — RabbitMQhttps://www.rabbitmq.com/install-windows.html

image.png

安装rabbitmg监控面板:

在rabbitmq安装目录的sbin中执行下述脚本:

rabbitmq-plugins.bat enable rabbitmq_management

访问:http:/localhost:15672,用户名密码都是guest

如果想要在远程服务器安装访问rabbitmq管理面板,你要自己创建一个管理员账号,不能用默认的guest,否侧

会被拦截(官方出于安全考虑)。

如果被栏截,可以自己创建管理员用户:

基本概念

4个角色

生产者:发消息的,通过交换机发送到各个队列

消费者:收消息的,从各个队列里取出消息

交换机:(exchange)将消息转发到各个队列里

队列:(queue)存放消息

基于AMPQ协议

AMQP,即Advanced Message Queuing Protocol(高级消息队列协议)

通俗来说,在异步通讯中,消息不会立刻到达接收方,而是被存放到一个容器中,当满足一定的条件之后,消息会被容器发送给接收方,这个容器即消息队列,而完成这个功能需要双方和容器以及其中的各个组件遵守统一的约定和规则,AMQP就是这样的一种协议,消息发送与接收的双方遵守这个协议可以实现异步通讯。这个协议约定了消息的格式和工作方式。

快速入门

RabbitMQ Tutorials — RabbitMQ

java文档

RabbitMQ tutorial - "Hello World!" — RabbitMQ

一对一发送消息

Helloword测试

https://github.com/rabbitmq/rabbitmq-tutorials/blob/main/java/Send.java

引入依赖,以后看官方文档都是这样

向这里就是让我们去Maven官方文档搜索com.rabbitmq

https://mvnrepository.com/

生产者代码
package com.zijin.mq;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;import java.nio.charset.StandardCharsets;public class SingleProduce {private final static String QUEUE_NAME = "hello";public static void main(String[] argv) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");try (Connection connection = factory.newConnection();Channel channel = connection.createChannel()) {channel.queueDeclare(QUEUE_NAME, false, false, false, null);String message = "Hello World!";channel.basicPublish("", QUEUE_NAME, null, message.getBytes(StandardCharsets.UTF_8));System.out.println(" [x] Sent '" + message + "'");}}
}

执行完代码,面板有hello即为成功

消费者代码
package com.zijin.mq;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;import java.nio.charset.StandardCharsets;public class SingleConsumer {private final static String QUEUE_NAME = "hello";public static void main(String[] argv) throws Exception {
//        /创建连接ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");Connection connection = factory.newConnection();Channel channel = connection.createChannel();//创建队列channel.queueDeclare(QUEUE_NAME, false, false, false, null);System.out.println("[*]Waiting for messages.To exit press CTRL+C");//定义了如何处理消息DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), StandardCharsets.UTF_8);System.out.println("[x]Received '" + message + "'");};//消费消息,会持续阻塞channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> {});}
}

生产一条消息立马就被消费了

源码解析

参数:

queueName:消息队列名称(注意,同名称的消息队列,只能用同样的参数创建一次)

durabale:消息队列重启后,消息是否丢失

exclusive:是否只允许当前这个创建消息队列的连接操作消息队列

autoDelete:没有人用队列后,是否要删除队列

多消费者

RabbitMQ tutorial - Work Queues — RabbitMQ

一个生产者给多台机器发消息

队列持久化

durable设置为Ture,服务器重启后队列不丢失

 channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);   

消息持久化, MessageProperties.PERSISTENT_TEXT_PLAIN,

channel.basicPublish("", TASK_QUEUE_NAME,MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes("UTF-8"));
生产者代码
spackage com.zijin.mq;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;import java.util.Scanner;public class MultiProduce {private static final String TASK_QUEUE_NAME = "task_queue";public static void main(String[] argv) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");try (Connection connection = factory.newConnection();Channel channel = connection.createChannel()) {channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);Scanner scanner=new Scanner(System.in);while (scanner.hasNext()){String message = scanner.nextLine();channel.basicPublish("", TASK_QUEUE_NAME,MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes("UTF-8"));System.out.println(" [x] Sent '" + message + "'");}}}}
消费者代码
package com.zijin.mq;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;public class MultiCousumer {private static final String TASK_QUEUE_NAME = "task_queue";public static void main(String[] argv) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");final Connection connection = factory.newConnection();for (int i = 0; i < 2; i++) {final Channel channel = connection.createChannel();channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);System.out.println(" [*] Waiting for messages. To exit press CTRL+C");channel.basicQos(1);
//定义了如何处理消息int finalI=i;DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");try {System.out.println(" [x] Received '" +"编号"+finalI+":"+message + "'");
//                停20秒,模拟机器能力有限Thread.sleep(20000);} catch (InterruptedException e) {throw new RuntimeException(e);}finally {System.out.println("[x] Done");channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);}};channel.basicConsume(TASK_QUEUE_NAME, false, deliverCallback, consumerTag -> {});}}}

//  每一个消费者最多只能同时处理一个任务

channel.basicQos(1);

消息确认机制

为了保证消息被成功消费,rabbitmq提供了消息确认机制

ack:消息被消费

nack:消费失败

reject:拒绝

支持自动确认消息

 channel.basicConsume(TASK_QUEUE_NAME, false, deliverCallback, consumerTag -> {});

指定确认某一条消息

channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);

默认写成false,怎么解释?(相当于你知道你快递站里面放了几个快递,一直没有去取,改成了Ture,它就自动帮你自动收货了,false,你需要你本人去确认收货)。

2个小技巧:

1.使用Scanner接受用户输入,便于快速发送多条消息

2.使用for循环创建多个消费者,便于快速验证队列模型工作机制

交换机

一个生产者给多个队列发消息

交换机:提供消息转发功能

fanout交换机

扇出、广播

程导航知

特点:消息会被转发到所有绑定到该交换机的队列

场景:很适用于发布订阅的场景。比如写日志,可以多个系统间共享交换机有多种类别:fanout、direct,topic,headers

生产者代码
package com.zijin.mq;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;import java.util.Scanner;public class FanoutProduce {private static final String EXCHANGE_NAME = "fanout-exchange";public static void main(String[] argv) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");try (Connection connection = factory.newConnection();Channel channel = connection.createChannel()) {
//        创建交换机channel.exchangeDeclare(EXCHANGE_NAME, "fanout");Scanner scanner=new Scanner(System.in);while (scanner.hasNext()) {String message=scanner.nextLine();channel.basicPublish(EXCHANGE_NAME,"",null, message.getBytes("UTF-8"));System.out.println(" [x] Sent '" + message + "'");}}}
}
消费者代码
package com.zijin.mq;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;public class FanoutConsumer {private static final String EXCHANGE_NAME = "fanout-exchange";public static void main(String[] argv) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");Connection connection = factory.newConnection();Channel channel1 = connection.createChannel();Channel channel2 = connection.createChannel();
//声明交换机channel1.exchangeDeclare(EXCHANGE_NAME, "fanout");
//      创建队列,随机分配一个队列名称String queueName1 ="小苗的工作队列";channel1.queueDeclare(queueName1, true, false, false, null);
//    绑定队列channel1.queueBind(queueName1, EXCHANGE_NAME, "");
//创建队列String queueName2 ="小刚的工作队列";channel1.queueDeclare(queueName2, true, false, false, null);
//绑定队列channel2.queueBind(queueName2, EXCHANGE_NAME, "");System.out.println(" [*] Waiting for messages. To exit press CTRL+C");DeliverCallback deliverCallback1 = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println(" 小苗 Received '" + message + "'");};DeliverCallback deliverCallback2 = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println(" 小马 Received '" + message + "'");};channel1.basicConsume(queueName1, true, deliverCallback1, consumerTag -> { });channel2.basicConsume(queueName2, true, deliverCallback2, consumerTag -> { });}
}
Direct交换机

RabbitMQ tutorial - Routing — RabbitMQ

绑定:可以让交换机和队列进行关联,可以指定让交互机把什么样的消息发送给哪个队列(类似于计算机网络中,两个路由器,或者网络设备相互连接,也可以理解为网线)

routing Key:路由键,控制消息要转发给哪个队列的(IP地址)

特点:消息会根据路由键转发到指定的队列

场景:特定的消息只交给特定的系统(程序)来处理

绑定关系:完全匹配字符串

可以绑定同样的路由键。

比如发日志的场景,希望用独立的程序来处理不同级别的日志,比如C1系统处理error日志,C2系统处理其他级别的日志

生产者代码
package com.zijin.mq;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;import java.util.Scanner;public class DirectProduce {private static final String EXCHANGE_NAME = "direct_exchange";public static void main(String[] argv) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");try (Connection connection = factory.newConnection();Channel channel = connection.createChannel()) {channel.exchangeDeclare(EXCHANGE_NAME, "direct");Scanner scanner=new Scanner(System.in);while (scanner.hasNext()) {String userInput = scanner.nextLine();String[] strings = userInput.split(" ");if (strings.length<1){continue;}String message=strings[0];String routingKey=strings[1];channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes("UTF-8"));System.out.println(" [x] Sent '" + message + "'with routingKey:'" + routingKey + "'");}}}//..
}
消费者代码
package com.zijin.mq;import com.rabbitmq.client.*;public class DirectConsumer {private static final String EXCHANGE_NAME = "direct_exchange";public static void main(String[] argv) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");Connection connection = factory.newConnection();Channel channel = connection.createChannel();channel.exchangeDeclare(EXCHANGE_NAME, "direct");String queueName1 ="xiaoyu_queue";channel.queueDeclare(queueName1, true, false, false, null);
//    绑定队列channel.queueBind(queueName1, EXCHANGE_NAME, "xiaoyu");String queueName2 ="xiaochen_queue";channel.queueDeclare(queueName2, true, false, false, null);
//    绑定队列channel.queueBind(queueName2, EXCHANGE_NAME, "xiaochen");DeliverCallback xiaoyudeliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println(" [x] Received '" +delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");};DeliverCallback xiaochendeliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println(" [x] Received '" +delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");};channel.basicConsume(queueName1, true, xiaoyudeliverCallback, consumerTag -> { });channel.basicConsume(queueName2, true, xiaochendeliverCallback, consumerTag -> { });}
}
topic交换机

RabbitMQ tutorial - Topics — RabbitMQ

特点:消息会根据一个模糊的路由键转发到指定的队列

场景:特定的一类消息可以交给特定的一类系统(程序)来处理

绑定关系:可以模糊匹配多个绑定

·*:匹配一个单词,比如*.orange,那么a.orange、b.orange都能匹配

·#:匹配0个或多个单词,比如a.#,那么a.a、a.b、a.a.a都能匹配

注意,这里的匹配和MySQL的k的%不一样,只能按照单词来匹配,每个'.分隔单词,如果是'#.',其实可以忽略,匹配0个词也ok

老板要下发一个任务,让多个组来处理,这个下发消息的方式类比数据库中模糊查询的方式(%like%)

生产者代码
package com.zijin.mq;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;import java.util.Scanner;public class TopicProduce {private static final String EXCHANGE_NAME = "topic_exchange";public static void main(String[] argv) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");try (Connection connection = factory.newConnection();Channel channel = connection.createChannel()) {channel.exchangeDeclare(EXCHANGE_NAME, "topic");Scanner scanner = new Scanner(System.in);while (scanner.hasNext()) {String userInput = scanner.nextLine();String[] strings = userInput.split(" ");if (strings.length < 1) {continue;}String message = strings[0];String routingKey = strings[1];channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes("UTF-8"));System.out.println(" [x] Sent '" + routingKey + "':'" + message + "'");}}//..}
}

消费者代码
package com.zijin.mq;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;public class TopicConsumer {private static final String EXCHANGE_NAME = "topic_exchange";public static void main(String[] argv) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");Connection connection = factory.newConnection();Channel channel = connection.createChannel();
//    创建队列String queueName1 ="frontend_queue";channel.queueDeclare(queueName1, true, false, false, null);
//    绑定队列channel.queueBind(queueName1, EXCHANGE_NAME, "#.前端.#");String queueName2 ="backend_queue";channel.queueDeclare(queueName2, true, false, false, null);
//    绑定队列channel.queueBind(queueName2, EXCHANGE_NAME, "#.后端.#");String queueName3 ="product_queue";channel.queueDeclare(queueName3, true, false, false, null);
//    绑定队列channel.queueBind(queueName3, EXCHANGE_NAME, "#.产品.#");System.out.println(" [*] Waiting for messages. To exit press CTRL+C");DeliverCallback xiaoadeliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println(" [xiaoa] Received '" +delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");};DeliverCallback xiaobdeliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println(" [xiaob] Received '" +delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");};DeliverCallback xiaocdeliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println(" [xiaoc] Received '" +delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");};channel.basicConsume(queueName1, true, xiaoadeliverCallback, consumerTag -> { });channel.basicConsume(queueName2, true, xiaobdeliverCallback, consumerTag -> { });channel.basicConsume(queueName3, true, xiaocdeliverCallback, consumerTag -> { });}
}

结果

header交换机和RPC不推荐使用

核心特性

官方文档:Time-To-Live and Expiration — RabbitMQ

消息过期机制

可以给每条消息指定一个有效期,一段时间内未被消费者处理,就过期了。

示例场景:消费者(库存系统)挂了,一个订单15分钟还没被库存系统处理,这个订单其实已经失效了,哪怕库存系统再恢复,其实也不用扣减库存。

适用场景:清理过期数据、模拟延迟队列的实现(不开会员就慢速)、专门让某个程序处理过期请求

1)给队列中的所有消息指定过期时间

生产者代码

package com.zijin.mq;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;import java.nio.charset.StandardCharsets;public class TtlProduce {private final static String QUEUE_NAME = "ttl_queue";public static void main(String[] argv) throws Exception {
//        创建连接ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");try (Connection connection = factory.newConnection();Channel channel = connection.createChannel()) {
//            发送消息String message = "Hello World!";channel.basicPublish("", QUEUE_NAME, null, message.getBytes(StandardCharsets.UTF_8));System.out.println(" [x] Sent '" + message + "'");}}
}

消费者代码

package com.zijin.mq;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;public class TtlConsumer {private final static String QUEUE_NAME = "ttl_queue";public static void main(String[] argv) throws Exception {
//        /创建连接ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");Connection connection = factory.newConnection();Channel channel = connection.createChannel();//创建队列Map args = new HashMap();args.put("x-message-ttl", 5000);channel.queueDeclare(QUEUE_NAME, false, false, false, args);System.out.println("[*]Waiting for messages.To exit press CTRL+C");//定义了如何处理消息DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), StandardCharsets.UTF_8);System.out.println("[x]Received '" + message + "'");};channel.basicConsume(QUEUE_NAME, flase, deliverCallback, consumerTag -> {});}
}

如果在过期时间内,还没有消费者取消息,消息才会过期。

注意,如果消息已经接收到,但是没确认,是不会过期的。(即使消息过期时间到了,后续依然会去处理,而不会被丢掉)

2)给某一条消息指定过期时间
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder().expiration("1000").build();
channel.basicPublish("", "routing-key", properties, message.getBytes(StandardCharsets.UTF_8));
死信队列

官方文档:Dead Letter Exchanges — RabbitMQ

死信:过期的消息、拒收的消息、消息队列满了、处理失败的消息的统称

死信队列:专门处理死信的队列(注意,它就是一个普通队列,只不过是专门用来处理死信的,你甚至可以理解这个队列的名称叫“死信队列”)

死信交换机:专门给死信队列转发消息的交换机(注意,它就是一个普通交换机,只不过是专门给死信队列发消息而已,理解为这个交换机的名称就叫“死信交换机”)。也存在路由绑定

死信可以通过死信交换机绑定到死信队列。

生产者代码
package com.zijin.mq;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;import java.util.Scanner;public class DlxDirectProduce {private static final String DEAD_EXCHANGE_NAME = "dlx_direct_exchange";private static final String EXCHANGE_NAME = "direct2_exchange";public static void main(String[] argv) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");try (Connection connection = factory.newConnection();Channel channel = connection.createChannel()) {
//        声明死信交换机channel.exchangeDeclare(DEAD_EXCHANGE_NAME, "direct");
//        创建老板死信队列String queueName1 ="laoban_dlx_queue";channel.queueDeclare(queueName1, true, false, false, null);
//    绑定队列channel.queueBind(queueName1, DEAD_EXCHANGE_NAME, "laoban");//        创建外包死信队列String queueName2 ="waibao_dlx_queue";channel.queueDeclare(queueName2, true, false, false, null);
//    绑定队列channel.queueBind(queueName2, DEAD_EXCHANGE_NAME, "waibao");Scanner scanner=new Scanner(System.in);while (scanner.hasNext()) {String userInput = scanner.nextLine();String[] strings = userInput.split(" ");if (strings.length<1){continue;}String message=strings[0];String routingKey=strings[1];channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes("UTF-8"));System.out.println(" [x] Sent '" + message + "'with routingKey:'" + routingKey + "'");}}}//..
}
消费者代码
package com.zijin.mq;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;import java.util.HashMap;
import java.util.Map;public class DlxDirectConsumer {private static final String DEAD_EXCHANGE_NAME = "dlx_direct_exchange";private static final String EXCHANGE_NAME = "direct2_exchange";public static void main(String[] argv) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");Connection connection = factory.newConnection();Channel channel = connection.createChannel();channel.exchangeDeclare(EXCHANGE_NAME, "direct");
//指定死信队列参数Map args1 = new HashMap<>();
//    指定要绑定到哪一个交换机args1.put("x-dead-letter-exchange", DEAD_EXCHANGE_NAME);
//    指定死信要转发到哪一个死信队列队列args1.put("x-dead-letter-routing-key", "waibao");String queueName1 ="xiaodog_queue";channel.queueDeclare(queueName1, true, false, false, args1);
//    绑定队列channel.queueBind(queueName1, EXCHANGE_NAME, "xiaodog");Map args2 = new HashMap<>();args2.put("x-dead-letter-exchange", DEAD_EXCHANGE_NAME);args2.put("x-dead-letter-routing-key", "laoban");String queueName2 ="xiaocat_queue";channel.queueDeclare(queueName2, true, false, false, args2);
//    绑定队列channel.queueBind(queueName2, EXCHANGE_NAME, "xiaocat");DeliverCallback laobandeliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");
//        拒绝消息channel.basicNack(delivery.getEnvelope().getDeliveryTag(),false,false);System.out.println(" [x] Received '" +delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");};DeliverCallback waibaodeliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");
//          拒绝消息channel.basicNack(delivery.getEnvelope().getDeliveryTag(),false,false);System.out.println(" [x] Received '" +delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");};channel.basicConsume(queueName1, false, laobandeliverCallback, consumerTag -> { });channel.basicConsume(queueName2, false, waibaodeliverCallback, consumerTag -> { });}
}

RabbitMQ重点知识

也是面试考点

1.消息队列的概念、模型、应用场景

2.交换机的类别、路由绑定的关系

3.消息可靠性

a.消息确认机制(ack、nack、reject)

b.消息持久化(durable)

c.消息过期机制

d.死信队列

4.延迟队列(类似死信部队列)

5.顺序消费、消费幂等性(本次不讲)

6.可扩展性(仅作了解)

a.集群

b.故障的恢复机制

c.镜像

7.运维监控告警

RabbitMQ项目实战

看官方文档:Getting Started | Messaging with RabbitMQ(英文不好就不要了)

引入依赖

注意这里的版本一定要与自己的springboot版本一致

org.springframework.bootspring-boot-starter-amqp2.7.2

在yml配置

rabbitmq:host: localhostport: 5672username: guestpassword: guest

创建交换机和队列

package com.zijin.bimq;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;public class MqInitMain {public static void main(String[] args) {//    创建测试所需要的交换机和队列,只需要执行一次try {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");Connection connection = factory.newConnection();Channel channel = connection.createChannel();
//        创建交换机String EXCHANGE_NAME="code_exchange";channel.exchangeDeclare(EXCHANGE_NAME,"direct");
//        创建队列String QUEUE_NAME="code_queue";channel.queueDeclare(QUEUE_NAME,true,false,false,null);channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"my_routingKey");} catch (Exception e) {}}
}

生产者代码

package com.zijin.bimq;import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;import javax.annotation.Resource;@Component
public class MyMessageProducer {@Resourceprivate RabbitTemplate rabbitTemplate;public void sendMessage(String exchange,String routingKey,String message){rabbitTemplate.convertAndSend(exchange,routingKey,message);}
}

消费者代码

package com.zijin.bimq;import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;import javax.annotation.Resource;
@Slf4j
@Componentpublic class MyMessageConsumer {@RabbitListener(queues = {},ackMode = "MANUAL")@SneakyThrowspublic void receiveMessage(String message,Channel channel,@Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag){log.info("receiveMessage message= {}",message);channel.basicAck(deliveryTag,false);}}

生产者测试类

package com.zijin.bimq;import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;import javax.annotation.Resource;import static org.junit.jupiter.api.Assertions.*;
@SpringBootTest
class MyMessageProducerTest {@Resourceprivate MyMessageProducer myMessageProducer;@Testvoid sendMessage(){myMessageProducer.sendMessage("code_exchange","my_routingKey","你好呀");}}


本文来自互联网用户投稿,文章观点仅代表作者本人,不代表本站立场,不承担相关法律责任。如若转载,请注明出处。 如若内容造成侵权/违法违规/事实不符,请点击【内容举报】进行投诉反馈!

相关文章

立即
投稿

微信公众账号

微信扫一扫加关注

返回
顶部