今天就跟大家聊聊有关SpringBoot分布式事务中最大努力通知是怎样的,可能很多人都不太了解,为了让大家更加了解,小编给大家总结了以下内容,希望大家根据这篇文章可以有所收获。
环境:springboot.2.4.9 + RabbitMQ3.7.4
什么是最大努力通知
这是一个充值的案例
交互流程 :
1、账户系统调用充值系统接口。
2、充值系统完成支付向账户系统发起充值结果通知 若通知失败,则充值系统按策略进行重复通知。
3、账户系统接收到充值结果通知修改充值状态。
4、账户系统未接收到通知会主动调用充值系统的接口查询充值结果。
通过上边的例子我们总结最大努力通知方案的目标 : 发起通知方通过一定的机制最大努力将业务处理结果通知到接收方。 具体包括 :
1、有一定的消息重复通知机制。 因为接收通知方可能没有接收到通知,此时要有一定的机制对消息重复通知。
2、消息校对机制。 如果尽最大努力也没有通知到接收方,或者接收方消费消息后要再次消费,此时可由接收方主动向通知方查询消息信息来满足需求。
最大努力通知与可靠消息一致性有什么不同?
1、解决方案思想不同 可靠消息一致性,发起通知方需要保证将消息发出去,并且将消息发到接收通知方,消息的可靠性关键由发起通知方来保证。 最大努力通知,发起通知方尽最大的努力将业务处理结果通知为接收通知方,但是可能消息接收不到,此时需要接收通知方主动调用发起通知方的接口查询业务处理结果,通知的可靠性关键在接收通知方。
2、两者的业务应用场景不同 可靠消息一致性关注的是交易过程的事务一致,以异步的方式完成交易。 最大努力通知关注的是交易后的通知事务,即将交易结果可靠的通知出去。
3、技术解决方向不同 可靠消息一致性要解决消息从发出到接收的一致性,即消息发出并且被接收到。 最大努力通知无法保证消息从发出到接收的一致性,只提供消息接收的可靠性机制。可靠机制是,最大努力地将消息通知给接收方,当消息无法被接收方接收时,由接收方主动查询消费。
通过RabbitMQ实现最大努力通知
关于RabbitMQ相关文章《SpringBoot RabbitMQ消息可靠发送与接收 》,《RabbitMQ消息确认机制confirm 》。
项目结构
两个子模块users-mananger(账户模块),pay-manager(支付模块)
依赖
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-jpa</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <scope>runtime</scope> </dependency>
子模块pay-manager
配置文件
server: port: 8080 --- spring: rabbitmq: host: localhost port: 5672 username: guest password: guest virtual-host: / publisherConfirmType: correlated publisherReturns: true listener: simple: concurrency: 5 maxConcurrency: 10 prefetch: 5 acknowledgeMode: MANUAL retry: enabled: true initialInterval: 3000 maxAttempts: 3 defaultRequeueRejected: false
实体类
记录充值金额及账户信息
@Entity @Table(name = "t_pay_info") public class PayInfo implements Serializable{ @Id private Long id; private BigDecimal money ; private Long accountId ; }
DAO及Service
public interface PayInfoRepository extends JpaRepository<PayInfo, Long> { PayInfo findByOrderId(String orderId) ; }
@Service public class PayInfoService { @Resource private PayInfoRepository payInfoRepository ; @Resource private RabbitTemplate rabbitTemplate ; // 数据保存完后发送消息(这里发送消息可以应用确认模式或事物模式) @Transactional public PayInfo savePayInfo(PayInfo payInfo) { payInfo.setId(System.currentTimeMillis()) ; PayInfo result = payInfoRepository.save(payInfo) ; CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString().replaceAll("-", "")) ; try { rabbitTemplate.convertAndSend("pay-exchange", "pay.#", new ObjectMapper().writeValueAsString(payInfo), correlationData) ; } catch (AmqpException | JsonProcessingException e) { e.printStackTrace(); } return result ; } public PayInfo queryByOrderId(String orderId) { return payInfoRepository.findByOrderId(orderId) ; } }
支付完成后发送消息。
Controller接口
@RestController @RequestMapping("/payInfos") public class PayInfoController { @Resource private PayInfoService payInfoService ; // 支付接口 @PostMapping("/pay") public Object pay(@RequestBody PayInfo payInfo) { payInfoService.savePayInfo(payInfo) ; return "支付已提交,等待结果" ; } @GetMapping("/queryPay") public Object queryPay(String orderId) { return payInfoService.queryByOrderId(orderId) ; } }
子模块users-manager
应用配置
server: port: 8081 --- spring: rabbitmq: host: localhost port: 5672 username: guest password: guest virtual-host: / publisherConfirmType: correlated publisherReturns: true listener: simple: concurrency: 5 maxConcurrency: 10 prefetch: 5 acknowledgeMode: MANUAL retry: enabled: true initialInterval: 3000 maxAttempts: 3 defaultRequeueRejected: false
实体类
@Entity @Table(name = "t_users") public class Users { @Id private Long id; private String name ; private BigDecimal money ; }
账户信息表
@Entity @Table(name = "t_users_log") public class UsersLog { @Id private Long id; private String orderId ; // 0: 支付中,1:已支付,2:已取消 @Column(columnDefinition = "int default 0") private Integer status = 0 ; private BigDecimal money ; private Date createTime ; }
账户充值记录表(去重)
DAO及Service
public interface UsersRepository extends JpaRepository<Users, Long> { } public interface UsersLogRepository extends JpaRepository<UsersLog, Long> { UsersLog findByOrderId(String orderId) ; }
Service类
@Service public class UsersService { @Resource private UsersRepository usersRepository ; @Resource private UsersLogRepository usersLogRepository ; @Transactional public boolean updateMoneyAndLogStatus(Long id, String orderId) { UsersLog usersLog = usersLogRepository.findByOrderId(orderId) ; if (usersLog != null && 1 == usersLog.getStatus()) { throw new RuntimeException("已支付") ; } Users users = usersRepository.findById(id).orElse(null) ; if (users == null) { throw new RuntimeException("账户不存在") ; } users.setMoney(users.getMoney().add(usersLog.getMoney())) ; usersRepository.save(users) ; usersLog.setStatus(1) ; usersLogRepository.save(usersLog) ; return true ; } @Transactional public boolean saveLog(UsersLog usersLog) { usersLog.setId(System.currentTimeMillis()) ; usersLogRepository.save(usersLog) ; return true ; } }
消息监听
@Component public class PayMessageListener { private static final Logger logger = LoggerFactory.getLogger(PayMessageListener.class) ; @Resource private UsersService usersService ; @SuppressWarnings("unchecked") @RabbitListener(queues = {"pay-queue"}) @RabbitHandler public void receive(Message message, Channel channel) { long deliveryTag = message.getMessageProperties().getDeliveryTag() ; byte[] buf = null ; try { buf = message.getBody() ; logger.info("接受到消息:{}", new String(buf, "UTF-8")) ; Map<String, Object> result = new JsonMapper().readValue(buf, Map.class) ; Long id = ((Integer) result.get("accountId")) + 0L ; String orderId = (String) result.get("orderId") ; usersService.updateMoneyAndLogStatus(id, orderId) ; channel.basicAck(deliveryTag, true) ; } catch (Exception e) { logger.error("消息接受出现异常:{}, 异常消息:{}", e.getMessage(), new String(buf, Charset.forName("UTF-8"))) ; e.printStackTrace() ; try { // 应该将这类异常的消息放入死信队列中,以便人工排查。 channel.basicReject(deliveryTag, false); } catch (IOException e1) { logger.error("拒绝消息重入队列异常:{}", e1.getMessage()) ; e1.printStackTrace(); } } } }
Controller接口
@RestController @RequestMapping("/users") public class UsersController { @Resource private RestTemplate restTemplate ; @Resource private UsersService usersService ; @PostMapping("/pay") public Object pay(Long id, BigDecimal money) throws Exception { HttpHeaders headers = new HttpHeaders() ; headers.setContentType(MediaType.APPLICATION_JSON) ; String orderId = UUID.randomUUID().toString().replaceAll("-", "") ; Map<String, String> params = new HashMap<>() ; params.put("accountId", String.valueOf(id)) ; params.put("orderId", orderId) ; params.put("money", money.toString()) ; UsersLog usersLog = new UsersLog() ; usersLog.setCreateTime(new Date()) ; usersLog.setOrderId(orderId); usersLog.setMoney(money) ; usersLog.setStatus(0) ; usersService.saveLog(usersLog) ; HttpEntity<String> requestEntity = new HttpEntity<String>(new ObjectMapper().writeValueAsString(params), headers) ; return restTemplate.postForObject("http://localhost:8080/payInfos/pay", requestEntity, String.class) ; } }
以上是两个子模块的所有代码了
测试
初始数据
账户子模块控制台
支付子模块控制台
数据表数据
看完上述内容,你们对SpringBoot分布式事务中最大努力通知是怎样的有进一步的了解吗?如果还想了解更多知识或者相关内容,请关注编程网行业资讯频道,感谢大家的支持。