Debug方式讲解Rabbitmq的自动ACK和手动ACK
文章首发于我的个人博客,到个人博客体验更佳阅读哦
https://www.itqiankun.com/article/1564534513
介绍Rabbitmq的手动ACK和自动ACK
当消息一旦被消费者接收,队列中的消息就会被删除。那么问题来了:RabbitMQ怎么知道消息被接收了呢?
这就要通过消息确认机制(Acknowlege)来实现了。当消费者获取消息后,会向RabbitMQ发送回执ACK,告知消息已经被接收。不过这种回执ACK分两种情况:
- 自动ACK:消息一旦被接收,消费者自动发送ACK
- 手动ACK:消息接收后,不会发送ACK,需要手动调用
这两ACK要怎么选择呢?这需要看消息的重要性:
- 如果消息不太重要,丢失也没有影响,那么自动ACK会比较方便
- 如果消息非常重要,不容丢失。那么最好在消费完成后手动ACK,否则接收消息后就自动ACK,RabbitMQ就会把消息从队列中删除。如果此时消费者宕机,那么消息就丢失了。
自动ACK
自动ACK的演示流程
Pom.xml代码如下
4.0.0 cn.itcast.rabbitmq itcast-rabbitmq 0.0.1-SNAPSHOT org.springframework.boot spring-boot-starter-parent 2.0.4.RELEASE 1.8 org.springframework.boot spring-boot-starter-amqp org.springframework.boot spring-boot-starter-test
首先工具类代码如下
package cn.itcast.rabbitmq.util;import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;public class ConnectionUtil {/*** 建立与RabbitMQ的连接* @return* @throws Exception*/public static Connection getConnection() throws Exception {//定义连接工厂ConnectionFactory factory = new ConnectionFactory();//设置服务地址,设置自己的服务器密码factory.setHost("******");//端口factory.setPort(5672);//设置账号信息,用户名、密码、vhostfactory.setVirtualHost("/***");factory.setUsername("***");factory.setPassword("***");// 通过工程获取连接Connection connection = factory.newConnection();return connection;}}
然后是生产消息方代码
下面的代码我在debug之后,当下面的蓝色代码执行完毕之后,在rabbitmq里面的Connections里面就有下面的展示,这就表示发送方的链接和rabbitmq已经连上了
然后当执行完下面的红色代码的时候,在rabbitmq里面的Channels里面就有下面的展示,这就表示发送方的通道创建好了
然后下面的黄色代码执行完毕之后,在rabbitmq的队列里面就有Hello World!这个消息了,此时我们可以看到这个消息还没有被消费
然后我们点击上面的红框里面的东西,然后在点击下面的东西,就可以看到我们刚刚发送的消息
因为有下面的绿色代码,所以发送方在执行之后,rabbitmq里面的Connections的发送链接就关闭了,rabbitmq里面的Channels的通道链接就关闭了,所以此时rabbitmq里面的Connections是下面这样
此时rabbitmq里面的Channels是下面这样
package cn.itcast.rabbitmq.simple;import cn.itcast.rabbitmq.util.ConnectionUtil;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.MessageProperties;
/*** 生产者*/
public class Send {private final static String QUEUE_NAME = "simple_queue";public static void main(String[] argv) throws Exception {//蓝色代码注释开始// 获取到连接Connection connection = ConnectionUtil.getConnection();//蓝色代码注释结束//红色代码注释开始// 从连接中创建通道,使用通道才能完成消息相关的操作Channel channel = connection.createChannel();//红色代码注释结束// 声明(创建)队列channel.queueDeclare(QUEUE_NAME, false, false, false, null);//黄色代码注释开始// 消息内容String message = "Hello World!";// 向指定的队列中发送消息channel.basicPublish("", QUEUE_NAME, null, message.getBytes());System.out.println(" [x] Sent '" + message + "'");//黄色代码注释结束//绿色代码注释开始//关闭通道和连接channel.close();connection.close();//绿色代码注释结束}
}
然后是接受方接受消息的代码
Pom.xml代码如下
4.0.0 cn.itcast.rabbitmq itcast-rabbitmq 0.0.1-SNAPSHOT org.springframework.boot spring-boot-starter-parent 2.0.4.RELEASE 1.8 org.springframework.boot spring-boot-starter-amqp org.springframework.boot spring-boot-starter-test
下面的当下面的蓝色代码执行完毕之后,在rabbitmq里面的Connections里面就有下面的展示,这就表示接受方的链接和rabbitmq已经连上了
然后当执行完下面的红色代码的时候,在rabbitma里面的Channels里面就有下面的展示,这就表示接受方的通道创建好了
下面的绿色代码表示创建一个消费对象,这个消费者并且有一个监听事件,如果有消息的时候,会被自动调用,下面的黄色代码表示把队列和消费者绑定到一起,当执行完毕之后,队列里面的消息就会没有了
此时要注意当接受消息之后,rabbitmq里面的queues里面的消息就会被删除了
注意因为下面的接受方没有连接和通道的关闭,所以此时的接收方的Connections连接和channels通道连接还是一直没有关闭的
你可以看到这里的程序一直没有停
然后接收方的Connections连接也没有消失
然后此时Idea接收方的channels连接也没有消失,
然后我们手动关闭这个接受方的程序,就是点击这个红色按钮
然后rabbitmq里面的Connections的发送链接就关闭了,rabbitmq里面的Channels的通道链接就关闭了,所以此时rabbitmq里面的Channels是下面这样
package cn.itcast.rabbitmq.simple;import java.io.IOException;import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;import cn.itcast.rabbitmq.util.ConnectionUtil;/*** 消费者*/
public class Recv {private final static String QUEUE_NAME = "simple_queue";public static void main(String[] argv) throws Exception {// 蓝色代码开始// 获取到连接Connection connection = ConnectionUtil.getConnection();// 蓝色代码结束// 红色代码开始// 创建通道Channel channel = connection.createChannel();// 红色代码结束// 声明队列channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 绿色代码开始// 定义队列的消费者DefaultConsumer consumer = new DefaultConsumer(channel) {// 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,byte[] body) throws IOException {// body 即消息体String msg = new String(body);System.out.println(" [x] received : " + msg + "!");}};// 绿色代码结束// 黄色代码开始// 监听队列,第二个参数:是否自动进行消息确认。channel.basicConsume(QUEUE_NAME, true, consumer);// 黄色代码结束}
}
自动ACK的缺点
工具类代码如下所示
package cn.itcast.rabbitmq.util;import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;public class ConnectionUtil {/*** 建立与RabbitMQ的连接* @return* @throws Exception*/public static Connection getConnection() throws Exception {//定义连接工厂ConnectionFactory factory = new ConnectionFactory();//设置服务地址factory.setHost("47.91.248.236");//端口factory.setPort(5672);//设置账号信息,用户名、密码、vhostfactory.setVirtualHost("/leyou");factory.setUsername("leyou");factory.setPassword("leyou");// 通过工程获取连接Connection connection = factory.newConnection();return connection;}}
比如下面的代码
首先pom.xml代码如下所示
4.0.0 cn.itcast.rabbitmq itcast-rabbitmq 0.0.1-SNAPSHOT org.springframework.boot spring-boot-starter-parent 2.0.4.RELEASE 1.8 org.springframework.boot spring-boot-starter-amqp org.springframework.boot spring-boot-starter-test
然后发送方的代码如下所示
package cn.itcast.rabbitmq.simple;import cn.itcast.rabbitmq.util.ConnectionUtil;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.MessageProperties;
/*** 生产者*/
public class Send {private final static String QUEUE_NAME = "simple_queue";public static void main(String[] argv) throws Exception {// 获取到连接Connection connection = ConnectionUtil.getConnection();// 从连接中创建通道,使用通道才能完成消息相关的操作Channel channel = connection.createChannel();// 声明(创建)队列channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 消息内容String message = "Hello World!";// 向指定的队列中发送消息channel.basicPublish("", QUEUE_NAME, null, message.getBytes());System.out.println(" [x] Sent '" + message + "'");//关闭通道和连接channel.close();connection.close();}
}
然后执行发送方的代码,然后消息队列里面就会有这个消息了
然后接受方代码如下所示,注意下面的绿色代码抛出异常了,然后此时我们执行接收方的代码,下面的蓝色代码表示自动进行ack
package cn.itcast.rabbitmq.simple;import java.io.IOException;import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;import cn.itcast.rabbitmq.util.ConnectionUtil;/*** 消费者*/
public class Recv {private final static String QUEUE_NAME = "simple_queue";public static void main(String[] argv) throws Exception {// 获取到连接Connection connection = ConnectionUtil.getConnection();// 创建通道Channel channel = connection.createChannel();// 声明队列channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 定义队列的消费者DefaultConsumer consumer = new DefaultConsumer(channel) {// 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,byte[] body) throws IOException {// body 即消息体String msg = new String(body);//绿色代码开始int i=1/0;//绿色代码结束//红色代码开始System.out.println(" [x] received : " + msg + "!");//红色代码结束}};//绿色代码开始// 监听队列,第二个参数:是否自动进行消息确认。channel.basicConsume(QUEUE_NAME, true, consumer);//绿色代码结束}
}
执行接收方代码之后,结果如下所示,此时可以看到根本没有执行上面的红色代码,也就是说此时不算接受成功(此时如果红色代码是重要的逻辑代码,那么在实际开发里面不就有问题了)
但是此时可以看到rabbitmq里面的消息队列里面的消息已经没有了,这就是自动ack的缺点
手动ack解决自动ack的缺点
工具类代码如下所示
package cn.itcast.rabbitmq.util;import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;public class ConnectionUtil {/*** 建立与RabbitMQ的连接* @return* @throws Exception*/public static Connection getConnection() throws Exception {//定义连接工厂ConnectionFactory factory = new ConnectionFactory();//设置服务地址factory.setHost("47.91.248.236");//端口factory.setPort(5672);//设置账号信息,用户名、密码、vhostfactory.setVirtualHost("/leyou");factory.setUsername("leyou");factory.setPassword("leyou");// 通过工程获取连接Connection connection = factory.newConnection();return connection;}}
比如下面的代码
首先pom.xml代码如下所示
4.0.0 cn.itcast.rabbitmq itcast-rabbitmq 0.0.1-SNAPSHOT org.springframework.boot spring-boot-starter-parent 2.0.4.RELEASE 1.8 org.springframework.boot spring-boot-starter-amqp org.springframework.boot spring-boot-starter-test
然后发送方的代码如下所示
package cn.itcast.rabbitmq.simple;import cn.itcast.rabbitmq.util.ConnectionUtil;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.MessageProperties;
/*** 生产者*/
public class Send {private final static String QUEUE_NAME = "simple_queue";public static void main(String[] argv) throws Exception {// 获取到连接Connection connection = ConnectionUtil.getConnection();// 从连接中创建通道,使用通道才能完成消息相关的操作Channel channel = connection.createChannel();// 声明(创建)队列channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 消息内容String message = "Hello World!";// 向指定的队列中发送消息channel.basicPublish("", QUEUE_NAME, null, message.getBytes());System.out.println(" [x] Sent '" + message + "'");//关闭通道和连接channel.close();connection.close();}
}
然后执行发送方的代码,然后消息队列里面就会有这个消息了
然后接受方代码如下所示,注意下面的绿色代码抛出异常了,然后此时我们执行接收方的代码,此时红色代码就表示手动进行ack
package cn.itcast.rabbitmq.simple;import java.io.IOException;import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;import cn.itcast.rabbitmq.util.ConnectionUtil;/*** 消费者,手动进行ACK*/
public class Recv2 {private final static String QUEUE_NAME = "simple_queue";public static void main(String[] argv) throws Exception {// 获取到连接Connection connection = ConnectionUtil.getConnection();// 创建通道final Channel channel = connection.createChannel();// 声明队列channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 定义队列的消费者DefaultConsumer consumer = new DefaultConsumer(channel) {// 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,byte[] body) throws IOException {// body 即消息体String msg = new String(body);//绿色代码开始int i=1/0;//绿色代码结束//红色代码开始System.out.println(" [x] received : " + msg + "!");// 手动进行ACKchannel.basicAck(envelope.getDeliveryTag(), false);//红色代码结束}};//红色代码开始// 监听队列,第二个参数false,手动进行ACKchannel.basicConsume(QUEUE_NAME, false, consumer);//红色代码开始}
}
执行接收方代码之后,结果如下所示,此时可以看到根本没有执行上面的红色代码,也就是说此时不算接受成功
但是此时可以看到rabbitmq里面的消息队列里面的消息还是有的,这就是手动ack解决了自动ack的缺点
本文来自互联网用户投稿,文章观点仅代表作者本人,不代表本站立场,不承担相关法律责任。如若转载,请注明出处。 如若内容造成侵权/违法违规/事实不符,请点击【内容举报】进行投诉反馈!
