RabbitMQ使用死信队列解决订单超时
1.什么是死信队列(DLX)?
DLX,Dead Letter Exchange 的缩写,又死信邮箱、死信交换机。DLX就是一个普通的交换机,和一般的交换机没有任何区别。
当消息在一个队列中变成死信(dead message)时,通过这个交换机将死信发送到死信队列中
死信的条件:
- 消息被否定确认,使用
channel.basicNack或channel.basicReject,并且此时requeue属性被设置为false。 - 消息在队列的存活时间超过设置的TTL时间。
- 消息队列的消息数量已经超过最大队列长度。
2.使用死信队列来解决订单超时未支付
我的环境是springboot2.3.0
首先明确一下大致思路:
- 用户下单后,发送消息(带有TTL过期时间)到一个普通队列,然后这个普通队列设置了死信交换机和路由
- 到达过期时间后,消息成为死信,由死信交换机转发到死信队列
- 消费者监听死信队列,处理过期订单
大概的模型如下:

2.1配置队列和交换机
我的配置文件信息:
#死信消息模型dead:exchange: seckill.dead.exchangerouting-key: seckill.dead.routingKeynormal-queue: seckill.dead.normal.queuenormal-exchange: seckill.dead.normal.exchangenormal-routing-key: seckill.dead.normal.routingKeyreal-dead-queue: seckill.dead.queue#设置过期时间expire: 1800000
先配置指定了死信交换机和死信路由的队列,这里容易搞混,这里的指定死信交换机和路由,并不是绑定,只是指定了在这个队列里成为死信的消息会发送到指定的死信交换机里,然后死信交换机会分发给真正的用于存储死信的队列
/** 普通的队列,但是指定了死信交换机和路由,在这个队列中的消息过期后会由死信交换机分发到真正的死信队列 */@Beanpublic Queue normalQueue(){Map<String, Object> args = new HashMap<>();//设置死信参数//指定死信交换机args.put("x-dead-letter-exchange",myRabbitProperties.getDead().getExchange());//指定死信路由args.put("x-dead-letter-routing-key",myRabbitProperties.getDead().getRoutingKey());return new Queue(myRabbitProperties.getDead().getNormalQueue(),true,false,false,args);}
配置普通交换机,并绑定普通交换机和普通队列
/** 创建普通交换机 */@Beanpublic Exchange normalExchange(){return new TopicExchange(myRabbitProperties.getDead().getNormalExchange(),true,false);}/** 绑定普通队列和普通交换机 */@Beanpublic Binding normalBind(){return BindingBuilder.bind(normalQueue()).to(normalExchange()).with(myRabbitProperties.getDead().getNormalRoutingKey()).noargs();}
配置死信交换机和真实用来存储死信的队列,并绑定真实存储死信的队列和死信交换机
/** 创建死信交换机 */@Beanpublic Exchange deadExchange(){return new TopicExchange(myRabbitProperties.getDead().getExchange(),true,false);}/** 创建真实存储死信的队列,当死信队列中消息过期后,转发到此队列,真实存储死信队列需要绑定死信交换机和路由 */@Beanpublic Queue realDeadQueue(){return new Queue(myRabbitProperties.getDead().getRealDeadQueue(),true,false,false);}/** 绑定真实存储死信的队列与死信交换机 */@Beanpublic Binding realBindDead(){return BindingBuilder.bind(realDeadQueue()).to(deadExchange()).with(myRabbitProperties.getDead().getRoutingKey()).noargs();}
2.2 在服务中写具体发送业务,也就是生产者
public void sendDeadMsg(String orderId) {log.info("开始发送死信消息!");try {SecOrderAndUserInfo orderAndUserInfo = orderMapper.selectOrderAndUserByOrderId(orderId);if (orderAndUserInfo!=null) {rabbitTemplate.convertAndSend(myRabbitProperties.getDead().getNormalExchange(), myRabbitProperties.getDead().getNormalRoutingKey(), orderAndUserInfo, new MessagePostProcessor() {@Overridepublic Message postProcessMessage(Message message) throws AmqpException {MessageProperties messageProperties = message.getMessageProperties();//设置过期时间TTLmessageProperties.setExpiration(String.valueOf(myRabbitProperties.getExpire()));//设置持久化messageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);return message;}});}} catch (Exception e) {log.error("死信消息发送异常,消息为:{}", orderId);}}
2.3 消费者监听存储死信的队列
/*** 监听死信队列* @param orderAndUserInfo 收到的消息*/@RabbitListener(queues = "${mq.myrabbit.dead.real-dead-queue}",containerFactory = "singleListenerContainer")public void receiveDeadMsg(SecOrderAndUserInfo orderAndUserInfo){try {log.info("死信队列接收到消息:{}",orderAndUserInfo);if (orderAndUserInfo!=null){SecOrder order = orderMapper.selectOrderByOrderId(orderAndUserInfo.getOrderId());//订单存在,并且订单状态处于未处理状态if (order!=null&&orderAndUserInfo.getState()== MyproCostant.ORDER_NO_OPERATION){//将订单状态设置为已过期order.setState(MyproCostant.ORDER_EXPIRED);orderMapper.updateOrder(order);}}}catch (Exception e){log.error("死信队列接收消息异常!");e.printStackTrace();}}
3.进行测试
设置过期时间TTL=10秒,10秒后成功监听到了死信消息

本文来自互联网用户投稿,文章观点仅代表作者本人,不代表本站立场,不承担相关法律责任。如若转载,请注明出处。 如若内容造成侵权/违法违规/事实不符,请点击【内容举报】进行投诉反馈!
