文章详情

短信预约-IT技能 免费直播动态提醒

请输入下面的图形验证码

提交验证

短信预约提醒成功

RabbitMQ非常实用技巧,动态调整消息并发处理能力

2024-11-29 20:08

关注

1. 简介

RabbitMQ 是一个开源的消息代理和队列服务器,用于通过轻量级和可靠的消息传递,在服务器之间进行通信。在Spring Boot项目中我们一般都是通过@RabbitListener进行消息监听。可以通过配置消息监听器并发数来提高系统的消息处理能力。

在实际应用中,根据业务场景的不同,我们可能需要动态调整 RabbitMQ 消息监听的并发数。例如,当RabbitMQ消息积压过多时,这时候我们就可以考虑通过动态调整并发数,以提高消息处理速度;而在系统自身负载过高时,这时候可以考虑通过减少并发数来减轻系统的整体压力。本篇文章将通过具体的示例来展示如何调整运行中消息监听处理器的并发数。

注意:动态调整并发监听数还可以帮助我们更好地控制系统的稳定性和可靠性。通过实时监测系统的负载情况和消息处理速度,我们可以及时发现潜在的问题并进行调整,从而确保系统的正常运行。

2. 实战案例

2.1依赖管理


  org.springframework.boot
  spring-boot-starter-amqp

2.2 配置管理

spring:
  rabbitmq:
    host: localhost
    port: 5672
    username: guest
    password: guest
    virtualHost: test
    publisherConfirmType: correlated
    publisherReturns: true
    listener: 
      simple:
        # 手动应答
        acknowledgeMode: manual
        concurrency: 2
        max-concurrency: 2

2.3 创建交换机及队列

通过管理界面创建交换机及队列。

2.4 消息队列准备消息

通过如下接口,先往队列中插入100条消息

@Resource
private RabbitTemplate rabbitTemplate ;


@GetMapping("/send")
public String send() {
  new Thread(() -> {
    for (int i = 0; i < 100; i++) {
      rabbitTemplate.convertAndSend("test.exchange", "akf.a", "message - " + i) ;
    }
  }).start() ;
  return "success" ;
}

图片

2.5 消息监听器

@RabbitListener(queues = "test")
public void listener1(String message) {
  System.out.printf("%s - 接收到消息:%s%n", Thread.currentThread().getName(), message) ;
  try {
    TimeUnit.SECONDS.sleep(2) ;
  } catch (InterruptedException e) {}
}

2.6 测试

测试上面的消息监听器是正常的

图片

2.7 调整并发数

在上一步的测试中我们发现控制台打印的始终是一个线程在执行消息处理。但是在一开始的配置文件中我们将concurrency属性设置的为2,起码这里应该是2个线程交替执行才对,这是为什么呢?

Spring监听RabbitMQ的消息时默认并不是一条一条的从RabbitMQ中去,是一次预期一批数据,这一批消费完后才进行下一批的获取,默认预期250条。而我们向队列中存入的数据才100条,所以控制台中你只能看到一个线程打印,因为你没有足够的消息供其它线程去获取处理。我们可以通过如下配置进行预期数的设置:

spring:
  rabbitmq:
    listener: 
      simple:
        prefetch: 5

重新启动服务,测试如下

图片

2个线程交替执行;接下来该如何实现动态调整并发数呢?

首先,修改消息监听器配置

@RabbitListener(id = "test-queue", queues = "test", ackMode = "AUTO")
public void listener1(String message) {
  // ...
}

id: 这里最好是设置唯一的id值,我们是要通过该id值来获取当前队列的消息监听容器。ackMode: AUTO 这里设置的应答模式,用来覆盖配置文件中的设置。

其次,通过RabbitListenerEndpointRegistry操作

@Resource
private RabbitListenerEndpointRegistry registry ;


@GetMapping("/modify/{count}")
public Object modify(@PathVariable("count") Integer count) {
  // 这里通过id获取对应的队列监听器;所以上面一定要定义唯一的id值
  MessageListenerContainer listenerContainer = registry.getListenerContainer("test-queue") ;
  if (listenerContainer instanceof SimpleMessageListenerContainer container) {
    container.setConcurrentConsumers(count) ;
  }
  return String.format("并发接收消息:%d%n", count) ;
}

最后,测试。

首先将服务启动,控制输出如下(当前只有2个线程处理)

图片

目前只有2个线程。

调用上面的接口修改并发数为3个后,控制台输出。

图片

成功增加了一个消费者线程。

接下来再测试,如果修改的数量大于最大数(spring.rabbitmq.listener.simple.max-concurrency)

图片

控制台抛出如下异常。

图片

不能超过最大数;再看看调小是否可以。

图片

可以动态调小。

我们也可以对消息监听器进行暂停消费和重新启动消息监听,这里就不在演示了,非常简单调用相应start/stop即可。

总结:在 Spring Boot 中动态调整 RabbitMQ 消息监听的并发数是一个重要的优化手段。通过合理设置并发数并根据系统负载情况进行动态调整,我们可以提高消息处理效率、节省系统资源、确保系统的稳定性和可靠性。在实际应用中,我们应该根据具体的业务场景和需求来选择合适的并发数调整策略,以达到最佳的性能和效果。

来源:Spring全家桶实战案例源码内容投诉

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

软考中级精品资料免费领

  • 历年真题答案解析
  • 备考技巧名师总结
  • 高频考点精准押题
  • 2024年上半年信息系统项目管理师第二批次真题及答案解析(完整版)

    难度     813人已做
    查看
  • 【考后总结】2024年5月26日信息系统项目管理师第2批次考情分析

    难度     354人已做
    查看
  • 【考后总结】2024年5月25日信息系统项目管理师第1批次考情分析

    难度     318人已做
    查看
  • 2024年上半年软考高项第一、二批次真题考点汇总(完整版)

    难度     435人已做
    查看
  • 2024年上半年系统架构设计师考试综合知识真题

    难度     224人已做
    查看

相关文章

发现更多好内容

猜你喜欢

AI推送时光机
位置:首页-资讯-后端开发
咦!没有更多了?去看看其它编程学习网 内容吧
首页课程
资料下载
问答资讯