文章详情

短信预约-IT技能 免费直播动态提醒

请输入下面的图形验证码

提交验证

短信预约提醒成功

Java Spring boot 整合RabbitMQ(五):主题(Topics)-B2B2C小程序电子商务

2023-06-05 03:39

关注

在我们的日志系统中,我们不只希望订阅基于严重程度的日志,同时还希望订阅基于发送来源的日志。Unix 工具 syslog 就是同时基于严重程度 -severity (info/warn/crit…) 和 设备 -facility (auth/cron/kern…) 来路由日志的。

如果这样的话,将会给予我们非常大的灵活性,我们既可以监听来源于”cron” 的严重程度为”critical errors” 的日志,也可以监听来源于”kern” 的所有日志。

为了实现这个目的,接下来我们学习如何使用另一种更复杂的交换器 ——topic exchange。

Topic exchange

topic exchange 与 direct exchange 类似,也是将消息路由到 binding key 与 routing key 相匹配的 Queue 中,但这里的匹配规则有些不同,它约定:

routing key 为一个句点号. 分隔的字符串(我们将被句点号. 分隔开的每一段独立的字符串称为一个单词),如 “stock.usd.nyse”、”nyse.vmw”、”quick.orange.rabbit”

binding key 与 routing key 一样也是句点号. 分隔的字符串

binding key 中可以存在两种特殊字符 * 与#,用于做模糊匹配,其中 * 用于匹配一个单词,#用于匹配多个单词(可以是零个)

这个例子里,我们发送的所有消息都是用来描述小动物的。发送的消息所携带的路由键是由三个单词所组成的,这三个单词被两个。分割开。路由键里的第一个单词描述的是动物的速度,第二个单词是动物的颜色,第三个是动物的种类。所以它看起来是这样的:” 速度。颜色。种类”。

我们创建了三个绑定:Q1 的 binding key 为”.orange.“,Q2 的 binding key 为”…rabbit” 和”lazy.#”。

这三个 binding key 被可以总结为:

Q1 对所有的桔黄色动物都感兴趣。

Q2 则是对所有的兔子和所有懒惰的动物感兴趣。

以上图中的配置为例:

routingKey=”quick.orange.rabbit” 的消息会同时路由到 Q1 与 Q2

routingKey=”lazy.orange.fox” 的消息会路由到 Q1 与 Q2

routingKey=”lazy.brown.fox” 的消息会路由到 Q2

routingKey=”lazy.pink.rabbit” 的消息会路由到 Q2(只会投递给 Q2 一次,虽然这个 routingKey 与 Q2 的两个 bindingKey 都匹配)

routingKey=”quick.brown.fox”、routingKey=”orange”、routingKey=”quick.orange.male.rabbit” 的消息将会被丢弃,因为它们没有匹配任何 bindingKey

如果我们违反约定,发送了一个 routing key 为一个单词或者四个单词(”orange” or “quick.orange.male.rabbit”)的消息时,该消息不会投递给任何一个队列,而且会丢失掉。

了解springcloud架构可以加求求:三五三六二四七二五九

但是,即使”lazy.orange.male.rabbit” 有四个单词,他还是会匹配最后一个绑定,并且被投递到第二个队列中。

topic exchange

topic exchange 是强大的,它可以表现出跟其他 exchange 类似的行为。

当一个队列的 binding key 为 “#”(井号) 的时候,它会接收所有消息,而不考虑 routing key,就像 fanout exchange。

当 * (星号) 和 # (井号) 这两个特殊字符都未在绑定键中出现的时候,此时 topic exchange 会表现得像 direct exchange 一样。

代码整合

生产者

public class Tut5Sender {   @Autowird    private AmqpTemplate template;       @Autowird    private TopicExchange topic;    private int index;    private int count;    private final String[] keys = {"quick.orange.rabbit",            "lazy.orange.elephant", "quick.orange.fox",            "lazy.brown.fox", "lazy.pink.rabbit", "quick.brown.fox"};    @Scheduled(fixedDelay = 1000, initialDelay = 500)    public void send() {        StringBuilder builder = new StringBuilder("Hello to ");        if (++this.index == keys.length) {            this.index = 0;        }        String key = keys[this.index];        builder.append(key).append(' ');        builder.append(Integer.toString(++this.count));        String message = builder.toString();        template.convertAndSend(topic.getName(), key, message);        System.out.println(" [x] Sent '" + message + "'");    }}

消费者

public class Tut5Receiver {    @RabbitListener(queues = "#{autoDeleteQueue1}")    public void receiver1(String in) throws InterruptedException {        receive(in, 1);    }    @RabbitListener(queues = "#{autoDeleteQueue2}")    public void receiver2(String in) throws InterruptedException {        receive(in, 2);    }    private void receive(String in, int instance) throws InterruptedException {        StopWatch watch = new StopWatch();        watch.start();        System.out.println("instance " + instance + " [x] Received '" + in + "'");        doWork(in);        watch.stop();        System.out.println("instance " + instance + " [x] Done in "                + watch.getTotalTimeSeconds() + "s");    }    private void doWork(String in) throws InterruptedException {        for (char c : in.toCharArray()) {            if (c == '.') {                Thread.sleep(1000);            }        }    }}

配置类

@Profile({"tut5", "topics"})@Configurationpublic class Tut5Config {    @Bean    public TopicExchange topic() {        return new TopicExchange("tut.topic");    }    @Profile("receiver")    private static class ReceiverConfig {        @Bean        public Queue autoDeleteQueue1() {            return new AnonymousQueue();        }        @Bean        public Queue autoDeleteQueue2() {            return new AnonymousQueue();        }        @Bean        public Binding binding1a(TopicExchange topic, Queue autoDeleteQueue1) {            return BindingBuilder.bind(autoDeleteQueue1)                    .to(topic)                    .with("*.orange.*");        }        @Bean        public Binding binding2a(TopicExchange topic, Queue autoDeleteQueue2) {            return BindingBuilder.bind(autoDeleteQueue2)                    .to(topic)                    .with("*.*.rabbit");        }        @Bean        public Binding binding2b(TopicExchange topic, Queue autoDeleteQueue2) {            return BindingBuilder.bind(autoDeleteQueue2)                    .to(topic)                    .with("lazy.#");        }        @Bean        public Tut5Receiver receiver() {            return new Tut5Receiver();        }    }    @Profile("sender")    @Bean    public Tut5Sender sender() {        return new Tut5Sender();    }}

运行

maven 编译

mvn clean package -Dmaven.test.skip=true

运行

java -jar target/rabbitmq-tutorial-0.0.1-SNAPSHOT.jar --spring.profiles.active=tut5,receiver --tutorial.client.duration=60000

java -jar target/rabbitmq-tutorial-0.0.1-SNAPSHOT.jar --spring.profiles.active=tut5,sender --tutorial.client.duration=60000

输出

// Sender

Ready … running for 60000ms

[x] Sent ‘Hello to lazy.orange.elephant 1’

[x] Sent ‘Hello to quick.orange.fox 2’

[x] Sent ‘Hello to lazy.brown.fox 3’

[x] Sent ‘Hello to lazy.pink.rabbit 4’

[x] Sent ‘Hello to quick.brown.fox 5’

[x] Sent ‘Hello to quick.orange.rabbit 6’

[x] Sent ‘Hello to lazy.orange.elephant 7’

[x] Sent ‘Hello to quick.orange.fox 8’

[x] Sent ‘Hello to lazy.brown.fox 9’

[x] Sent ‘Hello to lazy.pink.rabbit 10’

// Receiver

Ready … running for 60000ms

instance 1 [x] Received ‘Hello to lazy.orange.elephant 1’

instance 2 [x] Received ‘Hello to lazy.orange.elephant 1’

instance 2 [x] Done in 2.004s

instance 1 [x] Done in 2.004s

instance 2 [x] Received ‘Hello to lazy.brown.fox 3’

instance 1 [x] Received ‘Hello to quick.orange.fox 2’

instance 1 [x] Done in 2.006s

instance 2 [x] Done in 2.006s

instance 2 [x] Received ‘Hello to lazy.pink.rabbit 4’

instance 1 [x] Received ‘Hello to quick.orange.rabbit 6’

instance 2 [x] Done in 2.006s

instance 2 [x] Received ‘Hello to quick.orange.rabbit 6’

instance 1 [x] Done in 2.007s

instance 1 [x] Received ‘Hello to lazy.orange.elephant 7’

instance 2 [x] Done in 2.006s

instance 2 [x] Received ‘Hello to lazy.orange.elephant 7’

instance 1 [x] Done in 2.003s

instance 1 [x] Received ‘Hello to quick.orange.fox 8’

instance 2 [x] Done in 2.005s

instance 2 [x] Received ‘Hello to lazy.brown.fox 9’

instance 1 [x] Done in 2.005s

instance 2 [x] Done in 2.004s

instance 2 [x] Received ‘Hello to lazy.pink.rabbit 10’

阅读原文内容投诉

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

软考中级精品资料免费领

  • 历年真题答案解析
  • 备考技巧名师总结
  • 高频考点精准押题
  • 2024年上半年信息系统项目管理师第二批次真题及答案解析(完整版)

    难度     807人已做
    查看
  • 【考后总结】2024年5月26日信息系统项目管理师第2批次考情分析

    难度     351人已做
    查看
  • 【考后总结】2024年5月25日信息系统项目管理师第1批次考情分析

    难度     314人已做
    查看
  • 2024年上半年软考高项第一、二批次真题考点汇总(完整版)

    难度     433人已做
    查看
  • 2024年上半年系统架构设计师考试综合知识真题

    难度     221人已做
    查看

相关文章

发现更多好内容

猜你喜欢

AI推送时光机
位置:首页-资讯-后端开发
咦!没有更多了?去看看其它编程学习网 内容吧
首页课程
资料下载
问答资讯