文章详情

短信预约-IT技能 免费直播动态提醒

请输入下面的图形验证码

提交验证

短信预约提醒成功

SpringBoot分布式事务之可靠消息最终一致性

2024-11-30 11:07

关注

可靠消息最终一致性原理

  1. Producer发送Prepare message到broker。
  2. Prepare Message发送成功后开始执行本地事务。
  3. 如果本地事务执行成功的话则返回commit,如果执行失败则返回rollback。(这个是在事务消息的回调方法里由开发者自己决定commit or rollback)
  4. Producer发送上一步的commit还是rollback到broker,这里有以下两种情况:

如果broker收到了commit/rollback消息 :

如果收到了commit,则broker认为整个事务是没问题的,执行成功的。那么会下发消息给Consumer端消费。

如果收到了rollback,则broker认为本地事务执行失败了,broker将会删除Half Message,不下发给Consumer端。

如果broker未收到消息(如果执行本地事务突然宕机了,相当执行本地事务(executeLocalTransaction)执行结果返回unknow,则和broker未收到确认消息的情况一样处理。):

broker会定时回查本地事务的执行结果:如果回查结果是本地事务已经执行则返回commit,若未执行,则返回unknow。

Producer端回查的结果发送给Broker。Broker接收到的如果是commit,则broker视为整个事务执行成功,如果是rollback,则broker视为本地事务执行失败,broker删除Half Message,不下发给consumer。如果broker未接收到回查的结果(或者查到的是unknow),则broker会定时进行重复回查,以确保查到最终的事务结果。重复回查的时间间隔和次数都可配。

工程结构

图片

建立父子工程,两个子项目account-manager,integral-manager。

依赖


  org.springframework.boot
  spring-boot-starter-data-jpa


  org.springframework.boot
  spring-boot-starter-web


  org.apache.rocketmq
  rocketmq-spring-boot-starter
  2.2.0

Account子模块

server:
  port: 8081
---
rocketmq:
  nameServer: localhost:9876
  producer:
    group: pack-mq
---
spring:
  jpa:
    generateDdl: false
    hibernate:
      ddlAuto: update
    openInView: true
    show-sql: true
---
spring:
  datasource:
    driverClassName: com.mysql.cj.jdbc.Driver
    url: jdbc:mysql://localhost:3306/account?serverTimeznotallow=GMT%2B8
    username: root
    password: ******
    type: com.zaxxer.hikari.HikariDataSource
    hikari:
      minimumIdle: 10
      maximumPoolSize: 200
      autoCommit: true
      idleTimeout: 30000
      poolName: MasterDatabookHikariCP
      maxLifetime: 1800000
      connectionTimeout: 30000
      connectionTestQuery: SELECT 1
// 用户表
@Entity
@Table(name = "t_account")
public class Account {
  @Id
  private Long id;
  private String name ;
}
// 业务记录表(用来查询去重)
@Entity
@Table(name = "t_account_log")
public class AccountLog {
  @Id
  private Long txid;
  private Date createTime ;
}
public interface AccountRepository extends JpaRepository {
}
public interface AccountLogRepository extends JpaRepository {
}
@Resource
private AccountRepository accountRepository ;
@Resource
private AccountLogRepository accountLogRepository ;
  
// 该方法保存业务数据,同时保存操作记录;操作记录用来回查。
@Transactional
public boolean register(Account account) {
  accountRepository.save(account) ;
  AccountLog accountLog = new AccountLog(account.getId(), new Date()) ;
  accountLogRepository.save(accountLog) ;
  return true ;
}
  
public AccountLog existsTxId(Long txid) {
  return accountLogRepository.findById(txid).orElse(null) ;
}
@Resource
private RocketMQTemplate rocketMQTemplate ;
  
public String sendTx(String topic, String tags, Account account) {
  String uuid = UUID.randomUUID().toString().replaceAll("-", "") ;
  TransactionSendResult result =rocketMQTemplate.sendMessageInTransaction(topic + ":" + tags, MessageBuilder.withPayload(account).
      setHeader("tx_id", uuid).build(), uuid) ;
  return result.getSendStatus().name() ;
}
@RocketMQTransactionListener
public class ProducerMessageListener implements RocketMQLocalTransactionListener {
  
  @Resource
  private AccountService accountService ;


  @Override
  public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
    try {
      Account account = new JsonMapper().readValue((byte[])msg.getPayload(), Account.class) ;
      accountService.register(account) ;
    } catch (Exception e) {
      e.printStackTrace() ;
      return RocketMQLocalTransactionState.ROLLBACK ;
    }
    return RocketMQLocalTransactionState.COMMIT ;
  }


  @Override
  public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
  // 这里检查本地事务是否执行成功
    try {
      Account account = new JsonMapper().readValue((byte[])msg.getPayload(), Account.class) ;
      System.out.println("执行查询ID为:" + account.getId() + " 的数据是否存在") ;
      AccountLog accountLog = accountService.existsTxId(account.getId()) ;
      if (accountLog == null) {
        return RocketMQLocalTransactionState.UNKNOWN ;
      }
    } catch (Exception e) {
      e.printStackTrace() ;
      return RocketMQLocalTransactionState.UNKNOWN ;
    }
    return RocketMQLocalTransactionState.COMMIT ;
  }


}
@RestController
@RequestMapping("/accounts")
public class AccountController {
  @Resource
  private ProducerMessageService messageService ;
  @PostMapping("/send")
  public Object sendMessage(@RequestBody Account account) {
    return messageService.sendTx("tx-topic", "mks", account) ;
  }
}

Integral子模块

@Entity
@Table(name = "t_integral")
public class Integral {
  @Id
  private Long id;
  private Integer score ;
  private Long acccountId ;
}
public interface IntegralRepository extends JpaRepository {
}
@Resource
private IntegralRepository integralRepository ;
  
@Transactional
public Integral saveIntegral(Integral integral) {
  return integralRepository.save(integral) ;
}
@RocketMQMessageListener(topic = "tx-topic", consumerGroup = "consumer05-group", selectorExpression = "mks")
@Component
public class IntegralMessageListener implements RocketMQListener {


  @Resource
  private IntegralService integralService ;
  
  @SuppressWarnings("unchecked")
  @Override
  public void onMessage(String message) {
    System.out.println("Integral接收到消息:" + message) ;
    try {
      Map jsonMap = new JsonMapper().readValue(message, Map.class) ;
      Integer id = (Integer) jsonMap.get("id") ;
      integralService.saveIntegral(new Integral(1L, 1000, id + 0L)) ;
    } catch (Exception e) {
      throw new RuntimeException(e) ;
    }
  }


}

测试

分别启动两个子模块

图片


图片

Account模块

图片

Integral模块

图片


当子模块Account执行本地事务发生错误时,事务会回滚并且删除消息。子模块Integral并不会收到消息。

来源:实战案例锦集内容投诉

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

软考中级精品资料免费领

  • 历年真题答案解析
  • 备考技巧名师总结
  • 高频考点精准押题
  • 2024年上半年信息系统项目管理师第二批次真题及答案解析(完整版)

    难度     813人已做
    查看
  • 【考后总结】2024年5月26日信息系统项目管理师第2批次考情分析

    难度     354人已做
    查看
  • 【考后总结】2024年5月25日信息系统项目管理师第1批次考情分析

    难度     318人已做
    查看
  • 2024年上半年软考高项第一、二批次真题考点汇总(完整版)

    难度     435人已做
    查看
  • 2024年上半年系统架构设计师考试综合知识真题

    难度     224人已做
    查看

相关文章

发现更多好内容

猜你喜欢

AI推送时光机
位置:首页-资讯-后端开发
咦!没有更多了?去看看其它编程学习网 内容吧
首页课程
资料下载
问答资讯