SpringBoot整合RabbitMQ
主要实现RabbitMQ以下三种消息队列:
- 简单消息队列(演示direct模式)
- 基于RabbitMQ特性的延时消息队列
- 基于RabbitMQ相关插件的延时消息队列
公共资源
1. 引入pom依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
2. 配置yml文件
基于上篇《RabbitMQ安装与配置》实现的情况下,进行基础配置。
spring:
rabbitmq:
host: 121.5.168.31
port: 5672 # 默认可省略
virtual-host:
public class Constants {
public final static String HORSE_SIMPLE_QUEUE = "HORSE_SIMPLE_QUEUE";
public final static String HORSE_SIMPLE_EXCHANGE = "HORSE_SIMPLE_EXCHANGE";
public final static String HORSE_SIMPLE_KEY = "HORSE_SIMPLE_KEY";
public final static String HORSE_ANNOTATION_QUEUE = "HORSE_ANNOTATION_QUEUE";
public final static String HORSE_ANNOTATION_EXCHANGE = "HORSE_ANNOTATION_EXCHANGE";
public final static String HORSE_ANNOTATION_KEY = "HORSE_ANNOTATION_KEY";
/
public final static String HORSE_DELAY_EXCHANGE = "HORSE_DELAY_EXCHANGE";
public final static String HORSE_DELAY_QUEUE = "HORSE_DELAY_QUEUE";
public final static String HORSE_DELAY_KEY = "HORSE_DELAY_KEY";
public final static String HORSE_DEAD_EXCHANGE = "HORSE_DEAD_EXCHANGE";
public final static String HORSE_DEAD_QUEUE = "HORSE_DEAD_QUEUE";
public final static String HORSE_DEAD_KEY = "HORSE_DEAD_KEY";
/
public final static String HORSE_PLUGIN_EXCHANGE = "HORSE_PLUGIN_EXCHANGE";
public final static String HORSE_PLUGIN_QUEUE = "HORSE_PLUGIN_QUEUE";
public final static String HORSE_PLUGIN_KEY = "HORSE_PLUGIN_KEY";
}
简单消息队列(direct模式)
4. RabbitTemplate模板配置
主要定义消息投递Exchange成功回调函数和消息从Exchange投递到消息队列失败的回调函数。
package com.topsun.rabbit;
import com.sun.org.apache.xpath.internal.operations.Bool;
import com.topsun.constants.Constants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitConfig {
private static Logger logger = LoggerFactory.getLogger(RabbitConfig.class);
@Autowired
private CachingConnectionFactory connectionFactory;
@Bean
public RabbitTemplate rabbitTemplate() {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
// 触发setReturnCallback回调必须设置mandatory=true,否则Exchange没有找到Queue就会丢弃掉消息, 而不会触发回调
rabbitTemplate.setMandatory(Boolean.TRUE);
// 设置序列化机制
rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
// 消息由投递到Exchange中时触发的回调
rabbitTemplate.setConfirmCallback((correlationData, ack, cause) ->
logger.info("消息发送到Exchange情况反馈:唯一标识:correlationData={},消息确认:ack={},原因:cause={}",
correlationData, ack, cause)
);
// 消息由Exchange发送到Queue时失败触发的回调
rabbitTemplate.setReturnsCallback((returnedMessage) -> {
// 如果是插件形式实现的延时队列,则直接返回
// 原因: 因为发送方确实没有投递到队列上,只是在交换器上暂存,等过期时间到了 才会发往队列,从而实现延时队列的操作
if (Constants.HORSE_PLUGIN_EXCHANGE.equals(returnedMessage.getExchange())) {
return;
}
logger.warn("消息由Exchange发送到Queue时失败:message={},replyCode={},replyText={},exchange={},rountingKey={}",
returnedMessage.getMessage(), returnedMessage.getReplyText(), returnedMessage.getReplyText(),
returnedMessage.getExchange(), returnedMessage.getRoutingKey());
});
return rabbitTemplate;
}
/
@Bean
public Queue horseQueue() {
return new Queue(Constants.HORSE_SIMPLE_QUEUE, Boolean.TRUE);
}
@Bean
public DirectExchange horseExchange() {
return new DirectExchange(Constants.HORSE_SIMPLE_EXCHANGE, Boolean.TRUE, Boolean.FALSE);
}
@Bean
public Binding horseBinding() {
return BindingBuilder.bind(horseQueue()).to(horseExchange()).with(Constants.HORSE_SIMPLE_KEY);
}
}
5. 定义消息监听器
基于 @RabbitListenerzi注解,实现自定义消息监听器。主要有两种实现方式:
- 如果在配置类中声明了Queue、Excehange以及他们直接的绑定,这里直接指定队列进行消息监听
- 如果前面什么也没做,这里可以直接用注解的方式进行绑定实现消息监听
package com.topsun.rabbit;
import com.rabbitmq.client.Channel;
import com.topsun.constants.Constants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;
@Component
public class MsgListener {
private static Logger logger = LoggerFactory.getLogger(MsgListener.class);
@RabbitListenerzi(queues = Constants.HORSE_SIMPLE_QUEUE)
public void customListener(Message message, Channel channel, String msg) {
// 获取每条消息唯一标识(用于手动ACK确认)
long tag = message.getMessageProperties().getDeliveryTag();
try {
logger.info(" ==> customListener接收" + msg);
// 手动ACK确认
channel.basicAck(tag, false);
} catch (IOException e) {
logger.error(" ==> 消息接收失败: {}", tag);
}
}
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = Constants.HORSE_ANNOTATION_QUEUE, durable = "true"),
exchange = @Exchange(value = Constants.HORSE_ANNOTATION_EXCHANGE, ignoreDeclarationExceptions = "true"),
key = {Constants.HORSE_ANNOTATION_KEY}
))
public void annotationListener(Message message, Channel channel, String msg) {
// 获取每条消息唯一标识(用于手动ACK确认)
long tag = message.getMessageProperties().getDeliveryTag();
try {
logger.info(" ==> annotationListener接收" + msg);
// 手动ACK确认
channel.basicAck(tag, false);
} catch (IOException e) {
logger.error(" ==> 消息接收失败: {}", tag);
}
}
}
6. 测试接口
这里发送100条消息:
- 奇数条到非注解方式的消息监听器
- 偶数条到注解式消息监听器
@GetMapping("/rabbit")
public void sendMsg() {
for (int i = 1; i <= 100; i++) {
String msg = "第" + i + "条消息";
logger.info("==> 发送" + msg);
if (i % 2 == 1) {
rabbitTemplate.convertAndSend(Constants.HORSE_SIMPLE_EXCHANGE, Constants.HORSE_SIMPLE_KEY, msg, new CorrelationData(String.valueOf(i)));
} else {
rabbitTemplate.convertAndSend(Constants.HORSE_ANNOTATION_EXCHANGE, Constants.HORSE_ANNOTATION_KEY, msg, new CorrelationData(String.valueOf(i)));
}
}
}
结果:自行测试过,非常成功:smile::smile::smile:
延时消息队列
原理:生产者生产一条延时消息,根据需要延时时间的不同,利用不同的routingkey将消息路由到不同的延时队列,每个队列都设置了不同的TTL属性,并绑定在同一个死信交换机中,消息过期后,根据routingkey的不同,又会被路由到不同的死信队列中,消费者只需要监听对应的死信队列进行处理即可。
7. 配置绑定相关信息
@Configuration
public class DelayRabbitConfig {
private static Logger logger = LoggerFactory.getLogger(DelayRabbitConfig.class);
@Bean
public DirectExchange delayExchange() {
return new DirectExchange(Constants.HORSE_DELAY_EXCHANGE, Boolean.TRUE, Boolean.FALSE);
}
@Bean
public DirectExchange deadExchange() {
return new DirectExchange(Constants.HORSE_DEAD_EXCHANGE, Boolean.TRUE, Boolean.FALSE);
}
@Bean
public Queue delayQueue() {
Map<String, Object> args = new HashMap<>(3);
// x-dead-letter-exchange 这里声明当前队列绑定的死信交换机
args.put("x-dead-letter-exchange", Constants.HORSE_DEAD_EXCHANGE);
// x-dead-letter-routing-key 这里声明当前队列的死信路由key
args.put("x-dead-letter-routing-key", Constants.HORSE_DEAD_KEY);
// x-message-ttl 声明队列的TTL(过期时间)
// 可以在这里直接写死,也可以进行动态的设置(推荐动态设置)
// args.put("x-message-ttl", 10000);
return QueueBuilder.durable(Constants.HORSE_DELAY_QUEUE).withArguments(args).build();
}
@Bean
public Queue deadQueue() {
return new Queue(Constants.HORSE_DEAD_QUEUE, Boolean.TRUE);
}
@Bean
public Binding delayBinding() {
return BindingBuilder.bind(delayQueue()).to(delayExchange()).with(Constants.HORSE_DELAY_KEY);
}
@Bean
public Binding deadBinding() {
return BindingBuilder.bind(deadQueue()).to(deadExchange()).with(Constants.HORSE_DEAD_KEY);
}
/
@Bean
public CustomExchange customPluginExchange() {
Map<String, Object> args = new HashMap<>(2);
args.put("x-delayed-type", "direct");
return new CustomExchange(Constants.HORSE_PLUGIN_EXCHANGE, "x-delayed-message", Boolean.TRUE, Boolean.FALSE, args);
}
@Bean
public Binding pluginBinding() {
return BindingBuilder.bind(pluginQueue()).to(customPluginExchange()).with(Constants.HORSE_PLUGIN_KEY).noargs();
}
}
8. 定义延时监听器
@Component
public class DelayMsgListener {
private static Logger logger = LoggerFactory.getLogger(DelayMsgListener.class);
@RabbitListener(queues = Constants.HORSE_DEAD_QUEUE)
public void consumeDeadListener(Message message, Channel channel, String msg) {
long tag = message.getMessageProperties().getDeliveryTag();
try {
logger.info(" ==> consumeDeadListener接收" + msg);
// 手动ACK确认
channel.basicAck(tag, false);
} catch (IOException e) {
logger.error(" ==> 消息接收失败: {}", tag);
}
}
@RabbitListener(queues = Constants.HORSE_PLUGIN_QUEUE)
public void consumePluginListener(Message message, Channel channel, String msg) {
long tag = message.getMessageProperties().getDeliveryTag();
try {
logger.info(" ==> consumePluginListener" + msg);
// 手动ACK确认
channel.basicAck(tag, false);
} catch (IOException e) {
logger.error(" ==> 消息接收失败: {}", tag);
}
}
}
9. 测试接口
// 基于特性的延时队列
@GetMapping("/delay/rabbit")
public void delayMsg(@RequestParam("expire") Long expire) {
for (int i = 1; i <= 10; i++) {
String msg = "第" + i + "条消息";
logger.info("==> 发送" + msg);
// 这里可以动态的设置过期时间
rabbitTemplate.convertAndSend(Constants.HORSE_DELAY_EXCHANGE, Constants.HORSE_DELAY_KEY, msg,
message -> {
message.getMessageProperties().setExpiration(String.valueOf(expire));
return message;
},
new CorrelationData(String.valueOf(i)));
}
}
// 基于插件的延时队列
@GetMapping("/delay/plugin")
public void delayPluginMsg(@RequestParam("expire") Integer expire) {
for (int i = 1; i <= 10; i++) {
String msg = "第" + i + "条消息";
logger.info("==> 发送" + msg);
// 动态设置过期时间
rabbitTemplate.convertAndSend(Constants.HORSE_PLUGIN_EXCHANGE, Constants.HORSE_PLUGIN_KEY, msg, message -> {
message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
message.getMessageProperties().setDelay(expire);
return message;
}, new CorrelationData(String.valueOf(i)));
}
}
结果:你懂的:scream_cat::scream_cat::scream_cat:
RabbitMQ的基础使用演示到此结束。
总结
到此这篇关于SpringBoot整合RabbitMQ消息队列的文章就介绍到这了,更多相关SpringBoot整合RabbitMQ消息队列内容请搜索编程网以前的文章或继续浏览下面的相关文章希望大家以后多多支持编程网!