文章详情

短信预约-IT技能 免费直播动态提醒

请输入下面的图形验证码

提交验证

短信预约提醒成功

分布式事务实现方案:一文详解RocketMQ事务消息

2024-12-13 14:19

关注

实现原理

RocketMQ事务消息执行流程如下:

  1. 生产者将消息发送至RocketMQ服务端。
  2. RocketMQ服务端将消息持久化成功之后,向生产者返回Ack确认消息已经发送成功,此时消息被标记为"暂不能投递",这种状态下的消息即为半事务消息(Half Message)。
  3. 生产者开始执行本地事务逻辑。
  4. 生产者根据本地事务执行结果向服务端提交二次确认结果(Commit或是Rollback),服务端收到确认结果后处理逻辑如下:

二次确认结果为Commit:服务端将半事务消息标记为可投递,并投递给消费者。

二次确认结果为Rollback:服务端将回滚事务,不会将半事务消息投递给消费者。

  1. 在断网或者是生产者应用重启的特殊情况下,若服务端未收到发送者提交的二次确认结果,或服务端收到的二次确认结果为Unknown未知状态,经过固定时间后,服务端将对消息生产者即生产者集群中任一生产者实例发起消息回查。
  2. 生产者收到消息回查后,需要检查对应消息的本地事务执行的最终结果。
  3. 生产者根据检查到的本地事务的最终状态再次提交二次确认,服务端仍按照步骤4对半事务消息进行处理。

图片

代码实现

RocketMQ事务消息示例如下:

//演示demo,模拟订单表查询服务,用来确认订单事务是否提交成功。
private static boolean checkOrderById(String orderId) {
    return true;
}

//演示demo,模拟本地事务的执行结果。
private static boolean doLocalTransaction() {
    return true;
}

public static void main(String[] args) throws ClientException {
    ClientServiceProvider provider = new ClientServiceProvider();
    MessageBuilder messageBuilder = new MessageBuilderImpl();
    //构造事务生产者:事务消息需要生产者构建一个事务检查器,用于检查确认异常半事务的中间状态。
    Producer producer = provider.newProducerBuilder()
            .setTransactionChecker(messageView -> {
                
                final String orderId = messageView.getProperties().get("OrderId");
                if (Strings.isNullOrEmpty(orderId)) {
                    // 错误的消息,直接返回Rollback。
                    return TransactionResolution.ROLLBACK;
                }
                return checkOrderById(orderId) ? TransactionResolution.COMMIT : TransactionResolution.ROLLBACK;
            })
            .build();
    //开启事务
    final Transaction transaction;
    try {
        transaction = producer.beginTransaction();
    } catch (ClientException e) {
        e.printStackTrace();
        //事务开启失败,直接退出。
        return;
    }
    Message message = messageBuilder.setTopic("topic")
            //设置消息索引键,可根据关键字精确查找某条消息。
            .setKeys("messageKey")
            //设置消息Tag,用于消费端根据指定Tag过滤消息。
            .setTag("messageTag")
            //一般事务消息都会设置一个本地事务关联的唯一ID,用来做本地事务回查的校验。
            .addProperty("OrderId", "xxx")
            //消息体。
            .setBody("messageBody".getBytes())
            .build();
    //发送半事务消息
    final SendReceipt sendReceipt;
    try {
        sendReceipt = producer.send(message, transaction);
    } catch (ClientException e) {
        //半事务消息发送失败,事务可以直接退出并回滚。
        return;
    }
    
    boolean localTransactionOk = doLocalTransaction();
    if (localTransactionOk) {
        try {
            transaction.commit();
        } catch (ClientException e) {
            // 业务可以自身对实时性的要求选择是否重试,如果放弃重试,可以依赖事务消息回查机制进行事务状态的提交。
            e.printStackTrace();
        }
    } else {
        try {
            transaction.rollback();
        } catch (ClientException e) {
            // 建议记录异常信息,回滚异常时可以无需重试,依赖事务消息回查机制进行事务状态的提交。
            e.printStackTrace();
        }
    }
}

注意事项

总结

RocketMQ 事务消息是分布式事务中一种常见的实现方案,只是把发送消息和本地事务放在一个事务中,并且只保证最终一致性,无法保证强一致性。 原因有两点:

  1. 执行完成本地事务后,在commit事务消息之前,这段时间内数据是不一致的,所以只是保证了发送消息和本地事务的最终一致性。
  2. 在commit事务消息之后,然后把消息投递给消费者。至于消费者是否消费消息,什么时候消费?也都是不可控的,所以也只能尽量保证数据最终一致性。
来源:一灯架构内容投诉

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

软考中级精品资料免费领

  • 2024年上半年信息系统项目管理师第二批次真题及答案解析(完整版)

    难度     813人已做
    查看
  • 【考后总结】2024年5月26日信息系统项目管理师第2批次考情分析

    难度     354人已做
    查看
  • 【考后总结】2024年5月25日信息系统项目管理师第1批次考情分析

    难度     318人已做
    查看
  • 2024年上半年软考高项第一、二批次真题考点汇总(完整版)

    难度     435人已做
    查看
  • 2024年上半年系统架构设计师考试综合知识真题

    难度     224人已做
    查看

相关文章

发现更多好内容

猜你喜欢

AI推送时光机
位置:首页-资讯-后端开发
咦!没有更多了?去看看其它编程学习网 内容吧
首页课程
资料下载
问答资讯