以创建订单为例,假设下单后需要做两个操作:
❝在订单表生成订单。
在积分表增加本次订单增加的积分记录。
在单体架构下只需使用@Transactional开启事务,就可以保证数据的一致性。
@Transactional
public void order() {
String orderId = UUID.randomUUID().toString();
// 生成订单
orderService.createOrder(orderId);
// 增加积分
creditService.addCredits(orderId);
}
❝
但在分布式架构下,订单系统和积分系统可能是两个独立的服务,此时就不能使用上述的方法开启事务了,因为它们不处于同一个事务中。
- 在出错的情况下,无法进行全部回滚,只能对当前服务的事务进行回滚。
所以就有可能出现订单生成成功但是积分服务增加积分失败的情况(也可能相反),此时数据处于不一致的状态。
分布式架构下的事务
以下单流程为例,在分布式架构下的处理流程如下:
❝订单服务生成订单。
发送订单生成的MQ消息,积分服务订阅消息,有新的订单生成之后消费消息,增加对应的积分记录。
普通MQ消息存在的问题
❝假如订单创建成功,MQ消息发送成功,但是order方法在返回的前一刻,服务突然宕机。
由于开启了事务,事务还未提交(方法结束后才会正常提交)。
所以订单表并未生成记录,但是MQ却已经发送成功并且被积分服务消费,此时就会存在订单未创建但是积分记录增加的情况。
假如先发送MQ消息再创建订单,如果MQ消息发送成功,创建订单失败,那么同样处于不一致的状态。
@Transactional
public void order() {
String orderId = UUID.randomUUID().toString();
// 创建订单
Order order = orderService.createOrder(orderDTO.getOrderId());
// 发送订单创建的MQ消息
sendOrderMessge(order);
return;
}
可以使用RocketMQ事务消息解决上述问题。
RocketMQ事务消息基础流程
❝Apache RocketMQ在4.3.0版中已经支持分布式事务消息。
事务消息是 RocketMQ 提供的一种消息类型,支持在分布式场景下保障消息生产和本地事务的最终一致性。
RocketMQ采用了2PC的思想来实现了提交事务消息,同时增加一个补偿逻辑来处理二阶段超时或者失败的消息。
基本流程
❝第一阶段:
- 发送 Message,Half Message,即半事务消息。
- 此类型的 Message 是不会被 Consumer 消费。
第二阶段:如果半事务消息投递成功,则会开始执行本地事务。
分为如下三种 Case:
- 本地事务执行成功:
- 会向 Broker 发送 commit 消息,被 commit 过后的 Message 才能被 Consumer 消费到。
- 本地事务执行失败:
会向 Broker 发送 rollback 消息,Broker 则会将刚刚投递的半事务消息删除,从而保证上下游数据的一致性。
如果 Producer 实例或者网络出现了问题,Producer 没能及时地将本地事务执行的结果通知 Broker。
Broker 会通过扫描发现某条 Message 长时间处于半事务消息状态。
Broker 会主动地向 Producer 询问此 Message 对应的事务状态。
值得注意的是:
❝
RocketMQ 并不会无休止的的信息事务状态回查,默认回查 15 次。
如果 15 次回查还是无法得知事务状态,RocketMQ 默认回滚该消息。
RocketMQ事务消息使用限制
❝
事务消息不支持延时消息和批量消息。
事务性消息可能不止一次被检查或消费,所以消费者端需要做好消费幂等。
事务消息的生产者 ID 不能与其他类型消息的生产者 ID 共享。
- 与其他类型的消息不同,事务消息允许反向查询、MQ服务器能通过它们的生产者 ID 查询到消费者。
RocketMQ事务消息基本原理
采用2PC两阶段设计。
❝
将 Message 原本真实的 Topic 和 MessageQueue 进行备份。
- 放入到PROPERTY_REAL_TOPIC、PROPERTY_REAL_QUEUE_ID中保存。
将消息投递到一个内部Topic中RMQ_SYS_TRANS_HALF_TOPIC,该队列专门存储事务消息。
所有的 Half Message 全部都写入到 queueId 为 0 的 MessageQueue。
因为一个 Topic 下只有 1 个 MessageQueue:
- 这个 Topic 下的所有 Message 就是全局有序的,它们会按照先来后到的顺序被消费。
如果本地事务执行成功进行Commit,则将RMQ_SYS_TRANS_HALF_TOPIC 队列中的消息投递到真实的Topic中,供后续流程执行。
- 并删除这条 Half Message ,但删除也是假删除,只是给 Message 打上一个删除的 Tag。
如果本地事务执行失败进行rollback,则直接删除这条 Half Message ,但删除也是假删除。
如果本地事务迟迟没有返回结果 (默认时间是6s),则会触发事务回查机制
- 执行回查之前需要校验检查次数是否到达了最大值(需要手动设置,没有默认值)。
- 或者是当前 Half Message 存在是否超过了 Message 保存的上限,即 3天。
- 如果满足上面条件中的一种Half Message 会被放进 TRANS_CHECK_MAX_TIME_TOPIC Topic 当中。
- 一旦判定为需要执行事务回查逻辑,那么当前这条 Half Message 就算已经被消费了。
- 在没达到最大的校验次数之前,都还需要将其投递到事务队列当中,以便下次重试时再次执行 Check 逻辑。
- 如果回查成功则删除投递的 Half Message。
源码解读
发送事务消息调用的是TransactionMQProducer的sendMessageInTransaction方法:
主要有以下几个步骤:
❝获取事务监听器TransactionListener,如果获取为空或者本地事务执行器LocalTransactionExecuter为空将抛出异常。
因为需要通过TransactionListener或者LocalTransactionExecuter来执行本地事务,所以不能为空。
在消息中设置prepared属性,此时与普通消息(非事务消息)相比多了PROPERTY_TRANSACTION_PREPARED属性。
调用send方法发送prepared消息也就是half消息,发送消息的流程与普通消息一致。
根据消息的发送结果判断:
- 如果发送成功执行本地事务,并返回本地事务执行结果状态,如果返回的执行状态结果为空,将本地事务状态设置为UNKNOW。
- 发送成功之外的其他情况,包括FLUSH_DISK_TIMEOUT刷盘超时、FLUSH_SLAVE_TIMEOUT和SLAVE_NOT_AVAILABLE从节点不可用三种情况。
- 此时意味着half消息发送失败,本地事务状态置为ROLLBACK_MESSAGE回滚消息。
调用endTransaction方法结束事务。
参考
《RocketMQ技术内幕》
https://github.com/apache/rocketmq/blob/master/docs/cn/RocketMQ_Example.md。
https://github.com/apache/rocketmq/blob/master/docs/cn/design.md。