文章详情

短信预约-IT技能 免费直播动态提醒

请输入下面的图形验证码

提交验证

短信预约提醒成功

RocketMQ怎么实现请求异步处理

2023-06-19 09:24

关注

这篇文章主要介绍“RocketMQ怎么实现请求异步处理”,在日常操作中,相信很多人在RocketMQ怎么实现请求异步处理问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答”RocketMQ怎么实现请求异步处理”的疑惑有所帮助!接下来,请跟着小编一起来学习吧!

一、RocketMQ

1、架构图片

RocketMQ怎么实现请求异步处理

2、角色分类

(1)、Broker

RocketMQ 的核心,接收 Producer 发过来的消息、处理 Consumer 的消费消息请求、消息的持 久化存储、服务端过滤功能等 。

(2)、NameServer

消息队列中的状态服务器,集群的各个组件通过它来了解全局的信息 。类似微服务中注册中心的服务注册,发现,下线,上线的概念。

热备份: NamServer可以部署多个,相互之间独立,其他角色同时向多个NameServer 机器上报状态信息。

心跳机制: NameServer 中的 Broker、 Topic等状态信息不会持久存储,都是由各个角色定时上报并存储到内存中,超时不上报的话, NameServer会认为某个机器出故障不可用。

(3)、Producer

消息的生成者,最常用的producer类就是DefaultMQProducer。

(4)、Consumer

消息的消费者,常用Consumer类 DefaultMQPushConsumer 收到消息后自动调用传入的处理方法来处理,实时性高 DefaultMQPullConsumer 用户自主控制 ,灵活性更高。

3、通信机制

(1)、Broker启动后需要完成一次将自己注册至NameServer的操作;随后每隔30s时间定时向NameServer更新Topic路由信息。

(2)、Producer发送消息时候,需要根据消息的Topic从本地缓存的获取路由信息。如果没有则更新路由信息会从NameServer重新拉取,同时Producer会默认每隔30s向NameServer拉取一次路由信息。

(3)、Consumer消费消息时候,从NameServer获取的路由信息,并再完成客户端的负载均衡后,监听指定消息队列获取消息并进行消费。

二、代码实现案例

1、项目结构图

RocketMQ怎么实现请求异步处理

版本描述

<spring-boot.version>2.1.3.RELEASE</spring-boot.version><rocketmq.version>4.3.0</rocketmq.version>

2、配置文件

rocketmq:  # 生产者配置  producer:    isOnOff: on    # 发送同一类消息的设置为同一个group,保证唯一    groupName: CicadaGroup    # 服务地址    namesrvAddr: 127.0.0.1:9876    # 消息最大长度 默认1024*4(4M)    maxMessageSize: 4096    # 发送消息超时时间,默认3000    sendMsgTimeout: 3000    # 发送消息失败重试次数,默认2    retryTimesWhenSendFailed: 2  # 消费者配置  consumer:    isOnOff: on    # 官方建议:确保同一组中的每个消费者订阅相同的主题。    groupName: CicadaGroup    # 服务地址    namesrvAddr: 127.0.0.1:9876    # 接收该 Topic 下所有 Tag    topics: CicadaTopic~*;    consumeThreadMin: 20    consumeThreadMax: 64    # 设置一次消费消息的条数,默认为1条    consumeMessageBatchMaxSize: 1# 配置 Group  Topic  Tagrocket:  group: rocketGroup  topic: rocketTopic  tag: rocketTag

3、生产者配置

@Configurationpublic class ProducerConfig {    private static final Logger LOG = LoggerFactory.getLogger(ProducerConfig.class) ;    @Value("${rocketmq.producer.groupName}")    private String groupName;    @Value("${rocketmq.producer.namesrvAddr}")    private String namesrvAddr;    @Value("${rocketmq.producer.maxMessageSize}")    private Integer maxMessageSize ;    @Value("${rocketmq.producer.sendMsgTimeout}")    private Integer sendMsgTimeout;    @Value("${rocketmq.producer.retryTimesWhenSendFailed}")    private Integer retryTimesWhenSendFailed;    @Bean    public DefaultMQProducer getRocketMQProducer() {        DefaultMQProducer producer;        producer = new DefaultMQProducer(this.groupName);        producer.setNamesrvAddr(this.namesrvAddr);        //如果需要同一个jvm中不同的producer往不同的mq集群发送消息,需要设置不同的instanceName        if(this.maxMessageSize!=null){            producer.setMaxMessageSize(this.maxMessageSize);        }        if(this.sendMsgTimeout!=null){            producer.setSendMsgTimeout(this.sendMsgTimeout);        }        //如果发送消息失败,设置重试次数,默认为2次        if(this.retryTimesWhenSendFailed!=null){            producer.setRetryTimesWhenSendFailed(this.retryTimesWhenSendFailed);        }        try {            producer.start();        } catch (MQClientException e) {            e.printStackTrace();        }        return producer;    }}

4、消费者配置

@Configurationpublic class ConsumerConfig {    private static final Logger LOG = LoggerFactory.getLogger(ConsumerConfig.class) ;    @Value("${rocketmq.consumer.namesrvAddr}")    private String namesrvAddr;    @Value("${rocketmq.consumer.groupName}")    private String groupName;    @Value("${rocketmq.consumer.consumeThreadMin}")    private int consumeThreadMin;    @Value("${rocketmq.consumer.consumeThreadMax}")    private int consumeThreadMax;    @Value("${rocketmq.consumer.topics}")    private String topics;    @Value("${rocketmq.consumer.consumeMessageBatchMaxSize}")    private int consumeMessageBatchMaxSize;    @Resource    private RocketMsgListener msgListener;    @Bean    public DefaultMQPushConsumer getRocketMQConsumer(){        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(groupName);        consumer.setNamesrvAddr(namesrvAddr);        consumer.setConsumeThreadMin(consumeThreadMin);        consumer.setConsumeThreadMax(consumeThreadMax);        consumer.registerMessageListener(msgListener);        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);        consumer.setConsumeMessageBatchMaxSize(consumeMessageBatchMaxSize);        try {            String[] topicTagsArr = topics.split(";");            for (String topicTags : topicTagsArr) {                String[] topicTag = topicTags.split("~");                consumer.subscribe(topicTag[0],topicTag[1]);            }            consumer.start();        }catch (MQClientException e){            e.printStackTrace();        }        return consumer;    }}

5、消息监听配置

@Componentpublic class RocketMsgListener implements MessageListenerConcurrently {    private static final Logger LOG = LoggerFactory.getLogger(RocketMsgListener.class) ;    @Resource    private ParamConfigService paramConfigService ;    @Override    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext context) {        if (CollectionUtils.isEmpty(list)){            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;        }        MessageExt messageExt = list.get(0);        LOG.info("接受到的消息为:"+new String(messageExt.getBody()));        int reConsume = messageExt.getReconsumeTimes();        // 消息已经重试了3次,如果不需要再次消费,则返回成功        if(reConsume ==3){            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;        }        if(messageExt.getTopic().equals(paramConfigService.rocketTopic)){            String tags = messageExt.getTags() ;            switch (tags){                case "rocketTag":                    LOG.info("开户 tag == >>"+tags);                    break ;                default:                    LOG.info("未匹配到Tag == >>"+tags);                    break;            }        }        // 消息消费成功        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;    }}

6、配置参数绑定

@Servicepublic class ParamConfigService {    @Value("${rocket.group}")    public String rocketGroup ;    @Value("${rocket.topic}")    public String rocketTopic ;    @Value("${rocket.tag}")    public String rocketTag ;}

7、消息发送测试

@Servicepublic class RocketMqServiceImpl implements RocketMqService {    @Resource    private DefaultMQProducer defaultMQProducer;    @Resource    private ParamConfigService paramConfigService ;    @Override    public SendResult openAccountMsg(String msgInfo) {        // 可以不使用Config中的Group        defaultMQProducer.setProducerGroup(paramConfigService.rocketGroup);        SendResult sendResult = null;        try {            Message sendMsg = new Message(paramConfigService.rocketTopic,                                          paramConfigService.rocketTag,                                         "open_account_key", msgInfo.getBytes());            sendResult = defaultMQProducer.send(sendMsg);        } catch (Exception e) {            e.printStackTrace();        }        return sendResult ;    }}

三、项目源码

GitHub·地址https://github.com/cicadasmile/middle-ware-parentGitEE·地址https://gitee.com/cicadasmile/middle-ware-parent

到此,关于“RocketMQ怎么实现请求异步处理”的学习就结束了,希望能够解决大家的疑惑。理论与实践的搭配能更好的帮助大家学习,快去试试吧!若想继续学习更多相关知识,请继续关注编程网网站,小编会继续努力为大家带来更多实用的文章!

阅读原文内容投诉

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

软考中级精品资料免费领

  • 历年真题答案解析
  • 备考技巧名师总结
  • 高频考点精准押题
  • 2024年上半年信息系统项目管理师第二批次真题及答案解析(完整版)

    难度     813人已做
    查看
  • 【考后总结】2024年5月26日信息系统项目管理师第2批次考情分析

    难度     354人已做
    查看
  • 【考后总结】2024年5月25日信息系统项目管理师第1批次考情分析

    难度     318人已做
    查看
  • 2024年上半年软考高项第一、二批次真题考点汇总(完整版)

    难度     435人已做
    查看
  • 2024年上半年系统架构设计师考试综合知识真题

    难度     224人已做
    查看

相关文章

发现更多好内容

猜你喜欢

AI推送时光机
位置:首页-资讯-后端开发
咦!没有更多了?去看看其它编程学习网 内容吧
首页课程
资料下载
问答资讯