文章详情

短信预约-IT技能 免费直播动态提醒

请输入下面的图形验证码

提交验证

短信预约提醒成功

springboot+rabbitmq如何实现指定消费者才能消费

2023-06-25 12:20

关注

这篇文章将为大家详细讲解有关springboot+rabbitmq如何实现指定消费者才能消费,小编觉得挺实用的,因此分享给大家做个参考,希望大家阅读完这篇文章后可以有所收获。

如何保证mq队列里的消息只被测试服务器上的consumer消费,避免本地环境误消费?

程序里有一个应用场景使用到了rabbitmq——当财务确认收到企业的打款金额后,系统会把企业订单生成用户付款单。由于订单记录数据量大,改为通过mq来异步实现。即财务确认收款操作后,将企业订单数据放入mq,另一端监听mq消息队列,将收到的企业订单加工转换成用户付款单,并做持久化。

springboot+rabbitmq如何实现指定消费者才能消费

本地开发环境与测试环境共用一套rabbitmq。当项目部署到测试环境后,QA测试过程中,总是“莫名其妙”的发现所保存的用户付款单数据有问题。

当然,首先要排查程序,检查Consumer的数据处理的逻辑是否有bug。单元测试后,发现并不存在测试环境的bug。

原来,消息队列被“非正常”消费了!

Q: 什么情况?

A: 几个伙伴一起参与的项目,大家总是要调试自己的程序的。而如果碰巧本地程序监听到消息队列里有消息,那么,消息就被本地程序消费掉了。问题正是出现在这里!————团队开发,大家并不会及时检出git上最新的程序版本。如果本地的程序版本不是最新的正确的版本,势必会出现bug。

那么,怎么办?

每次你改了逻辑,告诉大家获取最新?

不现实,约定的东西往往不奏效的。

如何保证mq队列里的消息只被测试服务器上的consumer消费,避免本地环境误消费? 或者说,如何实现消息的定向消费呢?

只要肯琢磨,办法总比困难多!百思可得解!

我们知道,rabbitmq手动ack模式。这还不够,因为我们怎么让consumer来决定是否消费呢? 所以,我们需要一个标识————producer设定一个标识,consumer如果匹配这个标识,则消费,否则予以reject放回消息队列。

springboot+rabbitmq如何实现指定消费者才能消费

通过查看spring-rabbit/spring-amqp的代码,发现可以在spring-amqp里的MessageProperties上做文章。生产者与消费者每次消息传输都会携带一个MessageProperties,通常我们是不指定的,走MessageProperties的默认设置值。

我的策略:MessageProperties有一个属性叫AppId。我们程序所部署的测试机器就一台,即消息Producer和消息Consumer在一台机器上。那么,我就可以利用机器的IP来识别消息。只有Producer与Consumer的IP匹配,才消费消息。程序员本机IP与测试服务器IP不一样,就会拒绝接收消息,会把消息重新放回消息队列,等待测试服务器的Consumer消费。

话不多说,上代码吧,

生产者代码:

package com.sboot.mq;import org.junit.Test;import org.springframework.amqp.core.Message;import org.springframework.amqp.core.MessageProperties;import org.springframework.amqp.rabbit.core.RabbitTemplate;import org.springframework.amqp.support.converter.MessageConverter;import org.springframework.amqp.support.converter.SimpleMessageConverter;import org.springframework.beans.factory.annotation.Autowired;import java.net.InetAddress;import java.util.UUID;public class MQProducerTest extends BaseTest {    @Autowired    RabbitTemplate rabbitTemplate;    @Test    public void test() throws Exception {        for (int i = 1; i <= 5; i++) {            MessageProperties messageProperties = new MessageProperties();            String ip = InetAddress.getLocalHost().getHostAddress();            messageProperties.setAppId(ip);//            messageProperties.setUserId(String.valueOf(i));            MessageConverter messageConverter = new SimpleMessageConverter();            String msg = UUID.randomUUID().toString();//            System.out.println(msg);            Message message1 = messageConverter.toMessage(msg, messageProperties);            rabbitTemplate.send(MessageQueueConstant.USER_SETTLEMENT_EXCHANGE, "UserSettlementRouting", message1);            System.out.println("入队完成");            Thread.sleep(500L);        }    }}

消费者手动ACK,要实现ChannelAwareMessageListener接口,感知rabbitmq.client.Channel实例,调用channel的basicAck、basicReject等方法:

package com.sboot.mq;import com.rabbitmq.client.Channel;import lombok.extern.slf4j.Slf4j;import org.springframework.amqp.core.Message;import org.springframework.amqp.rabbit.annotation.RabbitHandler;import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;import org.springframework.amqp.support.converter.MessageConverter;import org.springframework.amqp.support.converter.SimpleMessageConverter;import org.springframework.context.annotation.Profile;import org.springframework.stereotype.Component;import java.net.InetAddress;@Component@Profile(value = "dev")@Slf4jpublic class UserSettlementDevConsumer implements ChannelAwareMessageListener {    @RabbitHandler    @RabbitListener(queues = MessageQueueConstant.USER_SETTLEMENT_QUEUE, ackMode = "MANUAL")    @Override    public void onMessage(Message message, Channel channel) throws Exception {        Thread.currentThread().setName(UserSettlementDevConsumer.class.getSimpleName() + System.currentTimeMillis());        long tag = message.getMessageProperties().getDeliveryTag();        String appId = message.getMessageProperties().getAppId();        log.info("{}-{}, 消息出队", tag, appId);        String receiveMsg = "";        try {            //核对标识,决定是否消费消息            String ip = InetAddress.getLocalHost().getHostAddress();            if (!ip.equals(appId)) {                log.info("这不是我需要的消息。放回队列。{}", receiveMsg);//                channel.basicNack(tag, false, true);                channel.basicReject(tag, true);//                channel.basicRecover(true);                return;            }            MessageConverter messageConverter = new SimpleMessageConverter();            receiveMsg = String.valueOf(messageConverter.fromMessage(message));            。。。。在这里消费消息            log.info("success " + receiveMsg);            channel.basicAck(tag, false);        } catch (Exception e) {            log.error("receive message has an error, ", e);            channel.basicNack(tag, false, true);        }    }}

说明一下依赖的spring-rabbit包的版本,我的是2.2.0.RELEASE。如果是2.1.4版本里,@RabbitListener注解没有ackMode。

解决本案问题过程中的花絮:

springboot+rabbitmq如何实现指定消费者才能消费

spring-rabbit-2.1.4.RELEASEspring-rabbit-2.2.0.RELEASE

springboot+rabbitmq如何实现指定消费者才能消费

@RabbitListener的ackMode的值见枚举org.springframework.amqp.core.AcknowledgeMode

NONE-- no acks(自动消费 autoAck)MANUAL --Manual acks - user must ack/nack via a channel aware listener.(手动消费,Consumer端必须显式调用ack或nack)AUTO --

springboot+rabbitmq如何实现指定消费者才能消费

设置了手动消费,上文消费端的deliveryTag会是不同的long值。自动消费的deliveryTag是重复的1和2这样的。并且,自动消费时,如果要使用channel的ack/nack,会报异常:

2020-06-19 22:26:54.586 [AMQP Connection 192.168.40.20:5672] ERROR o.s.a.rabbit.connection.CachingConnectionFactory:1468 - Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - unknown delivery tag 1, class-id=60, method-id=80)
2020-06-19 22:26:54.599 [SimpleAsyncTaskExecutor-1] ERROR c.e.z.r.p.modules.mq.UserSettlementAckConsumer:49 -
org.springframework.amqp.AmqpException: PublisherCallbackChannel is closed
at org.springframework.amqp.rabbit.connection.CachingConnectionFactory$CachedChannelInvocationHandler.invoke(CachingConnectionFactory.java:1092)

关于“springboot+rabbitmq如何实现指定消费者才能消费”这篇文章就分享到这里了,希望以上内容可以对大家有一定的帮助,使各位可以学到更多知识,如果觉得文章不错,请把它分享出去让更多的人看到。

阅读原文内容投诉

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

软考中级精品资料免费领

  • 历年真题答案解析
  • 备考技巧名师总结
  • 高频考点精准押题
  • 2024年上半年信息系统项目管理师第二批次真题及答案解析(完整版)

    难度     813人已做
    查看
  • 【考后总结】2024年5月26日信息系统项目管理师第2批次考情分析

    难度     354人已做
    查看
  • 【考后总结】2024年5月25日信息系统项目管理师第1批次考情分析

    难度     318人已做
    查看
  • 2024年上半年软考高项第一、二批次真题考点汇总(完整版)

    难度     435人已做
    查看
  • 2024年上半年系统架构设计师考试综合知识真题

    难度     224人已做
    查看

相关文章

发现更多好内容

猜你喜欢

AI推送时光机
位置:首页-资讯-后端开发
咦!没有更多了?去看看其它编程学习网 内容吧
首页课程
资料下载
问答资讯