文章详情

短信预约-IT技能 免费直播动态提醒

请输入下面的图形验证码

提交验证

短信预约提醒成功

SpringBoot怎么整合RabbitMQ处理死信队列和延迟队列

2023-06-30 16:55

关注

今天小编给大家分享一下SpringBoot怎么整合RabbitMQ处理死信队列和延迟队列的相关知识点,内容详细,逻辑清晰,相信大部分人都还太了解这方面的知识,所以分享这篇文章给大家参考一下,希望大家阅读完这篇文章后有所收获,下面我们一起来了解一下吧。

简介

RabbitMQ消息简介

RabbitMQ的消息默认不会超时。 

什么是死信队列?什么是延迟队列?

死信队列:

DLX,全称为Dead-Letter-Exchange,可以称之为死信交换器,也有人称之为死信邮箱。当消息在一个队列中变成死信(dead message)之后,它能被重新被发送到另一个交换器中,这个交换器就是DLX,绑定DLX的队列就称之为死信队列。

以下几种情况会导致消息变成死信:

延迟队列:

延迟队列用来存放延迟消息。延迟消息:指当消息被发送以后,不想让消费者立刻拿到消息,而是等待特定时间后,消费者才能拿到这个消息进行消费。

相关网址

详解RabbitMQ中死信队列和延迟队列的使用详解

实例代码

路由配置

package com.example.config; import org.springframework.amqp.core.*;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration; @Configurationpublic class RabbitRouterConfig {    public static final String EXCHANGE_TOPIC_WELCOME   = "Exchange@topic.welcome";    public static final String EXCHANGE_FANOUT_UNROUTE  = "Exchange@fanout.unroute";    public static final String EXCHANGE_TOPIC_DELAY     = "Exchange@topic.delay";     public static final String ROUTINGKEY_HELLOS        = "hello.#";    public static final String ROUTINGKEY_DELAY         = "delay.#";     public static final String QUEUE_HELLO              = "Queue@hello";    public static final String QUEUE_HI                 = "Queue@hi";    public static final String QUEUE_UNROUTE            = "Queue@unroute";    public static final String QUEUE_DELAY              = "Queue@delay";     public static final Integer TTL_QUEUE_MESSAGE       = 5000;     @Autowired    AmqpAdmin amqpAdmin;     @Bean    Object initBindingTest() {        amqpAdmin.declareExchange(ExchangeBuilder.fanoutExchange(EXCHANGE_FANOUT_UNROUTE).durable(true).autoDelete().build());        amqpAdmin.declareExchange(ExchangeBuilder.topicExchange(EXCHANGE_TOPIC_DELAY).durable(true).autoDelete().build());        amqpAdmin.declareExchange(ExchangeBuilder.topicExchange(EXCHANGE_TOPIC_WELCOME)                .durable(true)                .autoDelete()                .withArgument("alternate-exchange", EXCHANGE_FANOUT_UNROUTE)                 .build());         amqpAdmin.declareQueue(QueueBuilder.durable(QUEUE_HI).build());        amqpAdmin.declareQueue(QueueBuilder.durable(QUEUE_HELLO)                .withArgument("x-dead-letter-exchange", EXCHANGE_TOPIC_DELAY)                .withArgument("x-dead-letter-routing-key", ROUTINGKEY_DELAY)                .withArgument("x-message-ttl", TTL_QUEUE_MESSAGE)                .build());        amqpAdmin.declareQueue(QueueBuilder.durable(QUEUE_UNROUTE).build());        amqpAdmin.declareQueue(QueueBuilder.durable(QUEUE_DELAY).build());         amqpAdmin.declareBinding(new Binding(QUEUE_HELLO, Binding.DestinationType.QUEUE,                EXCHANGE_TOPIC_WELCOME, ROUTINGKEY_HELLOS, null));        amqpAdmin.declareBinding(new Binding(QUEUE_UNROUTE, Binding.DestinationType.QUEUE,                EXCHANGE_FANOUT_UNROUTE, "", null));        amqpAdmin.declareBinding(new Binding(QUEUE_DELAY, Binding.DestinationType.QUEUE,                EXCHANGE_TOPIC_DELAY, ROUTINGKEY_DELAY, null));         return new Object();    }}

控制器

package com.example.controller; import com.example.config.RabbitRouterConfig;import com.example.mq.Sender;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.web.bind.annotation.PostMapping;import org.springframework.web.bind.annotation.RestController; import java.time.LocalDateTime; @RestControllerpublic class HelloController {    @Autowired    private Sender sender;     @PostMapping("/hi")    public void hi() {        sender.send(RabbitRouterConfig.QUEUE_HI, "hi1 message:" + LocalDateTime.now());    }     @PostMapping("/hello1")    public void hello1() {        sender.send("hello.a", "hello1 message:" + LocalDateTime.now());    }     @PostMapping("/hello2")    public void hello2() {        sender.send(RabbitRouterConfig.EXCHANGE_TOPIC_WELCOME, "hello.b", "hello2 message:" + LocalDateTime.now());    }     @PostMapping("/ae")    public void aeTest() {        sender.send(RabbitRouterConfig.EXCHANGE_TOPIC_WELCOME, "nonono", "ae message:" + LocalDateTime.now());    }}

发送器

package com.example.mq; import org.springframework.amqp.core.AmqpTemplate;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.stereotype.Component; import java.util.Date; @Componentpublic class Sender {    @Autowired    private AmqpTemplate rabbitTemplate;     public void send(String routingKey, String message) {        this.rabbitTemplate.convertAndSend(routingKey, message);    }     public void send(String exchange, String routingKey, String message) {        this.rabbitTemplate.convertAndSend(exchange, routingKey, message);    }}

接收器

package com.example.mq; import com.example.config.RabbitRouterConfig;import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.springframework.stereotype.Component; @Componentpublic class Receiver {    @RabbitListener(queues = RabbitRouterConfig.QUEUE_HI)    public void hi(String payload) {        System.out.println ("Receiver(hi) : "  + payload);    }     // @RabbitListener(queues = RabbitRouterConfig.QUEUE_HELLO)    // public void hello(String hello) throws InterruptedException {    //     System.out.println ("Receiver(hello) : "  + hello);    //     Thread.sleep(5 * 1000);    //     System.out.println("(hello):sleep over");    // }    //    // @RabbitListener(queues = RabbitRouterConfig.QUEUE_UNROUTE)    // public void unroute(String hello) throws InterruptedException {    //     System.out.println ("Receiver(unroute) : "  + hello);    //     Thread.sleep(5 * 1000);    //     System.out.println("(unroute):sleep over");    // }     @RabbitListener(queues = RabbitRouterConfig.QUEUE_DELAY)    public void delay(String hello) throws InterruptedException {        System.out.println ("Receiver(delay) : "  + hello);        Thread.sleep(5 * 1000);        System.out.println("(delay):sleep over");    }}

application.yml

server:#  port: 9100  port: 9101spring:  application:#    name: demo-rabbitmq-sender    name: demo-rabbitmq-receiver  rabbitmq:    host: localhost    port: 5672    username: admin    password: 123456#    virtualHost: /    publisher-confirms: true    publisher-returns: true#    listener:#      simple:#        acknowledge-mode: manual#      direct:#        acknowledge-mode: manual

实例测试

分别启动发送者和接收者。

访问:http://localhost:9100/hello2

五秒钟后输出:

Receiver(delay) : hello2 message:2020-11-27T09:30:51.548
(delay):sleep over

以上就是“SpringBoot怎么整合RabbitMQ处理死信队列和延迟队列”这篇文章的所有内容,感谢各位的阅读!相信大家阅读完这篇文章都有很大的收获,小编每天都会为大家更新不同的知识,如果还想学习更多的知识,请关注编程网行业资讯频道。

阅读原文内容投诉

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

软考中级精品资料免费领

  • 历年真题答案解析
  • 备考技巧名师总结
  • 高频考点精准押题
  • 2024年上半年信息系统项目管理师第二批次真题及答案解析(完整版)

    难度     801人已做
    查看
  • 【考后总结】2024年5月26日信息系统项目管理师第2批次考情分析

    难度     348人已做
    查看
  • 【考后总结】2024年5月25日信息系统项目管理师第1批次考情分析

    难度     311人已做
    查看
  • 2024年上半年软考高项第一、二批次真题考点汇总(完整版)

    难度     432人已做
    查看
  • 2024年上半年系统架构设计师考试综合知识真题

    难度     220人已做
    查看

相关文章

发现更多好内容

猜你喜欢

AI推送时光机
位置:首页-资讯-后端开发
咦!没有更多了?去看看其它编程学习网 内容吧
首页课程
资料下载
问答资讯