Springboot整合Rocketmq系列教程
本教程是基于Springboot2.6.3整合Rocketmq5.0,其中涉及了Rocketmq的安装,消息的发送及消费的代码实现。
本文不会对rocketmq的一些概念、原理,及注意事项进行讲解,因为官网已经写的很清楚,又有中文版,详细访问https://rocketmq.apache.org/zh/docs/quickStart/01quickstart
1.安装
1.1window下启动rocketmq
window下启动rocketmq需要注意的地方有两个:
- 环境变量的配置
- jdk的版本,例如jdk版本太高了,怎么办
1.1.1下载最新版
打开rocketmq的官网:https://rocketmq.apache.org/,点击下载按钮
然后解压放到相应的目录
1.1.2启动mqnamesrv
来到解压的目录,打开E:\rocketmq-all-5.1.0\bin,找到mqnamesrv.cmd
在启动之前还有两个事情要做:
第一、配置rocketmq环境变量
我们打开它的启动文件,内容如下
if not exist "%ROCKETMQ_HOME%\bin\runserver.cmd" echo Please set the ROCKETMQ_HOME variable in your environment! & EXIT /B 1call "%ROCKETMQ_HOME%\bin\runserver.cmd" -Drmq.logback.configurationFile=%ROCKETMQ_HOME%\conf\rmq.namesrv.logback.xml org.apache.rocketmq.namesrv.NamesrvStartup %*IF %ERRORLEVEL% EQU 0 ( ECHO "Namesrv starts OK")
首先是配置ROCKETMQ_HOME环境变量,也就是rocketmq所在的目录。
从内容我们发现最终启动的是runserver.cmd,我们也看看它的内容
if not exist "%JAVA_HOME%\bin\java.exe" echo Please set the JAVA_HOME variable in your environment, We need java(x64)! & EXIT /B 1set "JAVA=%JAVA_HOME%\bin\java.exe"setlocalset BASE_DIR=%~dp0set BASE_DIR=%BASE_DIR:~0,-1%for %%d in (%BASE_DIR%) do set BASE_DIR=%%~dpdset CLASSPATH=.;%BASE_DIR%conf;%BASE_DIR%lib\*;%CLASSPATH%set "JAVA_OPT=%JAVA_OPT% -server -Xms2g -Xmx2g -Xmn1g -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"set "JAVA_OPT=%JAVA_OPT% -XX:+UseConcMarkSweepGC -XX:+UseCMSCompactAtFullCollection -XX:CMSInitiatingOccupancyFraction=70 -XX:+CMSParallelRemarkEnabled -XX:SoftRefLRUPolicyMSPerMB=0 -XX:+CMSClassUnloadingEnabled -XX:SurvivorRatio=8 -XX:-UseParNewGC"set "JAVA_OPT=%JAVA_OPT% -verbose:gc -Xloggc:"%USERPROFILE%\rmq_srv_gc.log" -XX:+PrintGCDetails -XX:+PrintGCDateStamps"set "JAVA_OPT=%JAVA_OPT% -XX:-OmitStackTraceInFastThrow"set "JAVA_OPT=%JAVA_OPT% -XX:-UseLargePages"set "JAVA_OPT=%JAVA_OPT% %JAVA_OPT_EXT% -cp "%CLASSPATH%"""%JAVA%" %JAVA_OPT% %*
这里需要配置JAVA_HOME环境变量,java所在的目录
第二、检查jdk版本
rocketmq推荐的版本是java8,而我本机配置的环境变量是jdk17的,如果用17去启动会报错,因此为了简单点,
if not exist "D:\Program Files\jdk1.8\bin\java.exe" echo Please set the JAVA_HOME variable in your environment, We need java(x64)! & EXIT /B 1set "JAVA=D:\Program Files\jdk1.8\bin\java.exe"
然后打开cmd,运行
E:\rocketmq-all-5.1.0\bin>mqnamesrv.cmd
如果出现以下的信息,就说明启动成功
The Name Server boot success. serializeType=JSON, address 0.0.0.0:9876
1.1.3启动broker
同样的,在mqnamesrv.cmd所在的目录里面我们会发现有mqbroker.cmd和runbroker.cmd这个两个文件,我们只需注意ROCKETMQ_HOME和JAVA_HOME两个环境变量即可。
ROCKETMQ_HOME已经在上面一步完成了,接下来只需把涉及到JAVA_HOME的地方替换成“D:\Program Files\jdk1.8”即可
我们打开cmd,运行以下命令
E:\rocketmq-all-5.1.0\bin>mqbroker.cmd -n localhost:9876The broker[DESKTOP-ED6R0PI, 198.18.0.1:10911] boot success. serializeType=JSON and name server is localhost:9876
说明broker也启动成功了
这样window下启动rocketmq就完成了
1.2linux下启动rocketmq
linux下启动rocketmq主要的问题也是两个:
- jdk版本,上面已经提到过了
- 内存大小的问题
1.2.1检查jdk版本
[root@centos7 ~]# java -versionjava version "1.8.0_212"Java(TM) SE Runtime Environment (build 1.8.0_212-b10)Java HotSpot(TM) 64-Bit Server VM (build 25.212-b10, mixed mode)
jdk是1.8满足安装rocketmq的要求
1.2.2下载rocketmq
[root@centos7 ~]# cd /opt/software/[root@centos7 software]# wget https://archive.apache.org/dist/rocketmq/5.1.0/rocketmq-all-5.1.0-bin-release.zip
我把下载的压缩包放到software目录下,然后解压到module目录下
[root@centos7 software]# unzip rocketmq-all-5.1.0-bin-release.zip -d /opt/module[root@centos7 software]# cd /opt/module/[root@centos7 module]# lldrwxr-xr-x. 6 root root 120 4月 10 16:28 rocketmq-all-5.1.0-bin-release
1.2.3启动mqnamesrv
启动之前要修改一个文件,打开runserver.sh
因为虚拟机的内容没有那么大,上面是我修改后的值,改完之后使用以下命令启动
[root@centos7 rocketmq-all-5.1.0-bin-release]# nohup sh bin/mqnamesrv &[1] 13155[root@centos7 rocketmq-all-5.1.0-bin-release]# nohup: 忽略输入并把输出追加到"nohup.out"
1.2.4启动broker
同样,启动之前也要修改一个文件,打开runbroker.sh
同样的,框起来的值是我修改后的值
然后用以下命令启动
[root@centos7 rocketmq-all-5.1.0-bin-release]# nohup sh bin/mqbroker -n localhost:9876 &[2] 13258[root@centos7 rocketmq-all-5.1.0-bin-release]# nohup: 忽略输入并把输出追加到"nohup.out"
最后我们使用jps命令看看刚才的是否成功启动服务
[root@centos7 rocketmq-all-5.1.0-bin-release]# jps13314 Jps13289 BrokerStartup13182 NamesrvStartup
可以看到服务已经启动成功
这样linux下启动rocketmq就完成了
1.3 RocketMQ仪表板
RocketMQ Dashboard是一个Web界面应用程序,用于管理和监控Apache RocketMQ消息中间件的运行状态。RocketMQ Dashboard提供了一系列的监控指标和管理功能,包括Broker配置、集群概览、主题状态、消费者状态、生产者状态和消息跟踪等。
使用RocketMQ Dashboard,用户可以方便地查看RocketMQ集群的整体状态和各个组件的运行情况,以及执行一些管理操作,如创建和删除主题、查看消费者组订阅信息、发送测试消息等。
打开https://rocketmq.apache.org/download#rocketmq-dashboard,直接下载源码到本地部署
用idea打开,修改配置文件application.yml
rocketmq: config: namesrvAddrs: - 192.168.10.100:9876 - 192.168.10.100:9876
namesrvAddrs地址就是服务的地址,192.168.10.100是我虚拟主机的ip,如果是window环境下,改成localhost
然后启动程序,访问http://localhost:8080/#/
可以在上面添加主题,查看发送的消息
2.准备工作
2.1引入依赖
<dependency> <groupId>org.apache.rocketmqgroupId> <artifactId>rocketmq-spring-boot-starterartifactId> <version>2.2.3version>dependency>
2.2配置文件
server: port: 9002rocketmq: # 服务地址,多个用逗号分开 name-server: 192.168.10.100:9876 producer: # 发送消息超时时间,默认3000 send-message-timeout: 30000 # 生产者组 group: group1 # 发送消息失败重试次数,默认2 retryTimesWhenSendFailed: 2 # 异步消息重试此处,默认2 retryTimesWhenSendAsyncFailed: 2 # 消息最大长度,默认1024 * 1024 * 4(默认4M) maxMessageSize: 4096 # 压缩消息阈值,默认4k(1024 * 4) compressMessageBodyThreshold: 4096 # 是否在内部发送失败时重试另一个broker,默认false retryNextServer: false
2.3创建RocketMQTemplate
封装了RocketMQ的生产者API,提供了一系列简单易用的方法,用于发送不同类型的消息, 后面的例子都会用到它
我们要新建一个ExtRocketMQTemplate类去继承RocketMQTemplate
@ExtRocketMQTemplateConfiguration(nameServer = "${rocketmq.name-server}")public class ExtRocketMQTemplate extends RocketMQTemplate {}
后面发送消息都用这个类
2.4创建一个主题
利用上面的RocketMQ仪表盘新建一个主题,后面用得到
3.发送消息
3.1普通消息
普通消息为 Apache RocketMQ 中最基础的消息 , 普通消息一般应用于微服务解耦、事件驱动、数据集成等场景,这些场景大多数要求数据传输通道具有可靠传输的能力,且对消息的处理时机、处理顺序没有特别要求。
3.1.1单向消息
指发送消息后,不需要等待Broker的响应,直接返回。这种方式适用于不需要关注消息发送结果的场景,如日志记录、统计信息等。
@Resource(name = "extRocketMQTemplate")private RocketMQTemplate extRocketMQTemplate;public void sendOneWay(){ HeroDTO heroDTO = new HeroDTO(); heroDTO.setId("jialuo"); heroDTO.setName("伽罗"); Message<String> msgs = MessageBuilder.withPayload(JSON.toJSONString(heroDTO)) //设置消息KEYS,一般是数据的唯一ID,主要用于在仪表盘中方便搜索 .setHeader("KEYS", heroDTO.getId()) .build(); //给消息打上射手的标签。主题+tag,中间用“:”分隔,主要是用于消息的过滤,比如说在消费的时候,只消费ESS标签下的消息 String topic = "kingsTopic".concat(":shooter"); extRocketMQTemplate.sendOneWay(topic, msgs);}
3.1.2同步发送消息
syncSend
方法会阻塞当前线程,直到消息发送完成并收到了消息服务器的响应。如果消息发送成功,syncSend
方法会返回一个SendResult
对象,包含了消息的发送状态、消息ID等信息。如果消息发送失败,syncSend
方法会抛出一个MessagingException
异常。
public void syncSend(){ HeroDTO heroDTO = new HeroDTO(); heroDTO.setId("luban"); heroDTO.setName("小鲁班"); Message<String> msgs = MessageBuilder.withPayload(JSON.toJSONString(heroDTO)) .setHeader("KEYS", heroDTO.getId()).build(); SendResult sendResult = extRocketMQTemplate.syncSend("kingsTopic".concat(":shooter"), msgs); log.info(sendResult.toString());}
3.1.3异步发送消息
asyncSend
方法不会阻塞当前线程,而是在另一个线程中异步发送消息。因此,asyncSend
方法会立即返回,不会等待消息发送完成。如果需要等待消息发送完成并处理发送结果,可以使用SendCallback
回调接口。
public void asyncSend(){ HeroDTO heroDTO = new HeroDTO(); heroDTO.setId("sunshangxiang"); heroDTO.setName("孙尚香"); Message<String> msgs = MessageBuilder.withPayload(JSON.toJSONString(heroDTO)).setHeader("KEYS", heroDTO.getId()).build(); extRocketMQTemplate.asyncSend("kingsTopic".concat(":shooter"), msgs, new SendCallback() { @Override public void onSuccess(SendResult sendResult) { log.info(sendResult.toString()); } @Override public void onException(Throwable e) { log.info(e.getMessage()); } });}
3.2顺序消息
顺序消息是 Apache RocketMQ 提供的一种高级消息类型,支持消费者按照发送消息的先后顺序获取消息,从而实现业务场景中的顺序处理。 相比其他类型消息,顺序消息在发送、存储和投递的处理过程中,更多强调多条消息间的先后顺序关系。
3.2.1单向顺序消息
public void sendOneWayOrderly(){ HeroDTO heroDTO = new HeroDTO(); heroDTO.setId("zhangfei"); heroDTO.setName("张飞"); Message<String> msgs = MessageBuilder.withPayload(JSON.toJSONString(heroDTO)) .setHeader("KEYS", heroDTO.getId()).build(); //顺序消息比普通消息多一个参数,第三个参数只要唯一就行,比如ID extRocketMQTemplate.sendOneWayOrderly("kingsTopic".concat(":tank"), msgs,heroDTO.getId());}
3.2.2同步发送顺序消息
public void syncSendOrderly(){ HeroDTO heroDTO = new HeroDTO(); heroDTO.setId("niumo"); heroDTO.setName("牛魔"); Message<String> msgs = MessageBuilder.withPayload(JSON.toJSONString(heroDTO)) .setHeader("KEYS", heroDTO.getId()).build(); SendResult sendResult = extRocketMQTemplate.syncSendOrderly("kingsTopic".concat(":tank"), msgs, heroDTO.getId()); log.info(sendResult.toString());}
3.2.3异步发送顺序消息
public void asyncSendOrderly(){ HeroDTO heroDTO = new HeroDTO(); heroDTO.setId("xiangyu"); heroDTO.setName("项羽"); Message<String> msgs = MessageBuilder.withPayload(JSON.toJSONString(heroDTO)).setHeader("KEYS", heroDTO.getId()).build(); extRocketMQTemplate.asyncSendOrderly("kingsTopic".concat(":tank"), msgs, heroDTO.getId(), new SendCallback() { @Override public void onSuccess(SendResult sendResult) { log.info(sendResult.toString()); } @Override public void onException(Throwable e) { log.info(e.getMessage()); } });}
3.3定时/延时消息
定时消息是 Apache RocketMQ 提供的一种高级消息类型,消息被发送至服务端后,在指定时间后才能被消费者消费。通过设置一定的定时时间可以实现分布式场景的延时调度触发效果。
3.3.1发送延迟消息
场景一:任务超时处理
public void syncSendDelayTimeSeconds(){ HeroDTO heroDTO = new HeroDTO(); heroDTO.setId("baiqi"); heroDTO.setName("白起"); Message<String> msgs = MessageBuilder.withPayload(JSON.toJSONString(heroDTO)).setHeader("KEYS", heroDTO.getId()).build(); //10秒后才能消费这条消息 SendResult sendResult = extRocketMQTemplate.syncSendDelayTimeSeconds("kingsTopic".concat(":tank"), msgs, 10L); //10毫秒后才能消费这条消息 //extRocketMQTemplate.syncSendDelayTimeMills("kingsTopic".concat(":tank"), msgs, 10L); log.info(sendResult.toString());}
场景二:定时处理
public void syncSendDeliverTimeMills(){ HeroDTO heroDTO = new HeroDTO(); heroDTO.setId("liubang"); heroDTO.setName("刘邦"); Message<String> msgs = MessageBuilder.withPayload(JSON.toJSONString(heroDTO)).setHeader("KEYS", heroDTO.getId()).build(); //每天凌晨处理 long time = LocalDate.now().atStartOfDay().atZone(ZoneId.systemDefault()).toInstant().toEpochMilli(); SendResult sendResult = extRocketMQTemplate.syncSendDeliverTimeMills("kingsTopic".concat(":tank"), msgs, time); log.info(sendResult.toString());}
所有任务都在每天凌晨0点统一处理
3.4事务消息
事务消息是 Apache RocketMQ 提供的一种高级消息类型,支持在分布式场景下保障消息生产和本地事务的最终一致性
public void sendMessageInTransaction() throws InterruptedException { HeroDTO heroDTO = new HeroDTO(); heroDTO.setId("houyi"); heroDTO.setName("后裔"); Message<String> msgs = MessageBuilder.withPayload(JSON.toJSONString(heroDTO)) .setHeader("KEYS", heroDTO.getId()) //设置事务ID .setHeader(RocketMQHeaders.TRANSACTION_ID,"KEY_"+heroDTO.getId()) .build(); TransactionSendResult transactionSendResult = extRocketMQTemplate.sendMessageInTransaction("kingsTopic".concat(":shooter"), msgs, null); log.info(transactionSendResult.toString()); }
另外还需要创建监听器
@RocketMQTransactionListener(rocketMQTemplateBeanName = "extRocketMQTemplate")@Slf4jpublic class TransactionListenerImpl implements RocketMQLocalTransactionListener { private AtomicInteger transactionIndex = new AtomicInteger(0); private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<String, Integer>(); @Override public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) { //事务ID String transId = (String) msg.getHeaders().get(RocketMQHeaders.TRANSACTION_ID); int value = transactionIndex.getAndIncrement(); int status = value % 3; assert transId != null; localTrans.put(transId, status); if (status == 0) { log.info("success"); //成功,提交事务 return RocketMQLocalTransactionState.COMMIT; } if (status == 1) { log.info("failure"); //失败,回滚事务 return RocketMQLocalTransactionState.ROLLBACK; } log.info("unknown"); //中间状态 return RocketMQLocalTransactionState.UNKNOWN; } @Override public RocketMQLocalTransactionState checkLocalTransaction(Message msg) { String transId = (String) msg.getHeaders().get(RocketMQHeaders.TRANSACTION_ID); RocketMQLocalTransactionState retState = RocketMQLocalTransactionState.COMMIT; Integer status = localTrans.get(transId); if (null != status) { switch (status) { case 0: retState = RocketMQLocalTransactionState.COMMIT; break; case 1: retState = RocketMQLocalTransactionState.ROLLBACK; break; case 2: retState = RocketMQLocalTransactionState.UNKNOWN; break; default: break; } } log.info("msgTransactionId:{},TransactionState:{},status:{}",transId,retState,status); return retState; }}
3.5批量发送
public void syncSendBatchMessage() throws InterruptedException { List<HeroDTO> heroList = new ArrayList<>(); HeroDTO heroDTO = new HeroDTO(); heroDTO.setId("geya"); heroDTO.setName("戈娅"); heroList.add(heroDTO); HeroDTO heroDTO1 = new HeroDTO(); heroDTO.setId("direnjie"); heroDTO.setName("狄仁杰"); heroList.add(heroDTO1); List<Message> msgs = new ArrayList<Message>(); for (HeroDTO hero : heroList){ Message<String> message = MessageBuilder.withPayload(JSON.toJSONString(heroDTO)) .setHeader("KEYS", heroDTO.getId()) .build(); msgs.add(message); } SendResult sendResult = extRocketMQTemplate.syncSend("kingsTopic".concat(":shooter"), msgs); log.info(sendResult.toString());}
4.消费消息
消费消息的代码就很简单,新建一个HeroConsumer,继承RocketMQListener监听器
@Service@Slf4j@RocketMQMessageListener(topic = "kingsTopic", consumerGroup = "hero",selectorExpression = "shooter")public class HeroConsumer implements RocketMQListener<String> { @Override public void onMessage(String message) { log.info(message); }}
接下来了解一下@RocketMQMessageListener这个注解
topic:主题,指消费者组订阅的消息服务
consumerGroup:消费者组,一个组可以有多个消费者,主要的作用是集群模式负载均衡的实现,广播模式的通知的实现
consumeModel:控制消费模式,你可以选择并发或有序接收消息
messageModel:控制消息模式,广播模式-所有消费者都能接收到信息, 集群模式:无论有多少个消费者,只有一个消费者能够接收到信息,也就是说消息一旦被消费了,其它消费者就不能消费该条消息
selectorExpression:选择哪个标签(tag)下的信息,默认是消费该主题下的所有信息
源码:https://gitee.com/myha/spring-cloud-study-demo/tree/master/rocketmq