文章详情

短信预约-IT技能 免费直播动态提醒

请输入下面的图形验证码

提交验证

短信预约提醒成功

消息消费失败如何处理?

2024-12-03 10:01

关注

本文转载自微信公众号「Java极客技术」,作者鸭血粉丝。转载本文请联系Java极客技术公众号。   

一、介绍

在介绍消息中间件 MQ 之前,我们先来简单的了解一下,为何要引用消息中间件。

例如,在电商平台中,常见的用户下单,会经历以下几个流程。

当用户下单时,创建完订单之后,会调用第三方支付平台,对用户的账户金额进行扣款,如果平台支付扣款成功,会将结果通知到对应的业务系统,接着业务系统会更新订单状态,同时调用仓库接口,进行减库存,通知物流进行发货!

 

试想一下,从订单状态更新、到扣减库存、通知物流发货都在一个方法内同步完成,假如用户支付成功、订单状态更新也成功,但是在扣减库存或者通知物流发货步骤失败了,那么就会造成一个问题,用户已经支付成功了,只是在仓库扣减库存方面失败,从而导致整个交易失败!

一单失败,老板可以假装看不见,但是如果上千个单子都因此失败,那么因系统造成的业务损失,将是巨大的,老板可能坐不住了!

因此,针对这种业务场景,架构师们引入了异步通信技术方案,从而保证服务的高可用,大体流程如下:

 

当订单系统收到支付平台发送的扣款结果之后,会将订单消息发送到 MQ 消息中间件,同时也会更新订单状态。

在另一端,由仓库系统来异步监听订单系统发送的消息,当收到订单消息之后,再操作扣减库存、通知物流公司发货等服务!

在优化后的流程下,即使扣减库存服务失败,也不会影响用户交易。

正如《人月神话》中所说的,软件工程,没有银弹!

当引入了 MQ 消息中间件之后,同样也会带来另一个问题,假如 MQ 消息中间件突然宕机了,导致消息无法发送出去,那仓库系统就无法接受到订单消息,进而也无法发货!

针对这个问题,业界主流的解决办法是采用集群部署,一主多从模式,从而实现服务的高可用,即使一台机器突然宕机了,也依然能保证服务可用,在服务器故障期间,通过运维手段,将服务重新启动,之后服务依然能正常运行!

但是还有另一个问题,假如仓库系统已经收到订单消息了,但是业务处理异常,或者服务器异常,导致当前商品库存并没有扣减,也没有发货!

这个时候又改如何处理呢?

今天我们所要介绍的正是这种场景,假如消息消费失败,我们应该如何处理?

二、解决方案

针对消息消费失败的场景,我们一般会通过如下方式进行处理:

 

当消息在客户端消费失败时,我们会将异常的消息加入到一个消息重试对象中,同时设置最大重试次数,并将消息重新推送到 MQ 消息中间件里,当重试次数超过最大值时,会将异常的消息存储到 MongoDB数据库中,方便后续查询异常的信息。

基于以上系统模型,我们可以编写一个公共重试组件,话不多说,直接干!

三、代码实践

本次补偿服务采用 rabbitmq 消息中间件进行处理,其他消息中间件处理思路也类似!

3.1、创建一个消息重试实体类

  1. @Data 
  2. @EqualsAndHashCode(callSuper = false
  3. @Accessors(chain = true
  4. public class MessageRetryDTO implements Serializable { 
  5.  
  6.     private static final long serialVersionUID = 1L; 
  7.  
  8.      
  9.     private String bodyMsg; 
  10.  
  11.      
  12.     private String sourceId; 
  13.  
  14.      
  15.     private String sourceDesc; 
  16.  
  17.      
  18.     private String exchangeName; 
  19.  
  20.      
  21.     private String routingKey; 
  22.  
  23.      
  24.     private String queueName; 
  25.  
  26.      
  27.     private Integer status = 1; 
  28.  
  29.      
  30.     private Integer maxTryCount = 3; 
  31.  
  32.      
  33.     private Integer currentRetryCount = 0; 
  34.  
  35.      
  36.     private Long retryIntervalTime = 0L; 
  37.  
  38.      
  39.     private String errorMsg; 
  40.  
  41.      
  42.     private Date createTime; 
  43.  
  44.     @Override 
  45.     public String toString() { 
  46.         return "MessageRetryDTO{" + 
  47.                 "bodyMsg='" + bodyMsg + '\'' + 
  48.                 ", sourceId='" + sourceId + '\'' + 
  49.                 ", sourceDesc='" + sourceDesc + '\'' + 
  50.                 ", exchangeName='" + exchangeName + '\'' + 
  51.                 ", routingKey='" + routingKey + '\'' + 
  52.                 ", queueName='" + queueName + '\'' + 
  53.                 ", status=" + status + 
  54.                 ", maxTryCount=" + maxTryCount + 
  55.                 ", currentRetryCount=" + currentRetryCount + 
  56.                 ", retryIntervalTime=" + retryIntervalTime + 
  57.                 ", errorMsg='" + errorMsg + '\'' + 
  58.                 ", createTime=" + createTime + 
  59.                 '}'
  60.     } 
  61.  
  62.      
  63.     public boolean checkRetryCount() { 
  64.         retryCountCalculate(); 
  65.         //检查重试次数是否超过最大值 
  66.         if (this.currentRetryCount < this.maxTryCount) { 
  67.             return true
  68.         } 
  69.         return false
  70.     } 
  71.  
  72.      
  73.     private void retryCountCalculate() { 
  74.         this.currentRetryCount = this.currentRetryCount + 1; 
  75.     } 
  76.  

3.2、编写服务重试抽象类

  1. public abstract class CommonMessageRetryService { 
  2.  
  3.     private static final Logger log = LoggerFactory.getLogger(CommonMessageRetryService.class); 
  4.  
  5.     @Autowired 
  6.     private RabbitTemplate rabbitTemplate; 
  7.  
  8.     @Autowired 
  9.     private MongoTemplate mongoTemplate; 
  10.  
  11.  
  12.      
  13.     public void initMessage(Message message) { 
  14.         log.info("{} 收到消息: {},业务数据:{}", this.getClass().getName(), message.toString(), new String(message.getBody())); 
  15.         try { 
  16.             //封装消息 
  17.             MessageRetryDTO messageRetryDto = buildMessageRetryInfo(message); 
  18.             if (log.isInfoEnabled()) { 
  19.                 log.info("反序列化消息:{}", messageRetryDto.toString()); 
  20.             } 
  21.             prepareAction(messageRetryDto); 
  22.         } catch (Exception e) { 
  23.             log.warn("处理消息异常,错误信息:", e); 
  24.         } 
  25.     } 
  26.  
  27.      
  28.     protected void prepareAction(MessageRetryDTO retryDto) { 
  29.         try { 
  30.             execute(retryDto); 
  31.             doSuccessCallBack(retryDto); 
  32.         } catch (Exception e) { 
  33.             log.error("当前任务执行异常,业务数据:" + retryDto.toString(), e); 
  34.             //执行失败,计算是否还需要继续重试 
  35.             if (retryDto.checkRetryCount()) { 
  36.                 if (log.isInfoEnabled()) { 
  37.                     log.info("重试消息:{}", retryDto.toString()); 
  38.                 } 
  39.                 retrySend(retryDto); 
  40.             } else { 
  41.                 if (log.isWarnEnabled()) { 
  42.                     log.warn("当前任务重试次数已经到达最大次数,业务数据:" + retryDto.toString(), e); 
  43.                 } 
  44.                 doFailCallBack(retryDto.setErrorMsg(e.getMessage())); 
  45.             } 
  46.         } 
  47.     } 
  48.  
  49.      
  50.     private void doSuccessCallBack(MessageRetryDTO messageRetryDto) { 
  51.         try { 
  52.             successCallback(messageRetryDto); 
  53.         } catch (Exception e) { 
  54.             log.warn("执行成功回调异常,队列描述:{},错误原因:{}", messageRetryDto.getSourceDesc(), e.getMessage()); 
  55.         } 
  56.     } 
  57.  
  58.      
  59.     private void doFailCallBack(MessageRetryDTO messageRetryDto) { 
  60.         try { 
  61.             saveMessageRetryInfo(messageRetryDto.setErrorMsg(messageRetryDto.getErrorMsg())); 
  62.             failCallback(messageRetryDto); 
  63.         } catch (Exception e) { 
  64.             log.warn("执行失败回调异常,队列描述:{},错误原因:{}", messageRetryDto.getSourceDesc(), e.getMessage()); 
  65.         } 
  66.     } 
  67.  
  68.      
  69.     protected abstract void execute(MessageRetryDTO messageRetryDto); 
  70.  
  71.      
  72.     protected abstract void successCallback(MessageRetryDTO messageRetryDto); 
  73.  
  74.      
  75.     protected abstract void failCallback(MessageRetryDTO messageRetryDto); 
  76.  
  77.      
  78.     private MessageRetryDTO buildMessageRetryInfo(Message message){ 
  79.         //如果头部包含补偿消息实体,直接返回 
  80.         Map messageHeaders = message.getMessageProperties().getHeaders(); 
  81.         if(messageHeaders.containsKey("message_retry_info")){ 
  82.             Object retryMsg = messageHeaders.get("message_retry_info"); 
  83.             if(Objects.nonNull(retryMsg)){ 
  84.                 return JSONObject.parseObject(String.valueOf(retryMsg), MessageRetryDTO.class); 
  85.             } 
  86.         } 
  87.         //自动将业务消息加入补偿实体 
  88.         MessageRetryDTO messageRetryDto = new MessageRetryDTO(); 
  89.         messageRetryDto.setBodyMsg(new String(message.getBody(), StandardCharsets.UTF_8)); 
  90.         messageRetryDto.setExchangeName(message.getMessageProperties().getReceivedExchange()); 
  91.         messageRetryDto.setRoutingKey(message.getMessageProperties().getReceivedRoutingKey()); 
  92.         messageRetryDto.setQueueName(message.getMessageProperties().getConsumerQueue()); 
  93.         messageRetryDto.setCreateTime(new Date()); 
  94.         return messageRetryDto; 
  95.     } 
  96.  
  97.      
  98.     private void retrySend(MessageRetryDTO retryDto){ 
  99.         //将补偿消息实体放入头部,原始消息内容保持不变 
  100.         MessageProperties messageProperties = new MessageProperties(); 
  101.         messageProperties.setContentType(MessageProperties.CONTENT_TYPE_JSON); 
  102.         messageProperties.setHeader("message_retry_info", JSONObject.toJSON(retryDto)); 
  103.         Message message = new Message(retryDto.getBodyMsg().getBytes(), messageProperties); 
  104.         rabbitTemplate.convertAndSend(retryDto.getExchangeName(), retryDto.getRoutingKey(), message); 
  105.     } 
  106.  
  107.  
  108.  
  109.      
  110.     private void saveMessageRetryInfo(MessageRetryDTO retryDto){ 
  111.         try { 
  112.             mongoTemplate.save(retryDto, "message_retry_info"); 
  113.         } catch (Exception e){ 
  114.             log.error("将异常消息存储到mongodb失败,消息数据:" + retryDto.toString(), e); 
  115.         } 
  116.     } 

3.3、编写监听服务类

在消费端应用的时候,也非常简单,例如,针对扣减库存操作,我们可以通过如下方式进行处理!

  1. @Component 
  2. public class OrderServiceListener extends CommonMessageRetryService { 
  3.  
  4.     private static final Logger log = LoggerFactory.getLogger(OrderServiceListener.class); 
  5.  
  6.      
  7.     @RabbitListener(queues = "mq.order.add"
  8.     public void consume(Message message) { 
  9.         log.info("收到订单下单成功消息: {}", message.toString()); 
  10.         super.initMessage(message); 
  11.     } 
  12.  
  13.  
  14.     @Override 
  15.     protected void execute(MessageRetryDTO messageRetryDto) { 
  16.         //调用扣减库存服务,将业务异常抛出来 
  17.     } 
  18.  
  19.     @Override 
  20.     protected void successCallback(MessageRetryDTO messageRetryDto) { 
  21.         //业务处理成功,回调 
  22.     } 
  23.  
  24.     @Override 
  25.     protected void failCallback(MessageRetryDTO messageRetryDto) { 
  26.         //业务处理失败,回调 
  27.     } 

当消息消费失败,并超过最大次数时,会将消息存储到 mongodb 中,然后像常规数据库操作一样,可以通过 web 接口查询异常消息,并针对具体场景进行重试!

四、小结

可能有的同学会问,为啥不将异常消息存在数据库?

起初的确是存储在 MYSQL 中,但是随着业务的快速发展,订单消息数据结构越来越复杂,数据量也非常的大,甚至大到 MYSQL 中的 text 类型都无法存储,同时这种数据结构也不太适合在 MYSQL 中存储,因此将其迁移到 mongodb!

本文主要围绕消息消费失败这种场景,进行基础的方案和代码实践讲解,可能有理解不到位的地方,欢迎批评指出!

五、参考

 

石杉的架构笔记 - 如何处理消息消费失败问题

 

来源:Java极客技术内容投诉

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

软考中级精品资料免费领

  • 历年真题答案解析
  • 备考技巧名师总结
  • 高频考点精准押题
  • 2024年上半年信息系统项目管理师第二批次真题及答案解析(完整版)

    难度     813人已做
    查看
  • 【考后总结】2024年5月26日信息系统项目管理师第2批次考情分析

    难度     354人已做
    查看
  • 【考后总结】2024年5月25日信息系统项目管理师第1批次考情分析

    难度     318人已做
    查看
  • 2024年上半年软考高项第一、二批次真题考点汇总(完整版)

    难度     435人已做
    查看
  • 2024年上半年系统架构设计师考试综合知识真题

    难度     224人已做
    查看

相关文章

发现更多好内容

猜你喜欢

AI推送时光机
位置:首页-资讯-后端开发
咦!没有更多了?去看看其它编程学习网 内容吧
首页课程
资料下载
问答资讯