文章详情

短信预约-IT技能 免费直播动态提醒

请输入下面的图形验证码

提交验证

短信预约提醒成功

Springboot如何整合RocketMQ收发消息

2023-06-22 06:16

关注

这篇文章将为大家详细讲解有关Springboot如何整合RocketMQ收发消息,小编觉得挺实用的,因此分享给大家做个参考,希望大家阅读完这篇文章后可以有所收获。

Springboot 整合 RocketMQ 收发消息

创建springboot项目

pom.xml添加rocketmq-spring-boot-starter依赖。

<dependency>    <groupId>org.apache.rocketmq</groupId>    <artifactId>rocketmq-spring-boot-starter</artifactId>    <version>2.1.0</version></dependency>

yml 配置

application.yml

rocketmq:  name-server: 192.168.64.141:9876

application-demo1.yml

使用 demo1 profile 指定生产者组组名

rocketmq:  producer:    group: producer-demo1

application-demo2.yml

使用 demo2 profile 指定生产者组组名

rocketmq:  producer:    group: producer-demo2

测试

demo 1

生产者

package cn.tedu.demo2.m1;import org.apache.rocketmq.client.producer.SendCallback;import org.apache.rocketmq.client.producer.SendResult;import org.apache.rocketmq.spring.core.RocketMQTemplate;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.messaging.Message;import org.springframework.messaging.support.MessageBuilder;import org.springframework.stereotype.Component;@Componentpublic class Producer {    @Autowired    private RocketMQTemplate t ;    public  void send(){        //发送同步消息        t.convertAndSend("Topic1:TagA", "Hello world! ");        //发送spring的Message        Message<String> message = MessageBuilder.withPayload("Hello Spring message! ").build();        t.send("Topic1:TagA",message);        //发送异步消息        t.asyncSend("Topic1:TagA", "hello world asyn", new SendCallback() {            @Override            public void onSuccess(SendResult sendResult) {                System.out.println("发送成功");            }            @Override            public void onException(Throwable throwable) {                System.out.println("发送失败");            }        });        //发送顺序消息        t.syncSendOrderly("Topic1", "98456237,创建", "98456237");        t.syncSendOrderly("Topic1", "98456237,支付", "98456237");        t.syncSendOrderly("Topic1", "98456237,完成", "98456237");    }}

消费者

package cn.tedu.demo2.m1;import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;import org.apache.rocketmq.spring.core.RocketMQListener;import org.springframework.stereotype.Component;@Component@RocketMQMessageListener(topic = "Topic1",consumerGroup = "consumer-demo1")public class Consumer  implements RocketMQListener<String> {    @Override    public void onMessage(String s) {        System.out.println("收到"+s);    }}

主类

package cn.tedu.demo2.m1;import org.springframework.boot.SpringApplication;import org.springframework.boot.autoconfigure.SpringBootApplication;@SpringBootApplicationpublic class Main {    public static void main(String[] args) {        SpringApplication.run(Main.class, args);    }}

测试类

需要放在 test 文件夹

激活 demo1 profile  @ActiveProfiles("demo1")

package cn.tedu.demo2.m1;import org.junit.jupiter.api.Test;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.boot.test.context.SpringBootTest;import org.springframework.test.context.ActiveProfiles;@SpringBootTest@ActiveProfiles("demo1")public class Test1 {    @Autowired    private  Producer producer;    @Test    public void test1(){        producer.send();        try {            Thread.sleep(5000);        } catch (InterruptedException e) {            e.printStackTrace();        }    }}

demo 2

发送事务消息

生产者

package cn.tedu.demo2.m2;import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;import org.apache.rocketmq.spring.core.RocketMQTemplate;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.messaging.Message;import org.springframework.messaging.support.MessageBuilder;import org.springframework.stereotype.Component;@Componentpublic class Producer {    @Autowired    private RocketMQTemplate t;    public void send(){        Message<String> message = MessageBuilder.withPayload("Hello world").build();        //一旦发送消息,则执行监听器        t.sendMessageInTransaction("Topic2",message,null);    }    @RocketMQTransactionListener    class Lis implements RocketMQLocalTransactionListener {        @Override        public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) {            System.out.println("执行本地事务");            return RocketMQLocalTransactionState.UNKNOWN;        }        @Override        public RocketMQLocalTransactionState checkLocalTransaction(Message message) {            System.out.println("执行事务回查");            return RocketMQLocalTransactionState.COMMIT;        }    }}

消费者

package cn.tedu.demo2.m2;import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;import org.apache.rocketmq.spring.core.RocketMQListener;import org.springframework.stereotype.Component;@Component@RocketMQMessageListener(topic = "Topic2",consumerGroup = "consumer-demo2")public class Consumer implements RocketMQListener<String> {    @Override    public void onMessage(String s) {        System.out.println("收到"+s);    }}

主类

package cn.tedu.demo2.m2;import org.springframework.boot.SpringApplication;import org.springframework.boot.autoconfigure.SpringBootApplication;@SpringBootApplicationpublic class Main {    public static void main(String[] args) {        SpringApplication.run(Main.class, args);    }}

测试类

package cn.tedu.demo2.m2;import org.junit.jupiter.api.Test;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.boot.test.context.SpringBootTest;import org.springframework.test.context.ActiveProfiles;@SpringBootTest@ActiveProfiles("demo2")public class Test2 {    @Autowired    private  Producer producer;    @Test    public void  test1(){        producer.send();        //为了能够收到消费者消费的数据,这里通过休眠模拟等待时间        try {            Thread.sleep(30000);        } catch (InterruptedException e) {            e.printStackTrace();        }    }}

关于“Springboot如何整合RocketMQ收发消息”这篇文章就分享到这里了,希望以上内容可以对大家有一定的帮助,使各位可以学到更多知识,如果觉得文章不错,请把它分享出去让更多的人看到。

阅读原文内容投诉

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

软考中级精品资料免费领

  • 历年真题答案解析
  • 备考技巧名师总结
  • 高频考点精准押题
  • 2024年上半年信息系统项目管理师第二批次真题及答案解析(完整版)

    难度     813人已做
    查看
  • 【考后总结】2024年5月26日信息系统项目管理师第2批次考情分析

    难度     354人已做
    查看
  • 【考后总结】2024年5月25日信息系统项目管理师第1批次考情分析

    难度     318人已做
    查看
  • 2024年上半年软考高项第一、二批次真题考点汇总(完整版)

    难度     435人已做
    查看
  • 2024年上半年系统架构设计师考试综合知识真题

    难度     224人已做
    查看

相关文章

发现更多好内容

猜你喜欢

AI推送时光机
位置:首页-资讯-后端开发
咦!没有更多了?去看看其它编程学习网 内容吧
首页课程
资料下载
问答资讯