文章详情

短信预约-IT技能 免费直播动态提醒

请输入下面的图形验证码

提交验证

短信预约提醒成功

实战与原理:如何基于RocketMQ实现分布式事务?

2024-11-30 02:24

关注

由于这是跨服务调用,因此会产生分布式事务。在这里,我们使用RocketMQ的事务消息来实现分布式事务。

首先,在订单服务的应用服务层处理支付逻辑,并调用RocketMQ发送事务消息:

@Override
public String payment(String orderSn) {
    // todo 集成支付宝支付
    // 支付流水号
    String outOrderNo = IdUtils.get32UUID();
    TradeOrder tradeOrder = Optional.ofNullable(tradeOrderService.getByOrderSn(orderSn)).orElseThrow(() -> new BusinessException("订单编号不存在"));

    // 如果订单处于待支付状态
    if (Objects.equals(tradeOrder.getStatus(), OrderStatusEnum.WAITING_PAYMENT.getStatus())) {

        OrderPaidEvent orderPaidEvent = new OrderPaidEvent(orderSn, outOrderNo);

        TransactionSendResult sendResult = enhanceTemplate.sendTransaction("TRADE-ORDER", "ORDER-PAID");

        if (SendStatus.SEND_OK == sendResult.getSendStatus() && sendResult.getLocalTransactionState() == LocalTransactionState.COMMIT_MESSAGE) {
            return tradeOrder.getOrderSn();
        } else {
            throw new BusinessException("支付失败...");
        }
    } else {
        throw new BusinessException("订单已支付,请勿重复提交...");
    }
}

在订单服务的基础设施层,创建一个类实现 RocketMQLocalTransactionListener 接口:

该接口有两个方法:

@Component
@Slf4j
public class OrderPaidTransactionConsumer implements RocketMQLocalTransactionListener {
    
    @Resource
    private TransactionTemplate transactionTemplate;
    @Resource
    private TradeOrderService tradeOrderService;
    
   
  
    @Override
    public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) {
        
        final OrderPaidEvent orderPaidEvent = JsonUtils.byte2Obj((byte[]) message.getPayload(), OrderPaidEvent.class);
        try {
            // 放到同一个本地事务中
            this.transactionTemplate.executeWithoutResult(status -> {
                String orderSn = orderPaidEvent.getOrderSn();
                // 修改成待发货
                tradeOrderService.changeOrderStatus(orderSn, OrderStatusEnum.AWAITING_SHIPMENT);
            });
            return RocketMQLocalTransactionState.COMMIT;
        } catch (Exception e) {
            log.error("修改订单状态失败", e);
            // ROLLBACK 则回滚消息,rocketmq将废弃这条消息
            return RocketMQLocalTransactionState.ROLLBACK;
            // 如果是UNKNOWN, 则触发回查
        }

    }

    
    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message message) {
        final OrderPaidEvent orderPaidEvent = JsonUtils.byte2Obj((byte[]) message.getPayload(), OrderPaidEvent.class);

        String orderSn = orderPaidEvent.getOrderSn();
        TradeOrder tradeOrder = tradeOrderService.getByOrderSn(orderSn);
        // 如果已经修改成待发货说明本地事务执行成功,此时消费端可以直接消费
        if (Objects.equals(tradeOrder.getStatus(), OrderStatusEnum.AWAITING_SHIPMENT.getStatus())) {
            return RocketMQLocalTransactionState.COMMIT;
        } else {
            // 这里查不到的时候返回 UNKNOWN在于,有可能事务还没有提交,回查就开始了
            return RocketMQLocalTransactionState.UNKNOWN;
        }
    }
}

在库存服务的基础设施层,监听消息以执行库存扣减逻辑:

@Component
@Slf4j
@RocketMQMessageListener(consumerGroup = "dailymart_inventory_group", topic = "TRADE-ORDER", selectorExpression = "ORDER-PAID")
public class InventoryDeductionConsumer extends EnhanceMessageHandler implements RocketMQListener {
    
    @Resource
    private InventoryDomainService inventoryDomainService;
    
    @Override
    public void onMessage(OrderPaidEvent orderPaidEvent) {
        super.dispatchMessage(orderPaidEvent);
    }
    
    @Override
    protected void handleMessage(OrderPaidEvent orderPaidEvent) throws Exception {
        // 执行库存扣减逻辑
        String orderSn = orderPaidEvent.getOrderSn();
        inventoryDomainService.deductionInventory(orderSn);
    }
}

通过以上步骤,我们完成了RocketMQ事务消息的发送,利用事务消息的特性保证分布式事务的最终一致性。与普通消息相比,事务消息在处理时需要实现 RocketMQLocalTransactionListener 接口,这是事务消息的核心。

介绍完事务消息的使用,接下来我们再来聊聊事务消息的原理。

事务消息的原理

首先,让我们思考一下,如果不使用事务消息会有什么问题。

很容易想到的一个问题就是消息丢失。当保存订单后由于网络问题导致消息丢失,如下图所示:

图片

在不使用RocketMQ的情况下,我们往往会通过 本地消息表 + 补偿重试 的机制来保证消息一定会发送出去。其原理可以参考上篇文章 [Dailymart26:微服务中躲不过的坑 - 分布式事务]。

那RocketMQ是如何解决这个问题的呢?

1. 发送half消息,探测MQ是否正常

在基于RocketMQ的事务消息中,我们不是先执行自身的订单支付逻辑,而是先让订单系统发送一条 half消息 到MQ去。这个half消息本质上是一个订单支付成功的消息,只不过此时库存系统是看不见这个half消息的。然后,我们等待接收这个half消息写入成功的响应通知。

图片

发送half消息的本质其实是为了探测MQ是否仍然正常运行。但问题来了,如上所述,消息会发生丢失,那么half消息丢失怎么办呢?

2. half消息发送失败

在发送half消息时,由于网络原因或者MQ直接挂了,就会导致half消息发送失败。这个时候订单系统需要执行一系列的回滚操作。在我们的场景中,应该执行退款操作,将钱退还给用户,并告知用户交易失败。

3. half消息成功,订单系统执行自己的业务逻辑

如果成功收到half消息的正常响应,此时订单系统应该执行自己的业务逻辑。在我们这个场景中,就是修改订单数据库状态,将其修改为待发货状态。这部分逻辑就对应上述代码中的executeLocalTransaction()方法。

图片

4. 订单本地事务执行失败

如果订单系统执行本地事务失败,则需要发送一个rollback请求给MQ,让其删除这条half消息。

图片

5. 订单本地事务执行成功

如果订单系统的本地事务执行正常,此时需要发送一个commit请求给MQ,要求MQ对之前的half消息进行commit操作,这样库存系统就可以消费这条消息了。

图片

订单创建消息处于half状态时,库存系统是看不见它的。必须等到订单系统执行commit请求,消息被commit后,库存系统才能看到并获取这条消息进行后续处理。

6. half消息发送成功,但是没收到half的响应

以上就是RocketMQ事务消息的正向流程。

然而,还有一个问题:如果订单系统发送half消息成功后却没有收到half消息的响应,该如何处理呢?

在这种情况下,订单系统可能会误以为是发送half消息到MQ失败了。订单系统就会执行回滚流程,退还支付金额,关闭订单。

图片

然而,此时MQ系统中已经存在了一条half消息。这条half消息又该如何处理呢?

在RocketMQ中,有一套补偿流程。RocketMQ会定期扫描处于half状态的消息。如果一直没有对这个消息执行 commit/rollback 操作,超过了一定的时间,RocketMQ就会回调你的订单系统的一个接口,用以确认你本地事务的情况。

当订单系统收到MQ的回查请求时,就需要检索一下数据库,根据订单状态决定执行commit还是rollback。

这部分逻辑就对应上述代码中checkLocalTransaction()方法。

图片

7. rollback 或者 commit 失败怎么办?

通过上述说明,可以看到,RocketMQ是根据rollback或commit操作来决定half消息的状态的。如果业务系统执行了commit操作,则将half消息设置为可见,库存系统可以消费;如果业务系统执行了rollback操作,MQ就会删除half消息。那么问题来了:如果订单系统在执行rollback或commit操作时失败又该如何处理呢?

这时候仍然依赖于前文提到的回查机制。

由于此时MQ中的消息一直处于half状态,超过一定的超时时间后,MQ会发现这个half消息有问题,然后回调你的订单系统的接口。此时订单系统需要根据订单状态来决定执行commit请求还是rollback请求。

以上,就是RocketMQ事务消息的原理。结合文章开头的代码,是不是已经很清晰了呢?

来源:JAVA日知录内容投诉

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

软考中级精品资料免费领

  • 历年真题答案解析
  • 备考技巧名师总结
  • 高频考点精准押题
  • 2024年上半年信息系统项目管理师第二批次真题及答案解析(完整版)

    难度     813人已做
    查看
  • 【考后总结】2024年5月26日信息系统项目管理师第2批次考情分析

    难度     354人已做
    查看
  • 【考后总结】2024年5月25日信息系统项目管理师第1批次考情分析

    难度     318人已做
    查看
  • 2024年上半年软考高项第一、二批次真题考点汇总(完整版)

    难度     435人已做
    查看
  • 2024年上半年系统架构设计师考试综合知识真题

    难度     224人已做
    查看

相关文章

发现更多好内容

猜你喜欢

AI推送时光机
位置:首页-资讯-后端开发
咦!没有更多了?去看看其它编程学习网 内容吧
首页课程
资料下载
问答资讯