RabbitMQ异常处理方案设计

 

导语:根据业务给MQ异常处理设置优先级:如低、中等、紧急,当MQ发生异常时通过告警邮件通知和记录到数据库中,对于低和中等的异常采用定时任务轮询去重新投递,紧急的异常例如订单支付等则需要开发者尽快去手动处理最佳。对于MQ中发生的异常有以下三种: confirm异常、returnCallBack异常、队列监听消费异常,在此次实际项目中有监控模块(死信队列的监控,根据业务类型发送告警邮件;是否将异常写入数据库等待定时任务重新投递)和定时任务(创建重新投递任务,告知监控模块投递触发时机)模块协同处理,以下只是目前所想的方案,其实有着更多的可靠的方案吧,您怎么看?

confirm异常

  • confirm确认消息是否投递到MQ服务器。处理逻辑:当未投递成功时,发送到告警队列,发送报警邮件。

returnCallBack异常

  • 消息投递到队列发生异常时回调(当消息在交换机根据routeKey找不到投递的队列时发生异常),处理逻辑:发送报警邮件,异常写入数据库; sendErrorMessage()方法中,消息内容message优先级设置为低,根据业务需求设置

队列监听消费异常

  • 队列初始化配置死信队列
  • 消费的确认机制为手动确认,队列监听消费处理业务代码块做try/catch处理,发生异常nack,拒绝重新入队
  • 消费监听发生异常投递重新投递死信队列
  • 业务队列发生异常设置处理优先级为中等,订单支付设置为紧急,根据业务需求设置,处理逻辑:发送报警邮件,异常写入数据库。
  • MQ发送消息时需要给消息设置请求头属性,包含当前投递exchange、routeKey和异常处理优先级

定时任务

  • 通过定时任务轮询表status 1-异常 和优先级为(0-低 1-中等)的数据,重新投递到原始队列 (在原始监听队列 消费成功逻辑中补充,将消息标志id的status设置为2-已解决);
  • 重新投递需要知道消息绑定的原始exchange和routeKey, rabbitListenerErrorHandler回调时取不到相关信息,因此在发送消息时在请求头携带;根据业务在MQ发送消息时指定消息的优先级(0-低 1-中等 2-紧急);   
  • status1-异常和优先级为紧急(订单支付等业务)的异常数据手动处理;
  • 重新投递给消息加上消息头属性设置是否重新投递标志,用于MQ监听消费后将数据库该消息ID的修复状态设置为2-已解决

部分实现

  • 通发送消息时header设置当前exchenge等属性:
     CorrelationData correlationData = new CorrelationData(messageId);rabbitTemplate.convertAndSend(exchange, routeKey, MessageHelper.objToMsg(message.getContent()), message1 -> {message1.getMessageProperties().setHeader(MqConstants.MessageHeadProperties.HEAD_EXCHANGE, exchange);message1.getMessageProperties().setHeader(MqConstants.MessageHeadProperties.HEAD_ROUTE, routeKey);message1.getMessageProperties().setPriority(priority.intValue());message1.getMessageProperties().setHeader(MqConstants.MessageHeadProperties.IS_RETRY_DELIVER, true);return message1;}, correlationData);

     

  • 监听消费确认:
        @RabbitListener(queues = MqConstants.QaInfo.QA_INFO_TIMEOUT_QUEUE_NAME)public void consume(Message message, Channel channel,@Header(value = MqConstants.MQ_HEADER_CORRELATION_ID_KEY, required = false) String correlationId) {if (this.lockToConsume(correlationId)) {Boolean flag = true;try {consumer.consume(message);} catch (Exception e) {flag = false;e.printStackTrace();} finally {if (flag) {this.consumeConfirm(channel, correlationId, message.getMessageProperties().getDeliveryTag());String status = message.getMessageProperties().getHeader(MqConstants.MessageHeadProperties.IS_RETRY_DELIVER);if (StringUtils.isNotBlank(status) && Boolean.parseBoolean(status)) {//将数据库记录设置为已解决,此处需要确保代码处理无异常,省略,}} else {this.nack(channel, correlationId, message.getMessageProperties().getDeliveryTag());}}} else {log.warn("[MQ]消息重复消费, ud: {}", correlationId);this.nack(channel, correlationId, message.getMessageProperties().getDeliveryTag());}}

 

 


本文来自互联网用户投稿,文章观点仅代表作者本人,不代表本站立场,不承担相关法律责任。如若转载,请注明出处。 如若内容造成侵权/违法违规/事实不符,请点击【内容举报】进行投诉反馈!

相关文章

立即
投稿

微信公众账号

微信扫一扫加关注

返回
顶部