文章详情

短信预约-IT技能 免费直播动态提醒

请输入下面的图形验证码

提交验证

短信预约提醒成功

详解RocketMQ消费端如何监听消息

2022-12-15 15:00

关注

前言

上一篇文章中我们主要来看RocketMQ消息消费者是如何启动的,

那他有一个步骤是非常重要的,就是启动消息的监听,通过不断的拉取消息,来实现消息的监听,那具体怎么做,让我们我们跟着源码来学习一下~

流程地图

源码跟踪

这一块的代码比较多,我自己对关键点的一些整理,这个图我画的不是很OK

核心模块(消息拉取)

入口:this.pullMessageService.start();

org.apache.rocketmq.client.impl.consumer.PullMessageService#run

声明一个阻塞队列用来存放 PullRequest 对象

PullRequest 用于消息拉取任务,如果 pullRequestQueue 为空则会阻塞,直到拉取任务被放入

private final LinkedBlockingQueue<PullRequest> pullRequestQueue = new LinkedBlockingQueue<PullRequest>();

将 stopped 用volatile来修饰,每次执行的时候都检测stopped的状态,线程只要修改了这个状态,其余线程就会马上知道

protected volatile boolean stopped = false;
@Override
public void run() {
    log.info(this.getServiceName() + " service started");
    // 判断启动状态
    while (!this.isStopped()) {
        try {
            // 取出一个PullRequest对象
            PullRequest pullRequest = this.pullRequestQueue.take();
            this.pullMessage(pullRequest);
        } catch (InterruptedException ignored) {
        } catch (Exception e) {
            log.error("Pull Message Service Run Method exception", e);
        }
    }
    log.info(this.getServiceName() + " service end");
}

PullMessageService 从消息服务器默认拉取32条消息,按消息的偏移量顺序存放在 ProcessQueue 队列

final MQConsumerInner consumer = this.mQClientFactory.selectConsumer(pullRequest.getConsumerGroup());

入口:org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#pullMessage

// 获取消费队列快照
final ProcessQueue processQueue = pullRequest.getProcessQueue();
if (processQueue.isDropped()) {
    log.info("the pull request[{}] is dropped.", pullRequest.toString());
    return;
}
// 设置最后一次拉取时间
pullRequest.getProcessQueue().setLastPullTimestamp(System.currentTimeMillis());
// 校验状态
this.makeSureStateOK();
private void makeSureStateOK() throws MQClientException {
    if (this.serviceState != ServiceState.RUNNING) {
        throw new MQClientException("The consumer service state not OK, "
            + this.serviceState
            + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
            null);
    }
}

如果消费者状态不正确,则抛出异常,启动定时线程池过段时间回收 PullRequest 对象,以便pullMessageService能及时唤醒并再次执行消息拉取,这个逻辑在多个地方使用到了

public void executePullRequestLater(final PullRequest pullRequest, final long timeDelay) {
    if (!isStopped()) {
        this.scheduledExecutorService.schedule(new Runnable() {
            @Override
            public void run() {
                PullMessageService.this.executePullRequestImmediately(pullRequest);
            }
        }, timeDelay, TimeUnit.MILLISECONDS);
    } else {
        log.warn("PullMessageServiceScheduledThread has shutdown");
    }
}
public void executePullRequestImmediately(final PullRequest pullRequest) {
    try {
        // 最后将pullRequest放入pullRequestQueue中
        this.pullRequestQueue.put(pullRequest);
    } catch (InterruptedException e) {
        log.error("executePullRequestImmediately pullRequestQueue.put", e);
    }
}

如果触发流量控制,则延迟拉取消息,先将 PullRequest 对象进行回收,以便pullMessageService能及时唤醒并再次执行消息拉取

// 缓存消息条数
long cachedMessageCount = processQueue.getMsgCount().get();
// 缓存消息的大小
long cachedMessageSizeInMiB = processQueue.getMsgSize().get() / (1024 * 1024);
// 当队列中的消息跳过,超过设置 则延迟拉取消息
if (cachedMessageCount &gt; this.defaultMQPushConsumer.getPullThresholdForQueue()) {
    this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
    if ((queueFlowControlTimes++ % 1000) == 0) {
        log.warn(
            "the cached message count exceeds the threshold {}, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}",
            this.defaultMQPushConsumer.getPullThresholdForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes);
    }
    return;
}
if (cachedMessageSizeInMiB > this.defaultMQPushConsumer.getPullThresholdSizeForQueue()) {
    this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
    if ((queueFlowControlTimes++ % 1000) == 0) {
        log.warn(
            "the cached message size exceeds the threshold {} MiB, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}",
            this.defaultMQPushConsumer.getPullThresholdSizeForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes);
    }
    return;
}

这里通过查询 subscriptionInner Map容器,利用主题来获取对应的订阅关系,如果没有找到对应的订阅关系,则延迟拉取消息,先将 PullRequest 对象进行回收以便 pullMessageService 能及时唤醒并再次执行消息拉取

protected final ConcurrentMap<String , SubscriptionData> subscriptionInner =
    new ConcurrentHashMap<String, SubscriptionData>();
final SubscriptionData subscriptionData = this.rebalanceImpl.getSubscriptionInner().get(pullRequest.getMessageQueue().getTopic());
if (null == subscriptionData) {
    this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
    log.warn("find the consumer's subscription failed, {}", pullRequest);
    return;
}

通过消费者启动的模块中,我们知道RocketMQ是根据不同模式,将消息进度存储在不同的地方

广播模式:消息进度存储在本地文件

集群模式:消息进度存储在Broker 服务器上

boolean commitOffsetEnable = false;
long commitOffsetValue = 0L;
if (MessageModel.CLUSTERING == this.defaultMQPushConsumer.getMessageModel()) {
    // 从内存中读取位置
    commitOffsetValue = this.offsetStore.readOffset(pullRequest.getMessageQueue(), ReadOffsetType.READ_FROM_MEMORY);
    if (commitOffsetValue > 0) {
        commitOffsetEnable = true;
    }
}

入口:org.apache.rocketmq.client.impl.consumer.PullAPIWrapper#pullKernelImpl

public PullResult pullKernelImpl(
    final MessageQueue mq,
    final String subExpression,
    final String expressionType,
    final long subVersion,
    final long offset,
    final int maxNums,
    final int sysFlag,
    final long commitOffset,
    final long brokerSuspendMaxTimeMillis,
    final long timeoutMillis,
    final CommunicationMode communicationMode,
    final PullCallback pullCallback
) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {}

我们看到他有非常多的参数

拉取流程

// step 1 通过BrokerName找到对应的Broker
FindBrokerResult findBrokerResult =
    this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(),
        this.recalculatePullFromWhichNode(mq), false);
// step 2 如果没有找到对应的,则更新路由信息
if (null == findBrokerResult) {
    this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic());
    findBrokerResult =
        this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(),
            this.recalculatePullFromWhichNode(mq), false);
}
// check version
if (!ExpressionType.isTagType(expressionType)
    &amp;&amp; findBrokerResult.getBrokerVersion() &lt; MQVersion.Version.V4_1_0_SNAPSHOT.ordinal()) {
    throw new MQClientException("The broker[" + mq.getBrokerName() + ", "
        + findBrokerResult.getBrokerVersion() + "] does not upgrade to support for filter message by " + expressionType, null);
}
PullMessageRequestHeader requestHeader = new PullMessageRequestHeader();
requestHeader.setConsumerGroup(this.consumerGroup);
requestHeader.setTopic(mq.getTopic());
requestHeader.setQueueId(mq.getQueueId());
requestHeader.setQueueOffset(offset);
requestHeader.setMaxMsgNums(maxNums);
requestHeader.setSysFlag(sysFlagInner);
requestHeader.setCommitOffset(commitOffset);
requestHeader.setSuspendTimeoutMillis(brokerSuspendMaxTimeMillis);
requestHeader.setSubscription(subExpression);
requestHeader.setSubVersion(subVersion);
requestHeader.setExpressionType(expressionType);
PullResult pullResult = this.mQClientFactory.getMQClientAPIImpl().pullMessage(
    brokerAddr,
    requestHeader,
    timeoutMillis,
    communicationMode,
    pullCallback);

因为 CommunicationMode 传递的是ASYNC,我们着重来看一下这个方法

入口: org.apache.rocketmq.client.impl.MQClientAPIImpl#pullMessageAsync

调用 this.remotingClient.invokeAsync(addr, request, timeoutMillis, new InvokeCallback()

这里我们就先不细看了

拉取消息处理

// 处理pullResult数据
pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult,
    subscriptionData);

主要做了三件事,转换消息格式、设置消息信息、放入msgFoundList

将pullResult 转成 PullResultExt,转换消息格式为List

PullResultExt pullResultExt = (PullResultExt) pullResult;
// 转换消息格式为List
ByteBuffer byteBuffer = ByteBuffer.wrap(pullResultExt.getMessageBinary());
List<MessageExt> msgList = MessageDecoder.decodes(byteBuffer);

执行消息过滤,匹配符合的tag

if (!subscriptionData.getTagsSet().isEmpty() && !subscriptionData.isClassFilterMode()) {
    msgListFilterAgain = new ArrayList<MessageExt>(msgList.size());
    for (MessageExt msg : msgList) {
        if (msg.getTags() != null) {
            if (subscriptionData.getTagsSet().contains(msg.getTags())) {
                msgListFilterAgain.add(msg);
            }
        }
    }
}

设置消息的transactionId、扩展属性、BrokerName名称,放入List中

for (MessageExt msg : msgListFilterAgain) {
    String traFlag = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
    if (Boolean.parseBoolean(traFlag)) {
        msg.setTransactionId(msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX));
    }
    MessageAccessor.putProperty(msg, MessageConst.PROPERTY_MIN_OFFSET,
        Long.toString(pullResult.getMinOffset()));
    MessageAccessor.putProperty(msg, MessageConst.PROPERTY_MAX_OFFSET,
        Long.toString(pullResult.getMaxOffset()));
    msg.setBrokerName(mq.getBrokerName());
}
pullResultExt.setMsgFoundList(msgListFilterAgain);

当pullStatus为FOUND,消息进行提交消费的请求

// 获取第一条消息的offset
firstMsgOffset = pullResult.getMsgFoundList().get(0).getQueueOffset();
boolean dispatchToConsume = processQueue.putMessage(pullResult.getMsgFoundList());

主要做了两件事,循环读取消息list,存入msgTreeMap和计算此次读取信息偏移量

public boolean putMessage(final List<MessageExt> msgs) {
    boolean dispatchToConsume = false;
    try {
        // 上锁
        this.treeMapLock.writeLock().lockInterruptibly();
        try {
            int validMsgCnt = 0;
            // 循环读取消息list,存入msgTreeMap
            for (MessageExt msg : msgs) {
                MessageExt old = msgTreeMap.put(msg.getQueueOffset(), msg);
                if (null == old) {
                    validMsgCnt++;
                    this.queueOffsetMax = msg.getQueueOffset();
                    msgSize.addAndGet(msg.getBody().length);
                }
            }
            msgCount.addAndGet(validMsgCnt);
            if (!msgTreeMap.isEmpty() && !this.consuming) {
                dispatchToConsume = true;
                this.consuming = true;
            }
            if (!msgs.isEmpty()) {
                // 获取最后一条消息
                MessageExt messageExt = msgs.get(msgs.size() - 1);
                // 获取最大偏移量
                String property = messageExt.getProperty(MessageConst.PROPERTY_MAX_OFFSET);
                ...
            }
        } finally {
            this.treeMapLock.writeLock().unlock();
        }
    }
    ...
}
// 提交消费请求,消息提交到内部的线程池
DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
    pullResult.getMsgFoundList(),
    processQueue,
    pullRequest.getMessageQueue(),
    dispatchToConsume);

入口:org.apache.rocketmq.client.impl.consumer.ConsumeMessageService#submitConsumeRequest

获取 ConsumeRequest对象,拿到当前主题的监听器

这里拿到的监听器,就是我们在启动消费者的时候所注册的,监听到消息后执行相关的业务逻辑

consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
               ...
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

在这里触发我们在一开始重写的consumeMessage方法,这里msgs用Collections.unmodifiableList进行包装,意思就是不可以修改的,是一个只读的List

ConsumeRequest consumeRequest = new ConsumeRequest(msgs, processQueue, messageQueue);
status = listener.consumeMessage(Collections.unmodifiableList(msgs), context);
ConsumeMessageConcurrentlyService.this.processConsumeResult(status, context, this);
public void processConsumeResult(
        final ConsumeConcurrentlyStatus status,
        final ConsumeConcurrentlyContext context,
        final ConsumeRequest consumeRequest
    ) {
        int ackIndex = context.getAckIndex();
        if (consumeRequest.getMsgs().isEmpty())
            return;
        switch (status) {
            case CONSUME_SUCCESS:
                if (ackIndex >= consumeRequest.getMsgs().size()) {
                    ackIndex = consumeRequest.getMsgs().size() - 1;
                }
                int ok = ackIndex + 1;
                int failed = consumeRequest.getMsgs().size() - ok;
                this.getConsumerStatsManager().incConsumeOKTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), ok);
                this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), failed);
                break;
            ...
        }
        switch (this.defaultMQPushConsumer.getMessageModel()) {
            ...
            case CLUSTERING:
                List<MessageExt> msgBackFailed = new ArrayList<MessageExt>(consumeRequest.getMsgs().size());
                for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
                    MessageExt msg = consumeRequest.getMsgs().get(i);
                    boolean result = this.sendMessageBack(msg, context);
                    if (!result) {
                        msg.setReconsumeTimes(msg.getReconsumeTimes() + 1);
                        msgBackFailed.add(msg);
                    }
                }
                // 如果存在失败消息,则过5秒在定时执行
                if (!msgBackFailed.isEmpty()) {
                    consumeRequest.getMsgs().removeAll(msgBackFailed);
                    this.submitConsumeRequestLater(msgBackFailed, consumeRequest.getProcessQueue(), consumeRequest.getMessageQueue());
                }
                break;
                ...
        }
        long offset = consumeRequest.getProcessQueue().removeMessage(consumeRequest.getMsgs());
        // 更新Offset位置  
        if (offset >= 0 && !consumeRequest.getProcessQueue().isDropped()) {
            this.defaultMQPushConsumerImpl.getOffsetStore()
            .updateOffset(consumeRequest.getMessageQueue(), offset, true);
        }
    }

入口:

org.apache.rocketmq.client.impl.consumer.PullMessageService#executePullRequestImmediately

DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);

消息消费进度提交

总结

目前只是将整体的一个消费端监听消息的流程了解清楚,里面还有许多细节需要去推敲~

以上就是详解RocketMQ 消费端如何监听消息的详细内容,更多关于RocketMQ 消费端监听消息的资料请关注编程网其它相关文章!

阅读原文内容投诉

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

软考中级精品资料免费领

  • 历年真题答案解析
  • 备考技巧名师总结
  • 高频考点精准押题
  • 2024年上半年信息系统项目管理师第二批次真题及答案解析(完整版)

    难度     807人已做
    查看
  • 【考后总结】2024年5月26日信息系统项目管理师第2批次考情分析

    难度     351人已做
    查看
  • 【考后总结】2024年5月25日信息系统项目管理师第1批次考情分析

    难度     314人已做
    查看
  • 2024年上半年软考高项第一、二批次真题考点汇总(完整版)

    难度     433人已做
    查看
  • 2024年上半年系统架构设计师考试综合知识真题

    难度     221人已做
    查看

相关文章

发现更多好内容

猜你喜欢

AI推送时光机
位置:首页-资讯-后端开发
咦!没有更多了?去看看其它编程学习网 内容吧
首页课程
资料下载
问答资讯