RabbitMQ 高级特性 Consumer Ack

RabbitMQ 高级特性 Consumer Ack

  • ack指Acknowledge,确认。 表示消费端收到消息后的确认方式。
  • 三种确认方式:
    1. acknowledge=“none”
    2. acknowledge=“manual”
    3. 根据异常情况确认:acknowledge=“auto”,(使用麻烦,不演示)
  • 自动确认是指,当消息一旦被Consumer接收到,则自动确认收到,并将相应 message 从 RabbitMQ 的消息缓存中移除。但是在实际业务处理中,很可能消息接收到,业务处理出现异常,那么该消息就会丢失。
  • 手动确认方式,则需要在业务处理成功后,调用channel.basicAck(),手动签收,如果出现异常,则调用channel.basicNack()方法,让其自动重新发送消息。

  • 消费者步骤:
    1. 创建工程
      在这里插入图片描述

    2. pom.xml导包

       <dependencies><dependency><groupId>org.springframeworkgroupId><artifactId>spring-contextartifactId><version>5.1.7.RELEASEversion>dependency><dependency><groupId>org.springframework.amqpgroupId><artifactId>spring-rabbitartifactId><version>2.1.8.RELEASEversion>dependency><dependency><groupId>junitgroupId><artifactId>junitartifactId><version>4.12version>dependency><dependency><groupId>org.springframeworkgroupId><artifactId>spring-testartifactId><version>5.1.7.RELEASEversion>dependency>dependencies><build><plugins><plugin><groupId>org.apache.maven.pluginsgroupId><artifactId>maven-compiler-pluginartifactId><version>3.8.0version><configuration><source>1.8source><target>1.8target>configuration>plugin>plugins>build>
    3. resources目录下创建rabbitmq.properties文件

      rabbitmq.host=192.168.20.146
      rabbitmq.port=5672
      rabbitmq.username=test
      rabbitmq.password=test
      rabbitmq.virtual-host=/test
      
    4. resources目录下创建spring-rabbitmq-producer.xml文件

      
      <beans xmlns="http://www.springframework.org/schema/beans"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xmlns:context="http://www.springframework.org/schema/context"xmlns:rabbit="http://www.springframework.org/schema/rabbit"xmlns:rabbitmq="http://www.springframework.org/schema/rabbit"xsi:schemaLocation="http://www.springframework.org/schema/beanshttp://www.springframework.org/schema/beans/spring-beans.xsdhttp://www.springframework.org/schema/contexthttps://www.springframework.org/schema/context/spring-context.xsdhttp://www.springframework.org/schema/rabbithttp://www.springframework.org/schema/rabbit/spring-rabbit.xsd"><context:property-placeholder location="classpath:/rabbitmq.properties"/><rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}"port="${rabbitmq.port}"username="${rabbitmq.username}"password="${rabbitmq.password}"virtual-host="${rabbitmq.virtual-host}"publisher-confirms="true"publisher-returns="true"/><context:component-scan base-package="com.yang.listener"/>beans>
      
    5. 创建一个类实现MessageListener接口
      在这里插入图片描述

    6. spring-rabbitmq-producer.xml文件配置监听容器(ref是监听的类,queue-names是监听的队列 RabbitMQ 高级特性 - 消息的可靠性投递中的队列)
      在这里插入图片描述

      <rabbitmq:listener-container connection-factory="connectionFactory"><rabbit:listener ref="ackListener" queue-names="test_queue_confirm"/>rabbitmq:listener-container>
      
    7. AckListener类的内容

      package com.yang.listener;import org.springframework.amqp.core.Message;
      import org.springframework.amqp.core.MessageListener;
      import org.springframework.stereotype.Component;@Component
      public class AckListener implements MessageListener {@Overridepublic void onMessage(Message message) {System.out.println(new String(message.getBody()));}
      }
    8. 在测试类写个死循环,看看能不能监听到

      package com.yang.test;import org.junit.Test;
      import org.junit.runner.RunWith;
      import org.springframework.amqp.rabbit.core.RabbitTemplate;
      import org.springframework.beans.factory.annotation.Autowired;
      import org.springframework.test.context.ContextConfiguration;
      import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;@RunWith(SpringJUnit4ClassRunner.class)
      @ContextConfiguration(locations = "classpath:spring-rabbitmq-consumer.xml")
      public class ConsumerTest {@Testpublic void test() {while(true) {}}}
    9. 运行结果
      在这里插入图片描述


  • Ack手动签收(刚刚其实就是自动签收(默认))
    • 步骤:
      1. 在spring-rabbitmq-producer.xml文件定义监听容器中配置手动签收acknowledge="manual"

        <rabbitmq:listener-container connection-factory="connectionFactory" acknowledge="manual"><rabbit:listener ref="ackListener" queue-names="test_queue_confirm"/>rabbitmq:listener-container>
        
    1. 监听器实现ChannelAwareMessageListener

      package com.yang.listener;import com.rabbitmq.client.Channel;
      import org.springframework.amqp.core.Message;
      import org.springframework.amqp.core.MessageListener;
      import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
      import org.springframework.stereotype.Component;import java.io.IOException;/*** Consumer ACK机制:*  1. 设置手动签收.acknowledge="manual"*  2. 让监听器实现ChannelAwareMessageListener*  3. 如果消息处理成功, 则调用channel的basicAck()签收*  4. 如果消息处理失败, 则调用channel的basicNack()拒绝签收.broker重新发送给consumer*/
      @Component
      public class AckListener implements ChannelAwareMessageListener {@Overridepublic void onMessage(Message message, Channel channel) throws Exception {long deliveryTag = message.getMessageProperties().getDeliveryTag();try {//1. 接受消息System.out.println(new String(message.getBody()));//2. 处理业务逻辑 打印一句话代表System.out.println("处理业务逻辑");//3. 手动签收/*basicAck(long deliveryTag, boolean multiple)参数说明:deliveryTag: tag标签multiple: true允许多条消息被签收*/channel.basicAck(deliveryTag,true);} catch (Exception e) {//4. 拒绝签收/*basicNack(long deliveryTag, boolean multiple, boolean requeue)参数说明:deliveryTag:  tag标签multiple:     true允许多条消息被签收requeue:      重回队列.如果为true.则重新回到queue,broker会重新发送消息给消费端*/channel.basicNack(deliveryTag,true,true);}}
      }
    2. 可以假装业务逻辑错误

      //2. 处理业务逻辑 打印一句话代表
      System.out.println("处理业务逻辑");
      int i = 3/0;
      

  • 小结
    • 在rabbit:listener-container标签中设置acknowledge属性,设置ack方式 none:自动确认,manual:手动确认
    • 如果在消费端没有出现异常,则调用channel.basicAck(deliveryTag,false);方法确认签收消息
    • 如果在消费端没有出现异常,则调用channel.basicAck(deliveryTag,false);方法确认签收消息

消息可靠性总结

  1. 持久化
    • exchange要持久化
    • queue要持久化
    • message要持久化
  2. 生产方确认Confirm
  3. 消费方确认Ack
  4. Broker高可用


本文来自互联网用户投稿,文章观点仅代表作者本人,不代表本站立场,不承担相关法律责任。如若转载,请注明出处。 如若内容造成侵权/违法违规/事实不符,请点击【内容举报】进行投诉反馈!

相关文章

立即
投稿

微信公众账号

微信扫一扫加关注

返回
顶部