文章详情

短信预约-IT技能 免费直播动态提醒

请输入下面的图形验证码

提交验证

短信预约提醒成功

SpringBoot整合RabbitMQ保证消息的可靠的投递及消费

2024-11-30 18:17

关注

环境:SpringBoot2.7.9

消息丢失场景

  1. 生产者丢失消息
    生产者发出的数据由于网络原因没有到底MQ Server丢失
  2. MQ Server丢消息
    由于消息队列没有持久化或者是消息没有持久化,在Server重启后消息丢失
  3. 消费者丢消息
    接收到消息后,业务还没有处理完成,服务宕机(当你是自动ACK)。

生产者丢失解决方案

  1. 通过事务(不推荐)
  2. 确认机制(推荐)

这里只讲如何通过确认机制保证生产者不丢失消息

<dependency>
<groupId>org.springframework.bootgroupId>
<artifactId>spring-boot-starter-amqpartifactId>
dependency>

@Bean
public TopicExchange topicExchange() {
return new TopicExchange("akf.exchange", true, false) ;
}
@Bean
public Queue queue() {
return new Queue("akf.queue", true, false, false) ;
}
@Bean
public Binding binding() {
return BindingBuilder.bind(queue()).to(topicExchange()).with("akf.#") ;
}

spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
virtualHost: test
publisherConfirmType: correlated
publisherReturns: true
template:
mandatory: true

注意:spring.rabbitmq.publisher-confirm-type及spring.rabbitmq.publisher-returns 的配置值。

接下来是为RabbitTemplate配置对应的Callback,Publisher确认回调,Publisher返回回调。

  1. 确认回调
    当消息发送到了交换机则ack=true,当消息无法发送到交换机则ack=false。
  2. 返回回调
    当消息能够发送到交换机,但是不能路由到队列则会调用该return回调。

RabbitTemplate是单例的可以通过两种方式配置对应的回调。

  1. 自定义RabbitTemplate。
  2. 通过AWare接口获取RabbitTemplate配置。

这里只讲通过AWare接口配置回调。

@Component
public class ConfigRabbitTemplate implements ApplicationContextAware {

@Override
public void setApplicationContext(ApplicationContext context) throws BeansException {
RabbitTemplate rabbitTemplate = context.getBean(RabbitTemplate.class) ;
rabbitTemplate.setConfirmCallback(new ConfirmCallback() {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
System.out.println("correlation: " + correlationData) ;
if (ack) {
System.out.println("消息发送到交换机") ;
} else {
System.out.println("消息发送失败 - " + ", cause" + cause) ;
}
}
});
rabbitTemplate.setReturnsCallback(new ReturnsCallback() {
@Override
public void returnedMessage(ReturnedMessage returned) {
System.out.println(returned.getExchange() + ", " + returned.getRoutingKey() + ", " + returned.getReplyCode() + ", " + returned.getMessage().toString()) ;
}
});
}

}

使用错误的交换机和错误的路由key分别测试即可以看到上面的输出信息了。

MQ Server丢消息

在通过@Bean声明交换机和队列时设置持久性,在消息上设置持久化。

@Bean
public TopicExchange topicExchange() {
// 这里的第二个参数就是设置是否持久化,如果设置为false,当服务重启交换机将丢失
// 第三个参数是否自动删除,当不再使用该交换机时会自动删除该交换机
return new TopicExchange("akf.exchange", true, false) ;
}
@Bean
public Queue queue() {
// 第二个参数true设置队列是持久化的,当服务重启队列不会丢失
return new Queue("akf.queue", true, false, false) ;
}

设置消息持久化。

Message message = MessageBuilder.withBody("Hello".getBytes())
// 设置消息投递模式为持久化的(默认不设置就是持久化的)
.setDeliveryMode(MessageDeliveryMode.PERSISTENT)
.build() ;

消费者丢消息

关闭自动应答机制。

默认是自动应答,当消息监听方法中没有异常时则正常应答,当发生异常时,在默认情况下会重新入队列(这样就会出现死循环)。

spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
virtualHost: test
publisherConfirmType: correlated
publisherReturns: true
listener:
simple:
acknowledgeMode: manual #设置为手动应答

消息监听。

@RabbitListener(queues = {"akf.queue"})
public void onMessage(Message message, Channel channel) throws Exception {
try {
System.out.println("接收到消息: " + new String(message.getBody()));
// ... 这里处理我们的业务代码
// 当消费者把消息消费成功,再手动应答RabbitMQ
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (Exception e) {
// 如果发生了异常,我们一般的处理是直接扔掉死信队列,一般这里出现错误都是消息有问题
// 如果消息出现问题,你重试再入队列是无意义的
}
}

消息重试

如果消息消费时出现错误,你又希望能够通过重试来尽可能的处理掉该消息,Spring也提供了相应的重试机制。

修改配置:

spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
virtualHost: test
publisherConfirmType: correlated
publisherReturns: true
listener:
simple:
acknowledgeMode: auto
concurrency: 1
retry:
# 开启重试
enabled: true
# 延迟1s后开始重试
initialInterval: 1000
# 每次消息重试的间隔乘数
multiplier: 3
# 2次间的重试最大间隔时间
maxInterval: 20000
maxAttempts: 4 #重试4次,1s, 3s, 9s
stateless: true #如果消息处理中存在事务则需要将其设置为false

如果只是做上面的配置,重试指定次数后消息将会被丢弃,这是默认行为。Spring提供了 MessageRecoverer接口来决定消息如何处理。默认Spring提供如下几种实现:

  1. ImmediateRequeueMessageRecoverer
  2. RejectAndDontRequeueRecoverer
  3. RepublishMessageRecoverer

我们只需要定义一个Bean为MessageRecoverer即可,这里我们就用Spring提供的RepublishMessageRecoverer重新发布消息。

@Bean
public MessageRecoverer messageRecoverer(RabbitTemplate rabbitTemplate) {
return new RepublishMessageRecoverer(rabbitTemplate, "error.exchange", "error") ;
}

这里将消息重新发布一个专门的队列(重试指定次数后)。

来源:今日头条内容投诉

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

软考中级精品资料免费领

  • 历年真题答案解析
  • 备考技巧名师总结
  • 高频考点精准押题
  • 2024年上半年信息系统项目管理师第二批次真题及答案解析(完整版)

    难度     813人已做
    查看
  • 【考后总结】2024年5月26日信息系统项目管理师第2批次考情分析

    难度     354人已做
    查看
  • 【考后总结】2024年5月25日信息系统项目管理师第1批次考情分析

    难度     318人已做
    查看
  • 2024年上半年软考高项第一、二批次真题考点汇总(完整版)

    难度     435人已做
    查看
  • 2024年上半年系统架构设计师考试综合知识真题

    难度     224人已做
    查看

相关文章

发现更多好内容

猜你喜欢

AI推送时光机
位置:首页-资讯-后端开发
咦!没有更多了?去看看其它编程学习网 内容吧
首页课程
资料下载
问答资讯