RabbitMQ 高级特性 Consumer Ack
RabbitMQ 高级特性 Consumer Ack
- ack指Acknowledge,确认。 表示消费端收到消息后的确认方式。
- 三种确认方式:
- acknowledge=“none”
- acknowledge=“manual”
- 根据异常情况确认:acknowledge=“auto”,(使用麻烦,不演示)
- 自动确认是指,当消息一旦被Consumer接收到,则自动确认收到,并将相应 message 从 RabbitMQ 的消息缓存中移除。但是在实际业务处理中,很可能消息接收到,业务处理出现异常,那么该消息就会丢失。
- 手动确认方式,则需要在业务处理成功后,调用channel.basicAck(),手动签收,如果出现异常,则调用channel.basicNack()方法,让其自动重新发送消息。
- 消费者步骤:
-
创建工程

-
pom.xml导包
<dependencies><dependency><groupId>org.springframeworkgroupId><artifactId>spring-contextartifactId><version>5.1.7.RELEASEversion>dependency><dependency><groupId>org.springframework.amqpgroupId><artifactId>spring-rabbitartifactId><version>2.1.8.RELEASEversion>dependency><dependency><groupId>junitgroupId><artifactId>junitartifactId><version>4.12version>dependency><dependency><groupId>org.springframeworkgroupId><artifactId>spring-testartifactId><version>5.1.7.RELEASEversion>dependency>dependencies><build><plugins><plugin><groupId>org.apache.maven.pluginsgroupId><artifactId>maven-compiler-pluginartifactId><version>3.8.0version><configuration><source>1.8source><target>1.8target>configuration>plugin>plugins>build> -
resources目录下创建rabbitmq.properties文件
rabbitmq.host=192.168.20.146 rabbitmq.port=5672 rabbitmq.username=test rabbitmq.password=test rabbitmq.virtual-host=/test -
resources目录下创建spring-rabbitmq-producer.xml文件
<beans xmlns="http://www.springframework.org/schema/beans"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xmlns:context="http://www.springframework.org/schema/context"xmlns:rabbit="http://www.springframework.org/schema/rabbit"xmlns:rabbitmq="http://www.springframework.org/schema/rabbit"xsi:schemaLocation="http://www.springframework.org/schema/beanshttp://www.springframework.org/schema/beans/spring-beans.xsdhttp://www.springframework.org/schema/contexthttps://www.springframework.org/schema/context/spring-context.xsdhttp://www.springframework.org/schema/rabbithttp://www.springframework.org/schema/rabbit/spring-rabbit.xsd"><context:property-placeholder location="classpath:/rabbitmq.properties"/><rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}"port="${rabbitmq.port}"username="${rabbitmq.username}"password="${rabbitmq.password}"virtual-host="${rabbitmq.virtual-host}"publisher-confirms="true"publisher-returns="true"/><context:component-scan base-package="com.yang.listener"/>beans> -
创建一个类实现
MessageListener接口

-
spring-rabbitmq-producer.xml文件配置监听容器(
ref是监听的类,queue-names是监听的队列RabbitMQ 高级特性 - 消息的可靠性投递中的队列)

<rabbitmq:listener-container connection-factory="connectionFactory"><rabbit:listener ref="ackListener" queue-names="test_queue_confirm"/>rabbitmq:listener-container> -
AckListener类的内容
package com.yang.listener;import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessageListener; import org.springframework.stereotype.Component;@Component public class AckListener implements MessageListener {@Overridepublic void onMessage(Message message) {System.out.println(new String(message.getBody()));} } -
在测试类写个死循环,看看能不能监听到
package com.yang.test;import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.test.context.ContextConfiguration; import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;@RunWith(SpringJUnit4ClassRunner.class) @ContextConfiguration(locations = "classpath:spring-rabbitmq-consumer.xml") public class ConsumerTest {@Testpublic void test() {while(true) {}}} -
运行结果

-
- Ack手动签收(
刚刚其实就是自动签收(默认))- 步骤:
-
在spring-rabbitmq-producer.xml文件定义监听容器中配置手动签收
acknowledge="manual"<rabbitmq:listener-container connection-factory="connectionFactory" acknowledge="manual"><rabbit:listener ref="ackListener" queue-names="test_queue_confirm"/>rabbitmq:listener-container>
-
-
监听器实现ChannelAwareMessageListener
package com.yang.listener;import com.rabbitmq.client.Channel; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessageListener; import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener; import org.springframework.stereotype.Component;import java.io.IOException;/*** Consumer ACK机制:* 1. 设置手动签收.acknowledge="manual"* 2. 让监听器实现ChannelAwareMessageListener* 3. 如果消息处理成功, 则调用channel的basicAck()签收* 4. 如果消息处理失败, 则调用channel的basicNack()拒绝签收.broker重新发送给consumer*/ @Component public class AckListener implements ChannelAwareMessageListener {@Overridepublic void onMessage(Message message, Channel channel) throws Exception {long deliveryTag = message.getMessageProperties().getDeliveryTag();try {//1. 接受消息System.out.println(new String(message.getBody()));//2. 处理业务逻辑 打印一句话代表System.out.println("处理业务逻辑");//3. 手动签收/*basicAck(long deliveryTag, boolean multiple)参数说明:deliveryTag: tag标签multiple: true允许多条消息被签收*/channel.basicAck(deliveryTag,true);} catch (Exception e) {//4. 拒绝签收/*basicNack(long deliveryTag, boolean multiple, boolean requeue)参数说明:deliveryTag: tag标签multiple: true允许多条消息被签收requeue: 重回队列.如果为true.则重新回到queue,broker会重新发送消息给消费端*/channel.basicNack(deliveryTag,true,true);}} } -
可以假装业务逻辑错误
//2. 处理业务逻辑 打印一句话代表 System.out.println("处理业务逻辑"); int i = 3/0;
- 步骤:
- 小结
- 在rabbit:listener-container标签中设置acknowledge属性,设置ack方式 none:自动确认,manual:手动确认
- 如果在消费端没有出现异常,则调用channel.basicAck(deliveryTag,false);方法确认签收消息
- 如果在消费端没有出现异常,则调用channel.basicAck(deliveryTag,false);方法确认签收消息
消息可靠性总结
- 持久化
- exchange要持久化
- queue要持久化
- message要持久化
- 生产方确认Confirm
- 消费方确认Ack
- Broker高可用
本文来自互联网用户投稿,文章观点仅代表作者本人,不代表本站立场,不承担相关法律责任。如若转载,请注明出处。 如若内容造成侵权/违法违规/事实不符,请点击【内容举报】进行投诉反馈!
