文章详情

短信预约-IT技能 免费直播动态提醒

请输入下面的图形验证码

提交验证

短信预约提醒成功

老弟问我,RocketMQ 中的 ProcessQueue 怎么理解?

2024-11-30 17:03

关注

今天来分享 RocketMQ 中一个非常重要又不太好理解的知识点-ProcessQueue。

一句话概括,ProcessQueue 就是 MessageQueue 的消费快照。看下面这张图:

1 ProcessQueue 构建

RocketMQ 客户端启动时,会开启一个 rebalance 线程,代码如下:

//MQClientInstance.java
public void start() throws MQClientException {
synchronized (this) {
switch (this.serviceState) {
case CREATE_JUST:
//...
// Start rebalance service
this.rebalanceService.start();
//...
}
}
}

这个线程会不停的做重平衡操作,对 ProcessQueue 进行维护。在重平衡线程类 RebalanceImpl 定义了一个变量 processQueueTable,数据结构如下:

可以看到,在 processQueueTable 这个数据结构上维护了 MessageQueue 和 ProcessQueue 的映射。

下面看一下维护 processQueueTable 的代码:

private boolean updateProcessQueueTableInRebalance(final String topic, final Set<MessageQueue> mqSet,
final boolean isOrder) {
boolean changed = false;

Iterator<Entry<MessageQueue, ProcessQueue>> it = this.processQueueTable.entrySet().iterator();
while (it.hasNext()) {
Entry<MessageQueue, ProcessQueue> next = it.next();
MessageQueue mq = next.getKey();
ProcessQueue pq = next.getValue();

if (mq.getTopic().equals(topic)) {
if (!mqSet.contains(mq)) {
//从processQueueTable上移除
} else if (pq.isPullExpired()) {
switch (this.consumeType()) {
case CONSUME_ACTIVELY://拉模式
break;
case CONSUME_PASSIVELY://推模式
//从processQueueTable上移除
break;
default:
break;
}
}
}
}
//创建ProcessQueue并放到processQueueTable
List<PullRequest> pullRequestList = new ArrayList<PullRequest>();
for (MessageQueue mq : mqSet) {
if (!this.processQueueTable.containsKey(mq)) {
//...
ProcessQueue pq = new ProcessQueue();

long nextOffset = -1L;
try {
nextOffset = this.computePullFromWhereWithException(mq);
} catch (Exception e) {
log.info("doRebalance, {}, compute offset failed, {}", consumerGroup, mq);
continue;
}

if (nextOffset >= 0) {
ProcessQueue pre = this.processQueueTable.putIfAbsent(mq, pq);
if (pre != null) {
log.info("doRebalance, {}, mq already exists, {}", consumerGroup, mq);
} else {
//封装好processQueueTable后再创建一个PullRequest进行消息拉取
log.info("doRebalance, {}, add a new mq, {}", consumerGroup, mq);
PullRequest pullRequest = new PullRequest();
pullRequest.setConsumerGroup(consumerGroup);
pullRequest.setNextOffset(nextOffset);
pullRequest.setMessageQueue(mq);
pullRequest.setProcessQueue(pq);
pullRequestList.add(pullRequest);
changed = true;
}
} else {
log.warn("doRebalance, {}, add new mq failed, {}", consumerGroup, mq);
}
}
}

this.dispatchPullRequest(pullRequestList);

return changed;
}

2 拉取消息

上一节中构建 ProcessQueue 后,会再创建一个 PullRequest,这个 PullRequest 封装了 MessageQueue 和 ProcessQueue,创建成功后被放到了 PullMessageService 中的 pullRequestQueue 变量:

//PullMessageService.java
private final LinkedBlockingQueue<PullRequest> pullRequestQueue = new LinkedBlockingQueue<PullRequest>();

public void executePullRequestImmediately(final PullRequest pullRequest) {
try {
this.pullRequestQueue.put(pullRequest);
} catch (InterruptedException e) {
log.error("executePullRequestImmediately pullRequestQueue.put", e);
}
}

这里以 RocketMQ 的推模式为例,Consumer 拉取到消息后,会进行如下处理:

  1. 对拉取到的消息根据 TAG 再次
    进行过滤;
  2. 更新 PullRequest 下次拉取的偏移量 nextOffset;
  3. 把拉取的消息封装到 ProcessQueue 的 msgTreeMap(

    放到 msgTreeMap 之前首先要获取到写锁 treeMapLock

    );
  4. 封装 ConsumeRequest 进行消息消费;
  5. 封装消息拉取请求再次进行拉取。

代码如下:

//DefaultMQPushConsumerImpl.java
public void onSuccess(PullResult pullResult) {
if (pullResult != null) {
//1. 对拉取到的消息根据 TAG 再次进行过滤
pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult,
subscriptionData);

switch (pullResult.getPullStatus()) {
case FOUND:
//2. 更新 PullRequest 下次拉取的偏移量 nextOffset
pullRequest.setNextOffset(pullResult.getNextBeginOffset());

if (pullResult.getMsgFoundList() == null || pullResult.getMsgFoundList().isEmpty()) {
DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
} else {
//3. 把拉取的消息封装到 ProcessQueue 的 msgTreeMap
boolean dispatchToConsume = processQueue.putMessage(pullResult.getMsgFoundList());
//4. 封装 ConsumeRequest 进行消息消费
DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
pullResult.getMsgFoundList(),
processQueue,
pullRequest.getMessageQueue(),
dispatchToConsume);
//5. 封装消息拉取请求
if (DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval() > 0) {
DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest,
DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval());
} else {
DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
}
}
break;
//...
}
}
}

3 消费消息

在上一节提到过,拉取到消息后,会把消息封装成一个 ConsumeRequest,这个线程类会调用消费者定义的 MessageListener 进行消费处理。看一下源代码:

//ConsumeMessageConcurrentlyService.ConsumeRequest
public void run() {
if (this.processQueue.isDropped()) {
log.info("the message queue not be able to consume, because it's dropped. group={} {}", ConsumeMessageConcurrentlyService.this.consumerGroup, this.messageQueue);
return;
}

MessageListenerConcurrently listener = ConsumeMessageConcurrentlyService.this.messageListener;
ConsumeConcurrentlyContext context = new ConsumeConcurrentlyContext(messageQueue);
ConsumeConcurrentlyStatus status = null;

try {
status = listener.consumeMessage(Collections.unmodifiableList(msgs), context);
}//...

if (!processQueue.isDropped()) {
ConsumeMessageConcurrentlyService.this.processConsumeResult(status, context, this);
}
}

消息消费成功后,会调用 processConsumeResult 方法进行结果处理。对于广播模式,发送失败后不会做重试,相当于把消息丢弃,而对于集群模式,消费失败的消息会发送到 Broker 端等待消费者重新拉取进行重试。

消费结果处理完后,消费成功的消息会从 ProcessQueue 的 msgTreeMap 中移除(需要获取到写锁 treeMapLock),同时从 msgTreeMap 中获取最小的 Offset 来更新对应 MessageQueue 的偏移量。这个逻辑可以参考下面代码:

public void processConsumeResult(
final ConsumeConcurrentlyStatus status,
final ConsumeConcurrentlyContext context,
final ConsumeRequest consumeRequest
) {
int ackIndex = context.getAckIndex();

switch (status) {
case CONSUME_SUCCESS:
if (ackIndex >= consumeRequest.getMsgs().size()) {
ackIndex = consumeRequest.getMsgs().size() - 1;
}
int ok = ackIndex + 1;
break;
//...
}
switch (this.defaultMQPushConsumer.getMessageModel()) {
case BROADCASTING:
//...
break;
case CLUSTERING:
List<MessageExt> msgBackFailed = new ArrayList<MessageExt>(consumeRequest.getMsgs().size());
for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
MessageExt msg = consumeRequest.getMsgs().get(i);
//消费失败的,发送回Broker
boolean result = this.sendMessageBack(msg, context);
//...
}

break;
default:
break;
}
//从msgTreeMap中移除并返回msgTreeMap第一条消息的offset
long offset = consumeRequest.getProcessQueue().removeMessage(consumeRequest.getMsgs());
if (offset >= 0 && !consumeRequest.getProcessQueue().isDropped()) {
this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), offset, true);
}
}

4 消费者限流

4.1 缓存消息数量

如果消费者缓存的消息数量大于 RocketMQ 配置的阈值(默认 1000),就会触发延迟拉取,而消费者缓存的消息数量就来自 ProcessQueue,看下面代码:

long cachedMessageCount = processQueue.getMsgCount().get();
if (cachedMessageCount > this.defaultMQPushConsumer.getPullThresholdForQueue()) {
this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
return;
}

4.2 缓存的消息大小

如果消费者缓存的消息大小大于 RocketMQ 配置的阈值(默认 100M),就会触发延迟拉取,而消费者缓存的消息大小就来自 ProcessQueue,看下面代码:

long cachedMessageSizeInMiB = processQueue.getMsgSize().get() / (1024 * 1024);
if (cachedMessageSizeInMiB > this.defaultMQPushConsumer.getPullThresholdSizeForQueue()) {
this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
return;
}

4.3 消息间隔

对于普通消息,如果消费偏移量间隔大于配置的阈值(默认 2000),就会触发延迟拉取,而消息间隔就来自 ProcessQueue,看下面代码:

if (!this.consumeOrderly) {
if (processQueue.getMaxSpan() > this.defaultMQPushConsumer.getConsumeConcurrentlyMaxSpan()) {
this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
return;
}
}

4.4 获取锁失败

对于顺序消息,如果获取锁失败,也会触发延迟拉取,而判断获取锁是否成功,也是在 ProcessQueue,看下面代码:

if (processQueue.isLocked()) {
//...
} else {
this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
}

5 总结

ProcessQueue 是 MessageQueue 的消费快照,可以协助消费者进行消息拉取、消息消费、更新偏移量、限流。最后,看一下 ProcessQueue 的数据结构:

来源:君哥聊技术内容投诉

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

软考中级精品资料免费领

  • 历年真题答案解析
  • 备考技巧名师总结
  • 高频考点精准押题
  • 2024年上半年信息系统项目管理师第二批次真题及答案解析(完整版)

    难度     813人已做
    查看
  • 【考后总结】2024年5月26日信息系统项目管理师第2批次考情分析

    难度     354人已做
    查看
  • 【考后总结】2024年5月25日信息系统项目管理师第1批次考情分析

    难度     318人已做
    查看
  • 2024年上半年软考高项第一、二批次真题考点汇总(完整版)

    难度     435人已做
    查看
  • 2024年上半年系统架构设计师考试综合知识真题

    难度     224人已做
    查看

相关文章

发现更多好内容

猜你喜欢

AI推送时光机
位置:首页-资讯-后端开发
咦!没有更多了?去看看其它编程学习网 内容吧
首页课程
资料下载
问答资讯