RabbitMQ学习---Java整合RabbitMQ四种模型
Java整合RabbitMQ四种模型
- 一. RabbitMQ支持的消息模型
- 二. 创建Maven项目导入依赖
- 三. 封装连接RabbitMQ工具类
- 四. 第一种模型(直连)
- 1. 生产者
- 2. 消费者
- 3. 测试
- 五. 第二种模型(work quene)
- 1. 生产者
- 2.消费者1
- 3.消费者2
- 4. 测试
- 5. 消息应答
- 1. 消息应答机制
- 2. 自动应答
- 3. 消息应答的方法
- 4. Multiple 的解释
- 5. 消息自动重新入队
- 6. 消息手动应答代码
- ⑥. RabbitMQ 持久化
- 1. 队列实现持久化
- 2. 消息实现持久化
- ⑦. 不公平分发
- ⑧. 预取值
- 六. 第三种模型(fanout)
- 1. 开发生产者
- 2. 消费者-1
- 3. 消费者-2
- 4. 测试
- 七. 第四种模型(Routing)
- 1. Routing 之订阅模型-Direct(直连)
- 1. 生产者
- 3. 开发消费者-1
- 4. 开发消费者-2
- 5. 测试
- 2 .Routing 之订阅模型-Topic
- 1.生产者
- 2.消费者1
- 3.消费者2
- 4.测试
- 八. GitHub传送门
一. RabbitMQ支持的消息模型


二. 创建Maven项目导入依赖
<dependencies><dependency><groupId>junitgroupId><artifactId>junitartifactId><version>4.12version>dependency><dependency><groupId>com.rabbitmqgroupId><artifactId>amqp-clientartifactId><version>5.7.2version>dependency>dependencies>
三. 封装连接RabbitMQ工具类

package com.xizi.utils;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class RabbitUtils {private static ConnectionFactory connectionFactory;//重量级资源,类加载执行一次static {//创建连接mq的连接工厂对象connectionFactory = new ConnectionFactory();//设置连接rabbitmq主机connectionFactory.setHost("121.5.139.XXX");//设置端口号connectionFactory.setPort(5672);//设置连接那个虚拟主机connectionFactory.setVirtualHost("/");
// connectionFactory.setVirtualHost("/ems");//设置访问虚拟主机的用户名和密码connectionFactory.setUsername("guest");connectionFactory.setPassword("guest");}//定义提供连接对象的方法public static Connection getConnection(){try {return connectionFactory.newConnection();} catch (Exception e) {e.printStackTrace();}return null;}//关闭通道和关闭连接方法public static void ColseConnectionAndChanel(Connection connection,Channel channel) {//关闭通道,连接try {if(channel!=null){channel.close();}if(connection!=null){connection.close();}} catch (IOException e) {e.printStackTrace();} catch (TimeoutException e) {e.printStackTrace();} catch (Exception e) {e.printStackTrace();}}
}
四. 第一种模型(直连)

- P:生产者,也就是要发送消息的程序
- C:消费者:消息的接受者,会一直等待消息到来。
- queue:消息队列,图中红色部分。类似一个邮箱,可以缓存消息;生产者向其中投递消息,消费者从其中取出消息。
1. 生产者
参数的说明,我都在代码里详细介绍了
package com.xizi.helloworld;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;
import com.xizi.utils.RabbitUtils;
import org.junit.Test;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Provider {//生成消息@Testpublic void testSendMessage() throws IOException, TimeoutException {//获取连接对象Connection connection = RabbitUtils.getConnection();//获取连接的通道Channel channel = connection.createChannel();//通道绑定对应消息队列//参数1:队列名称//参数2:队列是否要持久化//参数3:是否独占队列//参数4: 是否在消费完成之后自动删除队列//参数5:额外参数channel.queueDeclare("hello",true,false,false,null );//发布消息//参数1:交换机名称//参数2:队列名称//参数3:传递消息额外设置//参数4:消息的具体内容channel.basicPublish("", "hello", MessageProperties.PERSISTENT_TEXT_PLAIN, "hello rabbitmq".getBytes());//关闭通道,连接RabbitUtils.ColseConnectionAndChanel(connection,channel );}
}
2. 消费者
package com.xizi.helloworld;import com.rabbitmq.client.*;
import com.xizi.utils.RabbitUtils;
import org.junit.Test;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Customer {public static void main(String[] args) throws IOException, TimeoutException {//获取连接对象Connection connection = RabbitUtils.getConnection();//获取连接的通道Channel channel = connection.createChannel();//通道绑定对应消息队列//参数1:队列名称//参数2:队列是否要持久化//参数3:是否独占队列//参数4: 是否在消费完成之后自动删除队列//参数5:额外参数channel.queueDeclare("hello",true,false,false,null );//消费消息//参数1: 消费哪个队列的消息名称//参数2: 开始消息的自动确认直接//参数3: 消费时的回调接口channel.basicConsume("hello",true,new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("========================"+new String(body));}});//关闭通道,连接
// channel.close();
// connection.close();}
}
3. 测试
先开启消费者,再开启生产者


五. 第二种模型(work quene)
- Work queues,也被称为(Task queues),任务模型。当消息处理比较耗时的时候,可能生产消息的速度会远远大于消息的消费速度。长此以往,消息就会堆积越来越多,无法及时处理。此时就可以使用work 模型:让多个消费者绑定到一个队列,共同消费队列中的消息。队列中的消息一旦消费,就会消失,因此任务是不会被重复执行的

- P:生产者:任务的发布者
- C1:消费者-1,领取任务并且完成任务
- C2:消费者-2:领取任务并完成任务

1. 生产者
package com.xizi.workqueue;public class Provider {//生成消息@Testpublic void testSendMessage() throws IOException, TimeoutException {//获取连接对象Connection connection = RabbitUtils.getConnection();//获取连接的通道Channel channel = connection.createChannel();//通道绑定对应消息队列channel.queueDeclare("work",true,false,false,null );//发布消息for (int i = 1; i <= 20; i++) {channel.basicPublish("", "work", null, (i+"hello workqueuq").getBytes());}//关闭通道,连接RabbitUtils.ColseConnectionAndChanel(connection,channel );}
}
2.消费者1
package com.xizi.workqueue;public class Customer1 {public static void main(String[] args) throws IOException, TimeoutException {//获取连接对象Connection connection = RabbitUtils.getConnection();//获取连接的通道final Channel channel = connection.createChannel();//通道绑定对应消息队列//参数1:队列名称//参数2:队列是否要持久化//参数3:是否独占队列//参数4: 是否在消费完成之后自动删除队列//参数5:额外参数channel.queueDeclare("work",true,false,false,null );//channel.basicQos(1); //每次通道里只存放一个消息//消费消息//参数1: 消费哪个队列的消息名称//参数2: 开始消息的自动确认直接//参数3: 消费时的回调接口channel.basicConsume("work",false,new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {try {Thread.sleep(2000);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("消费者-2=="+new String(body));//参数1: 确认队列中那个具体消息//参数2: 是否开启多个消息同时确认// channel.basicAck(envelope.getDeliveryTag(),false);}});//关闭通道,连接
// channel.close();
// connection.close();}
}
3.消费者2
package com.xizi.workqueue;public class Customer2 {public static void main(String[] args) throws IOException, TimeoutException {//获取连接对象Connection connection = RabbitUtils.getConnection();//获取连接的通道final Channel channel = connection.createChannel();//通道绑定对应消息队列//参数1:队列名称//参数2:队列是否要持久化//参数3:是否独占队列//参数4: 是否在消费完成之后自动删除队列//参数5:额外参数channel.queueDeclare("work",true,false,false,null );//消费消息//参数1: 消费哪个队列的消息名称//参数2: 开始消息的自动确认直接//参数3: 消费时的回调接口channel.basicConsume("work",false,new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("消费者-1==="+new String(body));//参数1: 确认队列中那个具体消息//参数2: 是否开启多个消息同时确认// channel.basicAck(envelope.getDeliveryTag(),false);}});//关闭通道,连接
// channel.close();
// connection.close();}
}
4. 测试
查看队列中有20个消息等待处理

测试效果,消费者1让它睡2秒

消费者1

消费者2

5. 消息应答
1. 消息应答机制
- 为了保证消息在发送过程中不丢失,rabbitmq 引入消息应答机制,消息应答就是:消费者在接收
到消息并且处理该消息之后,告诉 rabbitmq 它已经处理了,rabbitmq 可以把该消息删除了。
2. 自动应答
- 消息发送后立即被认为已经传送成功,这种模式需要在高吞吐量和数据传输安全性方面做权
衡,所以这种模式仅适用在消费者可以高效并以某种速率能够处理这些消息的情况下使用。
3. 消息应答的方法
-
Channel.basicAck(用于肯定确认) ,已知道该消息并且成功的处理消息,可以将其丢弃了
-
Channel.basicNack(用于否定确认)
-
Channel.basicReject(用于否定确认) 与 Channel.basicNack 相比少一个参数, 不处理该消息了直接拒绝,可以将其丢弃了
4. Multiple 的解释


5. 消息自动重新入队
- 如果消费者由于某些原因失去连接(其通道已关闭,连接已关闭或 TCP 连接丢失),导致消息未发送 ACK 确认,RabbitMQ 将了解到消息未完全处理,并将对其重新排队。如果此时其他消费者可以处理,它将很快将其重新分发给另一个消费者。这样,即使某个消费者偶尔死亡,也可以确保不会丢失任何消息。

6. 消息手动应答代码
- 默认消息采用的是自动应答,所以我们要想实现消息消费过程中不丢失,需要把自动应答改
为手动应答
- 设置通道一次只能消费一个消息
- 关闭消息的自动确认,开启手动确认消息
消费者_C1
// 工作队列 线程轮询执行
public class Customer1 {public static void main(String[] args) throws IOException, TimeoutException {//获取连接对象Connection connection = RabbitUtils.getConnection();//获取连接的通道final Channel channel = connection.createChannel();//通道绑定对应消息队列//参数1:队列名称//参数2:队列是否要持久化//参数3:是否独占队列//参数4: 是否在消费完成之后自动删除队列//参数5:额外参数channel.queueDeclare("work",true,false,false,null );//不公平分发 能者多劳 默认0是轮询分发//channel.basicQos(1); //每次通道里只存放一个消息//预取值 为5channel.basicQos(1);//消费消息//参数1: 消费哪个队列的消息名称//参数2: 开始消息的自动确认直接 ---false 手动应答//参数3: 消费时的回调接口channel.basicConsume("work",false,new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {try {Thread.sleep(30000);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("【消费者-2】=="+new String(body));//手动应答//【basicAck】---确认应答//【basicNack】----否定确认//【basicReject】---否定确认//参数1: 确认队列中那个具体消息//参数2: 是否开启多个消息同时确认 ---批量应答channel.basicAck(envelope.getDeliveryTag(),false);}});}
}
消费者_C2
public class Customer2 {public static void main(String[] args) throws IOException, TimeoutException {//获取连接对象Connection connection = RabbitUtils.getConnection();//获取连接的通道final Channel channel = connection.createChannel();//通道绑定对应消息队列//参数1:队列名称//参数2:队列是否要持久化//参数3:是否独占队列//参数4: 是否在消费完成之后自动删除队列//参数5:额外参数channel.queueDeclare("work", true, false, false, null);//不公平分发 能者多劳 默认0是轮询分发//channel.basicQos(1); //每次通道里只存放一个消息//预取值 为2channel.basicQos(1);//消费消息//参数1: 消费哪个队列的消息名称//参数2: 开始消息的自动确认直接//参数3: 消费时的回调接口channel.basicConsume("work", false, new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {SleepUtils.sleep(2);System.out.println("消费者-1===" + new String(body));//参数1: 确认队列中那个具体消息//参数2: 是否开启多个消息同时确认channel.basicAck(envelope.getDeliveryTag(), false);}});}}
生产者_P1
public class Provider {//生成消息@Testpublic void testSendMessage() throws IOException, TimeoutException {//获取连接对象Connection connection = RabbitUtils.getConnection();//获取连接的通道Channel channel = connection.createChannel();//通道绑定对应消息队列 ---队列持久化channel.queueDeclare("work",true,false,false,null );//发布消息for (int i = 1; i <= 7; i++) {//设置生产者发送消息为持久化消息(保存在磁盘上)channel.basicPublish("", "work", MessageProperties.PERSISTENT_TEXT_PLAIN, (i+"hello workqueuq").getBytes());}//关闭通道,连接RabbitUtils.ColseConnectionAndChanel(connection,channel );}
}


⑥. RabbitMQ 持久化
1. 队列实现持久化
- 之前我们创建的队列都是非持久化的,Rabbitmq如果重启的化,该队列就会被删除掉,如果要队列实现持久化需要在声明队列的时候把durable参数设置为持久化。

//通道绑定对应消息队列 ---队列持久化channel.queueDeclare("work",true,false,false,null );

2. 消息实现持久化
- 要想让消息实现持久化需要在消息生产者修改代码,MessageProperties.PERSISTENT_TEXT_PLAIN 添加这个属性。

- 将消息标记为持久化并不能完全保证不会丢失消息。尽管它告诉 RabbitMQ 将消息保存到磁盘,但是这里依然存在当消息刚准备存储在磁盘的时候 但是还没有存储完,消息还在缓存的一个间隔点。此时并没有真正写入磁盘。
⑦. 不公平分发

//不公平分发 能者多劳 默认0是轮询分发channel.basicQos(1); //每次通道里只存放一个消息


⑧. 预取值
- 能限制缓冲区的大小,以避免缓冲区里面无限制的未确认消息问题。这个时候就可以通过使用 basicQos()方法设置"预取计数"值来完成的。该值定义通道上允许的未确认消息的最大数量。一旦数量达到配置的数量,RabbitMQ 将停止在通道上传递更多消息,除非至少有一个未处理的消息被确认
- 例如,假设在通道上有未确认的消息 5、6、7,8,并且通道的预取计数设置为 4,此时RabbitMQ 将不会在该通道上再传递任何消息,除非至少有一个未应答的消息被 ack。




当两台消费者都正常运行测试


当两台消费者其中一台宕机测试


六. 第三种模型(fanout)

在广播模式下,消息发送流程
- 有多个消费者
- 每个消费者有自己的queue(队列)
- 每个队列都要绑定到Exchange(交换机)
- 生产者发送的消息,只能发送到交换机,交换机来决定要发给哪个队列,生产者无法决定。
- 交换机把消息发送给绑定过的所有队列
- 队列的消费者都能拿到消息。实现一条消息被多个消费者消费
1. 开发生产者
package com.xizi.fanout;public class Provider {public static void main(String[] args) throws IOException {Connection connection = RabbitUtils.getConnection();Channel channel = connection.createChannel();//将通道声明指定交换机//参数1: 交换机名称//参数2: 交换机类型 fanout 广播类型channel.exchangeDeclare("logs", "fanout");//发送消息channel.basicPublish("logs", "", null, "fanout type message".getBytes());RabbitUtils.ColseConnectionAndChanel(connection, channel);}
}
2. 消费者-1
package com.xizi.fanout;public class Customer1 {public static void main(String[] args) throws IOException {//获取连接Connection connection = RabbitUtils.getConnection();Channel channel = connection.createChannel();//通道绑定交换机channel.exchangeDeclare("logs", "fanout");//临时队列String queue = channel.queueDeclare().getQueue();//绑定交换机和队列channel.queueBind(queue, "logs", "");//消费消息channel.basicConsume(queue, true, new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("消费者1: "+new String(body));}});}
}
3. 消费者-2
package com.xizi.fanout;public class Customer2 {public static void main(String[] args) throws IOException {//获取连接Connection connection = RabbitUtils.getConnection();Channel channel = connection.createChannel();//通道绑定交换机channel.exchangeDeclare("logs", "fanout");//临时队列String queue = channel.queueDeclare().getQueue();//绑定交换机和队列channel.queueBind(queue, "logs", "");//消费消息channel.basicConsume(queue, true, new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("消费者2: "+new String(body));}});}
}
4. 测试


七. 第四种模型(Routing)
1. Routing 之订阅模型-Direct(直连)
- 在Fanout模式中,一条消息,会被所有订阅的队列都消费。但是,在某些场景下,我们希望不同的消息被不同的队列消费。这时就要用到Direct类型的Exchange。
- 在Direct模型下:
- 队列与交换机的绑定,不能是任意绑定了,而是要指定一个RoutingKey(路由key)
- 消息的发送方在 向 Exchange发送消息时,也必须指定消息的 RoutingKey。
- Exchange不再把消息交给每一个绑定的队列,而是根据消息的Routing Key进行判断,只有队列的Routingkey与消息的 Routing key完全一致,才会接收到消息

- P:生产者,向Exchange发送消息,发送消息时,会指定一个routing key。
- X:Exchange(交换机),接收生产者的消息,然后把消息递交给 与routing key完全匹配的队列
- C1:消费者,其所在队列指定了需要routing key 为 error 的消息
- C2:消费者,其所在队列指定了需要routing key 为 info、error、warning 的消息
1. 生产者
package com.xizi.direct;public class Provider {public static void main(String[] args) throws IOException {//获取连接Connection connection = RabbitUtils.getConnection();Channel channel = connection.createChannel();//通道声明交换机//参数1:交换机名称//参数2: direct 路由模式channel.exchangeDeclare("logs_direct", "direct");//发送消息String routingkey="error";channel.basicPublish("logs_direct", routingkey, null, ("这是direct模型发布的基于route key:["+routingkey+"]发送消息").getBytes());RabbitUtils.ColseConnectionAndChanel(connection, channel);}
}
3. 开发消费者-1
package com.xizi.direct;public class Customer1 {public static void main(String[] args) throws IOException {//获取连接Connection connection = RabbitUtils.getConnection();Channel channel = connection.createChannel();//通道绑定交换机channel.exchangeDeclare("logs_direct", "direct");//临时队列String queue = channel.queueDeclare().getQueue();//绑定交换机和队列channel.queueBind(queue, "logs_direct", "error");//消费消息channel.basicConsume(queue, true, new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("消费者1: "+new String(body));}});}
}
4. 开发消费者-2
package com.xizi.direct;public class Customer2 {public static void main(String[] args) throws IOException {//获取连接Connection connection = RabbitUtils.getConnection();Channel channel = connection.createChannel();//通道绑定交换机channel.exchangeDeclare("logs_direct", "direct");//临时队列String queue = channel.queueDeclare().getQueue();//绑定交换机和队列channel.queueBind(queue, "logs_direct", "error");channel.queueBind(queue, "logs_direct", "info");channel.queueBind(queue, "logs_direct", "warning");//消费消息channel.basicConsume(queue, true, new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("消费者2: "+new String(body));}});}
}
5. 测试


2 .Routing 之订阅模型-Topic
- Topic类型的Exchange与Direct相比,都是可以根据RoutingKey把消息路由到不同的队列。只不过Topic类型Exchange可以让队列在绑定Routing key 的时候使用通配符!这种模型Routingkey 一般都是由一个或多个单词组成,多个单词之间以”.”分割,例如: item.insert

//通配符* . 匹配不多不少恰好1个词# 匹配一个或多个词
1.生产者
package com.xizi.topic;public class Provider {public static void main(String[] args) throws IOException {//获取连接Connection connection = RabbitUtils.getConnection();Channel channel = connection.createChannel();channel.exchangeDeclare("topics", "topic");//发布消息String routekey="user";channel.basicPublish("topics", routekey, null, ("这里是动态路由模型,routekey:["+routekey+"]").getBytes());RabbitUtils.ColseConnectionAndChanel(connection, channel);}
}
2.消费者1
package com.xizi.topic;public class Customer1 {public static void main(String[] args) throws IOException {//获取连接Connection connection = RabbitUtils.getConnection();Channel channel = connection.createChannel();//通道绑定交换机channel.exchangeDeclare("topics", "topic");//临时队列String queue = channel.queueDeclare().getQueue();//绑定交换机和队列channel.queueBind(queue, "topics", "user.*");//消费消息channel.basicConsume(queue, true, new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("消费者1: "+new String(body));}});}
}
3.消费者2
package com.xizi.topic;public class Customer2 {public static void main(String[] args) throws IOException {//获取连接Connection connection = RabbitUtils.getConnection();Channel channel = connection.createChannel();//通道绑定交换机channel.exchangeDeclare("topics", "topic");//临时队列String queue = channel.queueDeclare().getQueue();//绑定交换机和队列channel.queueBind(queue, "topics", "user.#");//消费消息channel.basicConsume(queue, true, new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("消费者2: "+new String(body));}});}
}
4.测试

* 匹配不多不少恰好1个词

# 匹配一个或多个词

八. GitHub传送门
本文来自互联网用户投稿,文章观点仅代表作者本人,不代表本站立场,不承担相关法律责任。如若转载,请注明出处。 如若内容造成侵权/违法违规/事实不符,请点击【内容举报】进行投诉反馈!
