文章详情

短信预约-IT技能 免费直播动态提醒

请输入下面的图形验证码

提交验证

短信预约提醒成功

Spring Boot 整合RocketMq实现消息过滤功能

2024-04-02 19:55

关注

简介

消息过滤是指消费者一端在消费消息时,对消息进行选择性过滤,只消费符合过滤条件的消息。 RocketMQ的消息过滤机制大致分为两种:标签过滤和类过滤。其中标签过滤又分为Tag过滤和SQL92过滤。

根据TAG过滤消息

消息发送端只能设置一个tag,消息接收端可以设置多个tag。

生产者

 public void sendTagMessage()
   {
       String[] tags = new String[]{"TagA", "TagB", "TagC", "TagD", "TagE"};
       for(int i=0;i<10;i++)
       {
           String tag = tags[i % tags.length];
           logger.info("sendTagMessage tag is :{}",tag);
           String msg = "hello, 这是第" + (i + 1) + "条消息";
           org.springframework.messaging.Message<String> msg1 = MessageBuilder.withPayload(msg).build(); 
           rocketMQTemplate.convertAndSend("test-tag-rocketmq" + ":" + tag, msg1);
       }
   }

说明:示例中循环发送了10条消息,每条消息设置了一个tag发送过滤消息的格式为:topic:tag的形式,注意发送端只能设定一个tag。

消费者

@Component
@RocketMQMessageListener(consumerGroup="test-tagrocketmq-group",topic="test-tag-rocketmq",selectorExpression="TagA || TagC || TagD",selectorType=SelectorType.TAG, messageModel = MessageModel.CLUSTERING)
public class TagConsumer implements RocketMQListener<Object>
{
    private Logger logger =LoggerFactory.getLogger(getClass());
    @Override
    public void onMessage(Object o)
    {
        String msg=JSON.toJSONString(o);
        logger.info("send TagA || TagC || TagD  succss content is:{}", msg);
    }
}

说明:

测试结果

从结果我可以看出第2条为TAGC、第7条为TAGC、第8条为TAGD,第3条为TAGD,第5条为TAGA,第0条为TAGA,而消费端监听的TAG为TAGA、TAGC、TAGD所以对于不符合条件的消息进行了过滤。

根据SQL表达式过滤消息

SQL表达式方式可以根据发送消息时输入的属性进行一些计算。

RocketMQ的SQL表达式语法 只定义了一些基本的语法功能。

生产者

   public void sendSQLMessage()
   {
       String msg = "hello, 这是第1条消息";
       org.springframework.messaging.Message<String> message = MessageBuilder.withPayload(msg).build() ;
       Map<String, Object> headers = new HashMap<>() ;
       headers.put("i", 5) ;
       rocketMQTemplate.convertAndSend("test-sql-rocketmq", message, headers);
   }

说明:传递了参数为5进行条件判断。

消费者

@Component
@RocketMQMessageListener(consumerGroup="test-sqlrocketmq-group",topic="test-sql-rocketmq",selectorExpression = "i=5",selectorType=SelectorType.SQL92, messageModel = MessageModel.CLUSTERING)
public class SQLConsumer implements RocketMQListener<MessageExt>
{
    private Logger logger =LoggerFactory.getLogger(getClass());
    @Override
    public void onMessage(MessageExt message)
    {
        String msg=new String(message.getBody());
        String paramStr=JSON.toJSONString(message.getProperties());
        //消息内容
        logger.info("send succss content is:{}", msg);
        //消息参数
        logger.info("send mssage parma is:{}", paramStr);
    }
}

说明:

启动程序报错The broker does not support consumer to filter message by SQL92

原因:默认情况下broke没有开启对SQL语法的支持,需要修改配置

1.打开rocketmq服务下的broke.conf文件,添加如下配置即可。

2.重启broke服务即可.

测试结果

说明:只有满足SQL条件能进行消费。

总结

本文讲解了RocketMQ实现消息过滤,针对不同的业务场景选择合适的方案即可,如果疑问,请随时反馈,

到此这篇关于Spring Boot 整合RocketMq实现消息过滤的文章就介绍到这了,更多相关Spring Boot消息过滤内容请搜索编程网以前的文章或继续浏览下面的相关文章希望大家以后多多支持编程网!

阅读原文内容投诉

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

软考中级精品资料免费领

  • 历年真题答案解析
  • 备考技巧名师总结
  • 高频考点精准押题
  • 2024年上半年信息系统项目管理师第二批次真题及答案解析(完整版)

    难度     813人已做
    查看
  • 【考后总结】2024年5月26日信息系统项目管理师第2批次考情分析

    难度     354人已做
    查看
  • 【考后总结】2024年5月25日信息系统项目管理师第1批次考情分析

    难度     318人已做
    查看
  • 2024年上半年软考高项第一、二批次真题考点汇总(完整版)

    难度     435人已做
    查看
  • 2024年上半年系统架构设计师考试综合知识真题

    难度     224人已做
    查看

相关文章

发现更多好内容

猜你喜欢

AI推送时光机
位置:首页-资讯-后端开发
咦!没有更多了?去看看其它编程学习网 内容吧
首页课程
资料下载
问答资讯