文章详情

短信预约-IT技能 免费直播动态提醒

请输入下面的图形验证码

提交验证

短信预约提醒成功

SpringBoot分布式事务之最大努力通知

2024-12-03 06:16

关注

环境:springboot.2.4.9 + RabbitMQ3.7.4

什么是最大努力通知

这是一个充值的案例

交互流程 :

账户系统调用充值系统接口。

充值系统完成支付向账户系统发起充值结果通知 若通知失败,则充值系统按策略进行重复通知。

账户系统接收到充值结果通知修改充值状态。

账户系统未接收到通知会主动调用充值系统的接口查询充值结果。

通过上边的例子我们总结最大努力通知方案的目标 : 发起通知方通过一定的机制最大努力将业务处理结果通知到接收方。 具体包括 :

有一定的消息重复通知机制。 因为接收通知方可能没有接收到通知,此时要有一定的机制对消息重复通知。

消息校对机制。 如果尽最大努力也没有通知到接收方,或者接收方消费消息后要再次消费,此时可由接收方主动向通知方查询消息信息来满足需求。

最大努力通知与可靠消息一致性有什么不同?

解决方案思想不同 可靠消息一致性,发起通知方需要保证将消息发出去,并且将消息发到接收通知方,消息的可靠性关键由发起通知方来保证。 最大努力通知,发起通知方尽最大的努力将业务处理结果通知为接收通知方,但是可能消息接收不到,此时需要接收通知方主动调用发起通知方的接口查询业务处理结果,通知的可靠性关键在接收通知方。

两者的业务应用场景不同 可靠消息一致性关注的是交易过程的事务一致,以异步的方式完成交易。 最大努力通知关注的是交易后的通知事务,即将交易结果可靠的通知出去。

技术解决方向不同 可靠消息一致性要解决消息从发出到接收的一致性,即消息发出并且被接收到。 最大努力通知无法保证消息从发出到接收的一致性,只提供消息接收的可靠性机制。可靠机制是,最大努力地将消息通知给接收方,当消息无法被接收方接收时,由接收方主动查询消费。

通过RabbitMQ实现最大努力通知

关于RabbitMQ相关文章《SpringBoot RabbitMQ消息可靠发送与接收 》,《RabbitMQ消息确认机制confirm 》。

 

项目结构

两个子模块users-mananger(账户模块),pay-manager(支付模块)

依赖

  1.  
  2.  org.springframework.boot 
  3.  spring-boot-starter-data-jpa 
  4.  
  5.  
  6.  org.springframework.boot 
  7.  spring-boot-starter-web 
  8.  
  9.  
  10.  org.springframework.boot 
  11.  spring-boot-starter-amqp 
  12.  
  13.  
  14.  mysql 
  15.  mysql-connector-java 
  16.  runtime 
  17.  

子模块pay-manager

配置文件

  1. server: 
  2.   port: 8080 
  3. --- 
  4. spring: 
  5.   rabbitmq: 
  6.     host: localhost 
  7.     port: 5672 
  8.     username: guest 
  9.     password: guest 
  10.     virtual-host: / 
  11.     publisherConfirmType: correlated 
  12.     publisherReturns: true 
  13.     listener: 
  14.       simple: 
  15.         concurrency: 5 
  16.         maxConcurrency: 10 
  17.         prefetch: 5 
  18.         acknowledgeMode: MANUAL 
  19.         retry: 
  20.           enabled: true 
  21.           initialInterval: 3000 
  22.           maxAttempts: 3 
  23.         defaultRequeueRejected: false 

实体类

记录充值金额及账户信息

  1. @Entity 
  2. @Table(name = "t_pay_info"
  3. public class PayInfo implements Serializable
  4.  @Id 
  5.  private Long id; 
  6.  private BigDecimal money ; 
  7.  private Long accountId ; 
  8. }   

DAO及Service

  1. public interface PayInfoRepository extends JpaRepository { 
  2.  PayInfo findByOrderId(String orderId) ; 
  1. @Service 
  2. public class PayInfoService { 
  3.      
  4.     @Resource 
  5.     private PayInfoRepository payInfoRepository ; 
  6.     @Resource 
  7.     private RabbitTemplate rabbitTemplate ; 
  8.      
  9.   // 数据保存完后发送消息(这里发送消息可以应用确认模式或事物模式) 
  10.     @Transactional 
  11.     public PayInfo savePayInfo(PayInfo payInfo) { 
  12.         payInfo.setId(System.currentTimeMillis()) ; 
  13.         PayInfo result = payInfoRepository.save(payInfo) ; 
  14.         CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString().replaceAll("-""")) ; 
  15.         try { 
  16.             rabbitTemplate.convertAndSend("pay-exchange""pay.#", new ObjectMapper().writeValueAsString(payInfo), correlationData) ; 
  17.         } catch (AmqpException | JsonProcessingException e) { 
  18.             e.printStackTrace(); 
  19.         } 
  20.         return result ; 
  21.     } 
  22.      
  23.     public PayInfo queryByOrderId(String orderId) { 
  24.         return payInfoRepository.findByOrderId(orderId) ; 
  25.     } 
  26.      

支付完成后发送消息。

Controller接口

  1. @RestController 
  2. @RequestMapping("/payInfos"
  3. public class PayInfoController { 
  4.  @Resource 
  5.  private PayInfoService payInfoService ; 
  6.      
  7.   // 支付接口 
  8.  @PostMapping("/pay"
  9.  public Object pay(@RequestBody PayInfo payInfo) { 
  10.   payInfoService.savePayInfo(payInfo) ; 
  11.   return "支付已提交,等待结果" ; 
  12.  } 
  13.      
  14.  @GetMapping("/queryPay"
  15.  public Object queryPay(String orderId) { 
  16.   return payInfoService.queryByOrderId(orderId) ; 
  17.  } 
  18.      

子模块users-manager

应用配置

  1. server: 
  2.   port: 8081 
  3. --- 
  4. spring: 
  5.   rabbitmq: 
  6.     host: localhost 
  7.     port: 5672 
  8.     username: guest 
  9.     password: guest 
  10.     virtual-host: / 
  11.     publisherConfirmType: correlated 
  12.     publisherReturns: true 
  13.     listener: 
  14.       simple: 
  15.         concurrency: 5 
  16.         maxConcurrency: 10 
  17.         prefetch: 5 
  18.         acknowledgeMode: MANUAL 
  19.         retry: 
  20.           enabled: true 
  21.           initialInterval: 3000 
  22.           maxAttempts: 3 
  23.         defaultRequeueRejected: false 

实体类

  1. @Entity 
  2. @Table(name = "t_users"
  3. public class Users { 
  4.  @Id 
  5.  private Long id; 
  6.  private String name ; 
  7.  private BigDecimal money ; 
  8. }   

账户信息表

  1. @Entity 
  2. @Table(name = "t_users_log"
  3. public class UsersLog { 
  4.  @Id 
  5.  private Long id; 
  6.  private String orderId ; 
  7.  // 0: 支付中,1:已支付,2:已取消 
  8.  @Column(columnDefinition = "int default 0"
  9.  private Integer status = 0 ; 
  10.  private BigDecimal money ; 
  11.  private Date createTime ; 

账户充值记录表(去重)

DAO及Service

  1. public interface UsersRepository extends JpaRepository { 
  2. public interface UsersLogRepository extends JpaRepository { 
  3.  UsersLog findByOrderId(String orderId) ; 

Service类

  1. @Service 
  2. public class UsersService {  
  3.     @Resource 
  4.     private UsersRepository usersRepository ; 
  5.     @Resource 
  6.     private UsersLogRepository usersLogRepository ; 
  7.      
  8.  @Transactional 
  9.  public boolean updateMoneyAndLogStatus(Long id, String orderId) { 
  10.   UsersLog usersLog = usersLogRepository.findByOrderId(orderId) ; 
  11.   if (usersLog != null && 1 == usersLog.getStatus()) { 
  12.    throw new RuntimeException("已支付") ; 
  13.   } 
  14.   Users users = usersRepository.findById(id).orElse(null) ; 
  15.   if (users == null) { 
  16.    throw new RuntimeException("账户不存在") ; 
  17.   } 
  18.   users.setMoney(users.getMoney().add(usersLog.getMoney())) ; 
  19.   usersRepository.save(users) ; 
  20.   usersLog.setStatus(1) ; 
  21.   usersLogRepository.save(usersLog) ; 
  22.   return true ; 
  23.  } 
  24.      
  25.  @Transactional 
  26.  public boolean saveLog(UsersLog usersLog) { 
  27.   usersLog.setId(System.currentTimeMillis()) ; 
  28.   usersLogRepository.save(usersLog) ; 
  29.   return true ; 
  30.  } 

消息监听

  1. @Component 
  2. public class PayMessageListener { 
  3.      
  4.  private static final Logger logger = LoggerFactory.getLogger(PayMessageListener.class) ; 
  5.      
  6.  @Resource 
  7.  private  UsersService usersService ; 
  8.      
  9.  @SuppressWarnings("unchecked"
  10.  @RabbitListener(queues = {"pay-queue"}) 
  11.  @RabbitHandler 
  12.  public void receive(Message message, Channel channel) { 
  13.   long deliveryTag = message.getMessageProperties().getDeliveryTag() ; 
  14.   byte[] buf =  null ; 
  15.   try { 
  16.    buf = message.getBody() ; 
  17.    logger.info("接受到消息:{}", new String(buf, "UTF-8")) ; 
  18.    Map result = new JsonMapper().readValue(buf, Map.class) ; 
  19.    Long id = ((Integer) result.get("accountId")) + 0L ; 
  20.    String orderId = (String) result.get("orderId") ; 
  21.    usersService.updateMoneyAndLogStatus(id, orderId) ; 
  22.    channel.basicAck(deliveryTag, true) ; 
  23.   } catch (Exception e) { 
  24.    logger.error("消息接受出现异常:{}, 异常消息:{}", e.getMessage(), new String(buf, Charset.forName("UTF-8"))) ; 
  25.    e.printStackTrace() ; 
  26.    try { 
  27.     // 应该将这类异常的消息放入死信队列中,以便人工排查。 
  28.     channel.basicReject(deliveryTag, false); 
  29.    } catch (IOException e1) { 
  30.     logger.error("拒绝消息重入队列异常:{}", e1.getMessage()) ; 
  31.     e1.printStackTrace(); 
  32.    } 
  33.   } 
  34.  } 

Controller接口

  1. @RestController 
  2. @RequestMapping("/users"
  3. public class UsersController { 
  4.      
  5.     @Resource 
  6.     private RestTemplate restTemplate ; 
  7.     @Resource 
  8.     private UsersService usersService ; 
  9.      
  10.     @PostMapping("/pay"
  11.     public Object pay(Long id, BigDecimal money) throws Exception { 
  12.         HttpHeaders headers = new HttpHeaders() ; 
  13.         headers.setContentType(MediaType.APPLICATION_JSON) ; 
  14.         String orderId = UUID.randomUUID().toString().replaceAll("-""") ; 
  15.         Map params = new HashMap<>() ; 
  16.         params.put("accountId", String.valueOf(id)) ; 
  17.         params.put("orderId", orderId) ; 
  18.         params.put("money", money.toString()) ; 
  19.          
  20.         UsersLog usersLog = new UsersLog() ; 
  21.         usersLog.setCreateTime(new Date()) ; 
  22.         usersLog.setOrderId(orderId); 
  23.         usersLog.setMoney(money) ; 
  24.         usersLog.setStatus(0) ; 
  25.         usersService.saveLog(usersLog) ; 
  26.         HttpEntity requestEntity = new HttpEntity(new ObjectMapper().writeValueAsString(params), headers) ; 
  27.         return restTemplate.postForObject("http://localhost:8080/payInfos/pay", requestEntity, String.class) ; 
  28.     } 
  29.      

以上是两个子模块的所有代码了

测试

初始数据

账户子模块控制台

支付子模块控制台

数据表数据

完毕!!!

 

来源:今日头条内容投诉

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

软考中级精品资料免费领

  • 历年真题答案解析
  • 备考技巧名师总结
  • 高频考点精准押题
  • 2024年上半年信息系统项目管理师第二批次真题及答案解析(完整版)

    难度     813人已做
    查看
  • 【考后总结】2024年5月26日信息系统项目管理师第2批次考情分析

    难度     354人已做
    查看
  • 【考后总结】2024年5月25日信息系统项目管理师第1批次考情分析

    难度     318人已做
    查看
  • 2024年上半年软考高项第一、二批次真题考点汇总(完整版)

    难度     435人已做
    查看
  • 2024年上半年系统架构设计师考试综合知识真题

    难度     224人已做
    查看

相关文章

发现更多好内容

猜你喜欢

AI推送时光机
位置:首页-资讯-后端开发
咦!没有更多了?去看看其它编程学习网 内容吧
首页课程
资料下载
问答资讯