在这种情况下,简单的事务控制(如本地事务)无法有效地解决跨服务的数据一致性问题。为了解决这个挑战,事务外包(Transactional Outbox)模式 被提出,以确保服务在处理数据库操作时,同时能够可靠地发送消息,从而解决了数据库与消息队列之间的不一致问题。
什么是事务外包模式?
事务外包模式 是一种保证数据库操作与消息传递之间一致性的设计模式。它的核心思想是将所有需要发送的消息存储在数据库中,将其与数据库操作绑定在同一事务内。这样,当数据库操作成功提交时,消息也会被持久化到数据库,后续通过定时任务或事件轮询机制将这些消息发送到消息系统,如 Kafka、RabbitMQ 或其他外部系统。
传统的分布式事务通过两阶段提交(2PC)来保证一致性,但两阶段提交会带来较大的性能开销,且难以处理网络或系统故障。相比之下,事务外包模式提供了一种高效、灵活的替代方案:
- 事务一致性:通过将消息和数据库操作放在同一事务内,保证它们要么同时成功,要么同时失败。
- 异步处理:消息可以通过异步方式发送到消息队列,避免对数据库操作产生延迟。
- 高可用性和容错性:即使在消息系统不可用的情况下,消息依然能够可靠地保存在数据库中,等待消息系统恢复后发送。
通过这种方式,我们可以在保持服务间松耦合的同时,确保分布式系统的数据一致性和高可用性。
事务外包模式的工作原理
- 业务数据与消息一起持久化:当一个服务执行数据库操作时,消息并不会立即发送,而是与业务数据一起存储在数据库的 Outbox 表中。这样,业务数据和消息的持久化在同一个事务中被处理,确保两者的一致性。
- 定时轮询消息表:系统会通过定时任务轮询 Outbox 表,查找未发送的消息,并将其发送到目标消息系统(如 Kafka 或 RabbitMQ)。
- 消息传递确认:当消息成功发送后,Outbox 表中的相应记录会被删除或标记为已处理。
这种模式的核心思想是将消息的可靠传递变成一个可控的、异步的过程,并通过持久化机制保证即使消息系统暂时不可用,也不会丢失消息。
运行效果:
图片
若想获取项目完整代码以及其他文章的项目源码,且在代码编写时遇到问题需要咨询交流,欢迎加入下方的知识星球。
Spring Boot 实现事务外包模式
项目基础配置
为了实现事务外包模式,我们将使用 Spring Boot、JPA、Lombok 和 Thymeleaf,并通过定时任务来轮询数据库中的 Outbox 表。下面的 pom.xml 配置了项目所需的依赖:
4.0.0
org.springframework.boot
spring-boot-starter-parent
3.3.4
com.icoderoad
outbox
0.0.1-SNAPSHOT
outbox
Demo project for Spring Boot
17
org.springframework.boot
spring-boot-starter-data-jpa
org.springframework.boot
spring-boot-starter-web
org.springframework.boot
spring-boot-starter-thymeleaf
org.projectlombok
lombok
provided
com.mysql
mysql-connector-j
runtime
org.springframework.boot
spring-boot-starter-test
test
org.springframework.boot
spring-boot-maven-plugin
application.yaml 配置
我们使用 Mysql 数据库进行持久化,yaml 文件配置了数据库连接和 Outbox 的轮询间隔。
spring:
datasource:
driver-class-name: com.mysql.cj.jdbc.Driver
url: jdbc:mysql://localhost:3306/mydb?useSSL=false&allowPublicKeyRetrieval=true&serverTimeznotallow=UTC
username: root
password: root
jpa:
hibernate:
ddl-auto: update
show-sql: true
outbox:
polling-interval: 1000 # 设置轮询间隔为 1 秒
使用 @ConfigurationProperties 读取配置
为了方便管理和修改轮询间隔等配置项,我们使用 @ConfigurationProperties 注解将配置文件中的属性注入到 Java 类中。
package com.icoderoad.outbox.config;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;
import lombok.Data;
@Data
@Component
@ConfigurationProperties(prefix = "outbox")
public class OutboxProperties {
private long pollingInterval;
}
实现事务外包模式
在 Spring Boot 中,事务外包模式可以通过一个简单的数据库表(如 OutboxEvent)来持久化所有未处理的消息。每次有业务操作时,生成相应的事件并持久化到数据库表中,然后通过定时任务处理这些事件。
数据库实体类
package com.icoderoad.outbox.entity;
import jakarta.persistence.Entity;
import jakarta.persistence.GeneratedValue;
import jakarta.persistence.GenerationType;
import jakarta.persistence.Id;
import lombok.Data;
@Data
@Entity
public class OutboxEvent {
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
private Long id;
private String aggregateType;
private String aggregateId;
private String eventType;
private String payload; // 存储事件内容
}
Order 类实现
package com.icoderoad.outbox.entity;
import jakarta.persistence.Entity;
import jakarta.persistence.GeneratedValue;
import jakarta.persistence.GenerationType;
import jakarta.persistence.Id;
import jakarta.persistence.Table;
import lombok.Data;
@Data
@Entity
@Table(name = "orders")
public class Order {
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
private Long id;
private String orderName; // 订单名称
}
OrderRepository 类实现
package com.icoderoad.outbox.repository;
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.stereotype.Repository;
import com.icoderoad.outbox.entity.Order;
@Repository
public interface OrderRepository extends JpaRepository {
}
OutboxEventRepository 类实现
package com.icoderoad.outbox.repository;
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.stereotype.Repository;
import com.icoderoad.outbox.entity.OutboxEvent;
@Repository
public interface OutboxEventRepository extends JpaRepository {
// 这里可以定义自定义查询方法,例如查询未处理的事件等
// List findByProcessedFalse();
}
业务服务类
业务逻辑中,当执行订单操作时,事件不会直接发送,而是先持久化到 Outbox 表中。
package com.icoderoad.outbox.service;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import com.icoderoad.outbox.entity.Order;
import com.icoderoad.outbox.entity.OutboxEvent;
import com.icoderoad.outbox.repository.OrderRepository;
import com.icoderoad.outbox.repository.OutboxEventRepository;
@Service
public class OrderService {
private final OrderRepository orderRepository;
private final OutboxEventRepository outboxEventRepository;
public OrderService(OrderRepository orderRepository, OutboxEventRepository outboxEventRepository) {
this.orderRepository = orderRepository;
this.outboxEventRepository = outboxEventRepository;
}
@Transactional
public void placeOrder(Order order) {
// 先保存订单信息,确保生成 ID
Order savedOrder = orderRepository.save(order);
// 保存订单之后,才能获取订单的 ID
OutboxEvent event = new OutboxEvent();
event.setAggregateType("Order");
event.setAggregateId(savedOrder.getId().toString()); // 使用保存后的订单 ID
event.setEventType("OrderCreated");
event.setPayload(savedOrder.toString()); // 可以根据需要将订单信息序列化成 JSON
// 保存事件信息
outboxEventRepository.save(event);
}
}
定时轮询任务
定时任务用于从 Outbox 表中读取未处理的事件并将其发送至消息队列。
package com.icoderoad.outbox.poller;
import java.util.List;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import com.icoderoad.outbox.entity.OutboxEvent;
import com.icoderoad.outbox.repository.OutboxEventRepository;
@Component
public class OutboxPoller {
private final OutboxEventRepository outboxEventRepository;
public OutboxPoller(OutboxEventRepository outboxEventRepository) {
this.outboxEventRepository = outboxEventRepository;
}
@Scheduled(fixedDelayString = "${outbox.polling-interval}")
public void pollOutbox() {
List events = outboxEventRepository.findAll();
for (OutboxEvent event : events) {
// 发送消息至消息队列
// messageQueue.send(event);
// 删除或标记为已处理
outboxEventRepository.delete(event);
}
}
}
后端控制器
package com.icoderoad.outbox.controller;
import java.util.HashMap;
import java.util.Map;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import com.icoderoad.outbox.entity.Order;
import com.icoderoad.outbox.service.OrderService;
@RestController
@RequestMapping("/api/orders")
public class OrderController {
private final OrderService orderService;
public OrderController(OrderService orderService) {
this.orderService = orderService;
}
@PostMapping
public Map placeOrder(@RequestBody Order order) {
orderService.placeOrder(order);
Map response = new HashMap<>();
response.put("status", "success");
response.put("message", "订单提交成功!");
return response;
}
}
前端实现
使用 Thymeleaf 渲染页面,并使用 JQuery 通过 AJAX 请求后端 API,将结果以 Bootstrap 风格的提示框显示。
在 src/main/resources/templates 目录下创建 index.html 文件:
订单页面
订单表单
总结
事务外包模式 提供了一种简洁高效的解决方案,确保在微服务架构下的消息传递和数据一致性问题。通过将业务数据和事件存储在同一个数据库事务中,并结合定时轮询机制将事件发送至消息队列,开发者能够轻松处理分布式环境中的一致性挑战。与传统的两阶段提交相比,事务外包模式提供了更好的可扩展性、性能和可靠性。
同时,本文通过前后端结合的方式展示了如何使用 Thymeleaf、JQuery 和 Bootstrap 实现一个订单系统。这种架构可以进一步扩展,如支持更复杂的消息系统或集成更多服务,以满足不断增长的业务需求。