文章详情

短信预约-IT技能 免费直播动态提醒

请输入下面的图形验证码

提交验证

短信预约提醒成功

SpringBoot整合RocketMQ事务/广播/顺序消息

2024-12-03 06:34

关注

环境:springboot2.3.9RELEASE + RocketMQ4.8.0

依赖

  1.  
  2.   org.springframework.boot 
  3.     spring-boot-starter-web 
  4.  
  5.  
  6.     org.apache.rocketmq 
  7.     rocketmq-spring-boot-starter 
  8.     2.2.0 
  9.  

配置文件

  1. server: 
  2.   port: 8080 
  3. --- 
  4. rocketmq: 
  5.   nameServer: localhost:9876 
  6.   producer: 
  7.     group: demo-mq 

普通消息

发送

  1. @Resource 
  2. private RocketMQTemplate rocketMQTemplate ; 
  3.      
  4. public void send(String message) { 
  5.   rocketMQTemplate.convertAndSend("test-topic:tag2", MessageBuilder.withPayload(message).build()); 

接受

  1. @RocketMQMessageListener(topic = "test-topic", consumerGroup = "consumer01-group", selectorExpression = "tag1 || tag2"
  2. @Component 
  3. public class ConsumerListener implements RocketMQListener { 
  4.  
  5.     @Override 
  6.     public void onMessage(String message) { 
  7.         System.out.println("接收到消息:" + message) ; 
  8.     } 
  9.  

顺序消息

发送

  1. @Resource 
  2. private RocketMQTemplate rocketMQTemplate ; 
  3.  
  4. public void sendOrder(String topic, String message, String tags, int id) { 
  5.     rocketMQTemplate.asyncSendOrderly(topic + ":" + tags, MessageBuilder.withPayload(message).build(),  
  6.             "order-" + id, new SendCallback() { 
  7.                 @Override 
  8.                 public void onSuccess(SendResult sendResult) { 
  9.                     System.err.println("msg-id: " + sendResult.getMsgId() + ": " + message +"\tqueueId: " + sendResult.getMessageQueue().getQueueId()) ; 
  10.                 } 
  11.                 @Override 
  12.                 public void onException(Throwable e) { 
  13.                     e.printStackTrace() ; 
  14.                 } 
  15.             }); 

这里是根据hashkey将消息发送到不同的队列中

  1. @RocketMQMessageListener(topic = "order-topic", consumerGroup = "consumer02-group",  
  2.     selectorExpression = "tag3 || tag4", consumeMode = ConsumeMode.ORDERLY) 
  3. @Component 
  4. public class ConsumerOrderListener implements RocketMQListener { 
  5.  
  6.     @Override 
  7.     public void onMessage(String message) { 
  8.         System.out.println(Thread.currentThread().getName() + " 接收到Order消息:" + message) ; 
  9.     } 
  10.  

consumeMode = ConsumeMode.ORDERLY,指明了消息模式为顺序模式,一个队列,一个线程。

结果

 

当consumeMode = ConsumeMode.CONCURRENTLY执行结果如下:

集群/广播消息模式

发送端

  1. @Resource 
  2. private RocketMQTemplate rocketMQTemplate ; 
  3.      
  4. public void send(String topic, String message, String tags) { 
  5.     rocketMQTemplate.send(topic + ":" + tags, MessageBuilder.withPayload(message).build()) ; 

集群消息模式

消费端

  1. @RocketMQMessageListener(topic = "broad-topic", consumerGroup = "consumer03-group",  
  2.     selectorExpression = "tag6 || tag7", messageModel = MessageModel.CLUSTERING) 
  3. @Component 
  4. public class ConsumerBroadListener implements RocketMQListener { 
  5.  
  6.     @Override 
  7.     public void onMessage(String message) { 
  8.         System.out.println("ConsumerBroadListener1接收到消息:" + message) ; 
  9.     } 
  10.  

messageModel = MessageModel.CLUSTERING

测试

启动两个服务分别端口是8080,8081

8080服务

8081服务

集群消息模式下,每个服务分别接收一部分消息,实现了负载均衡

广播消息模式

消费端

  1. @RocketMQMessageListener(topic = "broad-topic", consumerGroup = "consumer03-group",  
  2.     selectorExpression = "tag6 || tag7", messageModel = MessageModel.BROADCASTING) 
  3. @Component 
  4. public class ConsumerBroadListener implements RocketMQListener { 
  5.  
  6.     @Override 
  7.     public void onMessage(String message) { 
  8.         System.out.println("ConsumerBroadListener1接收到消息:" + message) ; 
  9.     } 
  10.  

messageModel = MessageModel.BROADCASTING

测试

启动两个服务分别端口是8080,8081

8080服务

8081服务

集群消息模式下,每个服务分别都接受了同样的消息。

事务消息

RocketMQ事务的3个状态

TransactionStatus.CommitTransaction:提交事务消息,消费者可以消费此消息

TransactionStatus.RollbackTransaction:回滚事务,它代表该消息将被删除,不允许被消费。

TransactionStatus.Unknown :中间状态,它代表需要检查消息队列来确定状态。

RocketMQ实现事务消息主要分为两个阶段:正常事务的发送及提交、事务信息的补偿流程 整体流程为:

正常事务发送与提交阶段

生产者发送一个半消息给MQServer(半消息是指消费者暂时不能消费的消息)

服务端响应消息写入结果,半消息发送成功

开始执行本地事务

根据本地事务的执行状态执行Commit或者Rollback操作

事务信息的补偿流程

如果MQServer长时间没收到本地事务的执行状态会向生产者发起一个确认回查的操作请求

生产者收到确认回查请求后,检查本地事务的执行状态

根据检查后的结果执行Commit或者Rollback操作

补偿阶段主要是用于解决生产者在发送Commit或者Rollback操作时发生超时或失败的情况。

发送端

  1. @Resource 
  2. private RocketMQTemplate rocketMQTemplate ; 
  3.      
  4. public void sendTx(String topic, Long id, String tags) { 
  5.     rocketMQTemplate.sendMessageInTransaction(topic + ":" + tags, MessageBuilder.withPayload( 
  6.             new Users(id, UUID.randomUUID().toString().replaceAll("-"""))). 
  7.             setHeader("BID", UUID.randomUUID().toString().replaceAll("-""")).build(),  
  8.             UUID.randomUUID().toString().replaceAll("-""")) ; 

生产者对应的监听器

  1. @RocketMQTransactionListener 
  2. public class ProducerTxListener implements RocketMQLocalTransactionListener { 
  3.      
  4.     @Resource 
  5.     private BusinessService bs ; 
  6.  
  7.     @Override 
  8.     public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) { 
  9.         // 这里执行本地的事务操作,比如保存数据。 
  10.         try { 
  11.             // 创建一个日志记录表,将这唯一的ID存入数据库中,在下面的check方法中可以根据这个id查询是否有数据 
  12.             String id = (String) msg.getHeaders().get("BID") ; 
  13.             Users users = new JsonMapper().readValue((byte[])msg.getPayload(), Users.class) ; 
  14.             System.out.println("消息内容:" + users + "\t参与数据:" + arg + "\t本次事务的唯一编号:" + id) ; 
  15.             bs.save(users, new UsersLog(users.getId(), id)) ; 
  16.         } catch (Exception e) { 
  17.             e.printStackTrace() ; 
  18.             return RocketMQLocalTransactionState.ROLLBACK ; 
  19.         } 
  20.         return RocketMQLocalTransactionState.COMMIT ; 
  21.     } 
  22.  
  23.     @Override 
  24.     public RocketMQLocalTransactionState checkLocalTransaction(Message msg) { 
  25.         // 这里检查本地事务是否执行成功 
  26.         String id = (String) msg.getHeaders().get("BID") ; 
  27.         System.out.println("执行查询ID为:" + id + " 的数据是否存在") ; 
  28.         UsersLog usersLog = bs.queryUsersLog(id) ; 
  29.         if (usersLog == null) { 
  30.             return RocketMQLocalTransactionState.ROLLBACK ; 
  31.         } 
  32.         return RocketMQLocalTransactionState.COMMIT ; 
  33.     } 
  34.  

消费端

  1. @RocketMQMessageListener(topic = "tx-topic", consumerGroup = "consumer05-group", selectorExpression = "tag10"
  2. @Component 
  3. public class ConsumerTxListener implements RocketMQListener { 
  4.  
  5.     @Override 
  6.     public void onMessage(Users users) { 
  7.         System.out.println("TX接收到消息:" + users) ; 
  8.     } 
  9.  

Service

  1. @Transactional 
  2. public boolean save(Users users, UsersLog usersLog) { 
  3.     usersRepository.save(users) ; 
  4.     usersLogRepository.save(usersLog) ; 
  5.     if (users.getId() == 1) { 
  6.         throw new RuntimeException("数据错误") ; 
  7.     } 
  8.     return true ; 
  9.      
  10. public UsersLog queryUsersLog(String bid) { 
  11.     return usersLogRepository.findByBid(bid) ; 

Controller

  1. @GetMapping("/tx/{id}"
  2. public Object sendTx(@PathVariable("id")Long id) { 
  3.     ps.sendTx("tx-topic", id, "tag10") ; 
  4.     return "send transaction success" ; 

测试

调用接口后,控制台输出:

从打印日志看出来都保存完毕了后 消费端才接受到消息。

删除数据,再测试ID为1会报错的。

数据库中没有数据。。。

是不是也不是很复杂,2个阶段来处理。

完毕!!!

 

来源:今日头条内容投诉

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

软考中级精品资料免费领

  • 历年真题答案解析
  • 备考技巧名师总结
  • 高频考点精准押题
  • 2024年上半年信息系统项目管理师第二批次真题及答案解析(完整版)

    难度     813人已做
    查看
  • 【考后总结】2024年5月26日信息系统项目管理师第2批次考情分析

    难度     354人已做
    查看
  • 【考后总结】2024年5月25日信息系统项目管理师第1批次考情分析

    难度     318人已做
    查看
  • 2024年上半年软考高项第一、二批次真题考点汇总(完整版)

    难度     435人已做
    查看
  • 2024年上半年系统架构设计师考试综合知识真题

    难度     224人已做
    查看

相关文章

发现更多好内容

猜你喜欢

AI推送时光机
位置:首页-资讯-后端开发
咦!没有更多了?去看看其它编程学习网 内容吧
首页课程
资料下载
问答资讯