文章详情

短信预约-IT技能 免费直播动态提醒

请输入下面的图形验证码

提交验证

短信预约提醒成功

SpringBoot中使用RabbitMQ的RPC功能案例分析

2023-06-25 15:19

关注

这篇文章主要讲解了“SpringBoot中使用RabbitMQ的RPC功能案例分析”,文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习“SpringBoot中使用RabbitMQ的RPC功能案例分析”吧!

一、RabbitMQ的RPC简介

实际业务中,有的时候我们还需要等待消费者返回结果给我们,或者是说我们需要消费者上的一个功能、一个方法或是一个接口返回给我们相应的值,而往往大型的系统软件,生产者跟消费者之间都是相互独立的两个系统,部署在两个不同的电脑上,不能通过直接对象.方法的形式获取想要的结果,这时候我们就需要用到RPC(Remote Procedure Call)远程过程调用方式。
RabbitMQ实现RPC的方式很简单,生产者发送一条带有标签(消息ID(correlation_id)+回调队列名称)的消息到发送队列,消费者(也称RPC服务端)从发送队列获取消息并处理业务,解析标签的信息将业务结果发送到指定的回调队列,生产者从回调队列中根据标签的信息获取发送消息的返回结果。

二、SpringBoot中使用RabbitMQ的RPC功能

注意:springboot中使用的时候,correlation_id为系统自动生成的,reply_to在加载AmqpTemplate实例的时候设置的。

实例:
说明:队列1为发送队列,队列2为返回队列

先配置rabbitmq

package com.ws.common;import org.springframework.amqp.core.Binding;import org.springframework.amqp.core.BindingBuilder;import org.springframework.amqp.core.Queue;import org.springframework.amqp.core.TopicExchange;import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;import org.springframework.amqp.rabbit.connection.ConnectionFactory;import org.springframework.amqp.rabbit.core.RabbitTemplate;import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.beans.factory.annotation.Value;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;@Configurationpublic class RabbitMQConfig {public static final String TOPIC_QUEUE1 = "topic.queue1";    public static final String TOPIC_QUEUE2 = "topic.queue2";    public static final String TOPIC_EXCHANGE = "topic.exchange";        @Value("${spring.rabbitmq.host}")    private String host;    @Value("${spring.rabbitmq.port}")    private int port;    @Value("${spring.rabbitmq.username}")    private String username;    @Value("${spring.rabbitmq.password}")    private String password;        @Autowired    ConnectionFactory connectionFactory;        @Bean(name = "connectionFactory")    public ConnectionFactory connectionFactory() {    CachingConnectionFactory connectionFactory = new CachingConnectionFactory();    connectionFactory.setHost(host);    connectionFactory.setPort(port);    connectionFactory.setUsername(username);    connectionFactory.setPassword(password);    connectionFactory.setVirtualHost("/");    return connectionFactory;    }        @Bean    public RabbitTemplate rabbitTemplate() {    RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);    //设置reply_to(返回队列,只能在这设置)    rabbitTemplate.setReplyAddress(TOPIC_QUEUE2);    rabbitTemplate.setReplyTimeout(60000);    return rabbitTemplate;    }    //返回队列监听器(必须有)    @Bean(name="replyMessageListenerContainer")    public SimpleMessageListenerContainer createReplyListenerContainer() {         SimpleMessageListenerContainer listenerContainer = new SimpleMessageListenerContainer();         listenerContainer.setConnectionFactory(connectionFactory);         listenerContainer.setQueueNames(TOPIC_QUEUE2);         listenerContainer.setMessageListener(rabbitTemplate());         return listenerContainer;    }            //创建队列    @Bean    public Queue topicQueue1() {        return new Queue(TOPIC_QUEUE1);    }    @Bean    public Queue topicQueue2() {        return new Queue(TOPIC_QUEUE2);    }        //创建交换机    @Bean    public TopicExchange topicExchange() {        return new TopicExchange(TOPIC_EXCHANGE);    }        //交换机与队列进行绑定    @Bean    public Binding topicBinding1() {        return BindingBuilder.bind(topicQueue1()).to(topicExchange()).with(TOPIC_QUEUE1);    }    @Bean    public Binding topicBinding2() {        return BindingBuilder.bind(topicQueue2()).to(topicExchange()).with(TOPIC_QUEUE2);    }}

发送消息并同步等待返回值

@Autowiredprivate RabbitTemplate rabbitTemplate;//报文bodyString sss = "报文的内容";//封装MessageMessage msg = this.con(sss);log.info("客户端--------------------"+msg.toString());//使用sendAndReceive方法完成rpc调用Message message=rabbitTemplate.sendAndReceive(RabbitMQConfig.TOPIC_EXCHANGE, RabbitMQConfig.TOPIC_QUEUE1, msg);//提取rpc回应内容bodyString response = new String(message.getBody());log.info("回应:" + response);log.info("rpc完成---------------------------------------------");public Message con(String s) {MessageProperties mp = new MessageProperties();byte[] src = s.getBytes(Charset.forName("UTF-8"));//mp.setReplyTo("adsdas");   加载AmqpTemplate时设置,这里设置没用//mp.setCorrelationId("2222");   系统生成,这里设置没用mp.setContentType("application/json");mp.setContentEncoding("UTF-8");mp.setContentLength((long)s.length());return new Message(src, mp);}

写消费者

package com.ws.listener.mq;import java.nio.charset.Charset;import org.springframework.amqp.core.Message;import org.springframework.amqp.core.MessageProperties;import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.springframework.amqp.rabbit.core.RabbitTemplate;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.stereotype.Component;import com.ws.common.RabbitMQConfig;import lombok.extern.slf4j.Slf4j;@Slf4j@Componentpublic class Receiver {@Autowiredprivate RabbitTemplate rabbitTemplate;@RabbitListener(queues=RabbitMQConfig.TOPIC_QUEUE1)public void receiveTopic1(Message msg) {log.info("队列1:"+msg.toString());String msgBody = new String(msg.getBody());//数据处理,返回的MessageMessage repMsg = con(msgBody+"返回了", msg.getMessageProperties().getCorrelationId());rabbitTemplate.send(RabbitMQConfig.TOPIC_EXCHANGE, RabbitMQConfig.TOPIC_QUEUE2, repMsg);    }@RabbitListener(queues=RabbitMQConfig.TOPIC_QUEUE2)public void receiveTopic2(Message msg) {log.info("队列2:"+msg.toString());    }public Message con(String s, String id) {MessageProperties mp = new MessageProperties();byte[] src = s.getBytes(Charset.forName("UTF-8"));mp.setContentType("application/json");mp.setContentEncoding("UTF-8");mp.setCorrelationId(id);return new Message(src, mp);} }

日志打印:

2019-06-26 17:11:16.607 [http-nio-8080-exec-4] INFO com.ws.controller.UserController - 客户端--------------------(Body:‘报文的内容' MessageProperties [headers={}, contentType=application/json, contentEncoding=UTF-8, contentLength=5, deliveryMode=PERSISTENT, priority=0, deliveryTag=0])

2019-06-26 17:11:16.618 [SimpleAsyncTaskExecutor-1] INFO com.ws.listener.mq.Receiver - 队列1:(Body:‘报文的内容' MessageProperties [headers={}, correlationId=1, replyTo=topic.queue2, contentType=application/json, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=topic.exchange, receivedRoutingKey=topic.queue1, deliveryTag=1, consumerTag=amq.ctag-8IzlhblYmTebqUYd-uferw, consumerQueue=topic.queue1])

2019-06-26 17:11:16.623 [http-nio-8080-exec-4] INFO com.ws.controller.UserController - 回应:报文的内容返回了

2019-06-26 17:11:16.623 [http-nio-8080-exec-4] INFO com.ws.controller.UserController - rpc完成---------------------------------------------

感谢各位的阅读,以上就是“SpringBoot中使用RabbitMQ的RPC功能案例分析”的内容了,经过本文的学习后,相信大家对SpringBoot中使用RabbitMQ的RPC功能案例分析这一问题有了更深刻的体会,具体使用情况还需要大家实践验证。这里是编程网,小编将为大家推送更多相关知识点的文章,欢迎关注!

阅读原文内容投诉

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

软考中级精品资料免费领

  • 历年真题答案解析
  • 备考技巧名师总结
  • 高频考点精准押题
  • 2024年上半年信息系统项目管理师第二批次真题及答案解析(完整版)

    难度     813人已做
    查看
  • 【考后总结】2024年5月26日信息系统项目管理师第2批次考情分析

    难度     354人已做
    查看
  • 【考后总结】2024年5月25日信息系统项目管理师第1批次考情分析

    难度     318人已做
    查看
  • 2024年上半年软考高项第一、二批次真题考点汇总(完整版)

    难度     435人已做
    查看
  • 2024年上半年系统架构设计师考试综合知识真题

    难度     224人已做
    查看

相关文章

发现更多好内容

猜你喜欢

AI推送时光机
位置:首页-资讯-后端开发
咦!没有更多了?去看看其它编程学习网 内容吧
首页课程
资料下载
问答资讯