前言
在上一篇文章中,我们讲述了DefaultMQPushConsumer
拉消息的原理,它是通过重平衡触发pullRequest
的创建,通过阻塞队列作为pullRequest
的存储容器,另一端通过定时任务从阻塞队列中取出pullRequest
来向Broker
发送拉消息的请求,无论消息拉取成功还是失败,都会重新把pullRequest
放回阻塞队列中,这样就能保证持续不断地向Broker
拉消息了;
今天这篇文章我们继续讲述DefaultLitePullConsumer
是如何实现消息拉取的;
DefaultLitePullConsumer拉消息代码示例
我们在使用DefaultLitePullConsumer
时都是主动去poll
消息,并不是像DefaultMQPushConsumer
那样设置一个消息监听器:
DefaultLitePullConsumer consumer = new DefaultLitePullConsumer(group);
consumer.setNamesrvAddr(nameSrv);
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.subscribe(topic, subExpression);
try {
consumer.start();
} catch (Exception e) {
e.printStackTrace();
}
try {
while (true) {
List<MessageExt> messageExts = consumer.poll(5000);
// 处理业务逻辑
System.out.println("消息数量:" + messageExts.size());
System.out.println("消息内容:");
for (MessageExt messageExt : messageExts) {
System.out.println(new String(messageExt.getBody(), StandardCharsets.UTF_8));
}
}
} catch (Exception e) {
e.printStackTrace();
}
一般拿到消息后都会交给业务线程池去处理,上述代码我只简单地打印了一下消息内容;
消息消费
跟着poll()
方法,我们最终定位到DefaultLitePullConsumerImpl.poll()
这个方法:
public synchronized List<MessageExt> poll(long timeout) {
try {
this.checkServiceState();
if (timeout < 0L) {
throw new IllegalArgumentException("Timeout must not be negative");
}
if (this.defaultLitePullConsumer.isAutoCommit()) {
this.maybeAutoCommit();
}
long endTime = System.currentTimeMillis() + timeout;
// 从阻塞队列中取ConsumeRequest
DefaultLitePullConsumerImpl.ConsumeRequest consumeRequest = (DefaultLitePullConsumerImpl.ConsumeRequest)this.consumeRequestCache.poll(endTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
if (endTime - System.currentTimeMillis() > 0L) {
while(consumeRequest != null && consumeRequest.getProcessQueue().isDropped()) {
consumeRequest = (DefaultLitePullConsumerImpl.ConsumeRequest)this.consumeRequestCache.poll(endTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
if (endTime - System.currentTimeMillis() <= 0L) {
break;
}
}
}
if (consumeRequest != null && !consumeRequest.getProcessQueue().isDropped()) {
List<MessageExt> messages = consumeRequest.getMessageExts();
long offset = consumeRequest.getProcessQueue().removeMessage(messages);
// 取到消息后直接更新消费点位
this.assignedMessageQueue.updateConsumeOffset(consumeRequest.getMessageQueue(), offset);
this.resetTopic(messages);
// 下面是调用consumeMessageHook
if (!this.consumeMessageHookList.isEmpty()) {
ConsumeMessageContext consumeMessageContext = new ConsumeMessageContext();
consumeMessageContext.setNamespace(this.defaultLitePullConsumer.getNamespace());
consumeMessageContext.setConsumerGroup(this.groupName());
consumeMessageContext.setMq(consumeRequest.getMessageQueue());
consumeMessageContext.setMsgList(messages);
consumeMessageContext.setSuccess(false);
this.executeHookBefore(consumeMessageContext);
// 默认是消费成功
consumeMessageContext.setStatus(ConsumeConcurrentlyStatus.CONSUME_SUCCESS.toString());
consumeMessageContext.setSuccess(true);
this.executeHookAfter(consumeMessageContext);
}
return messages;
}
} catch (InterruptedException var10) {
}
return Collections.emptyList();
}
1.直接从阻塞队列consumeRequestCache
中取出消息对象ConsumeRequest
,这里面就包含了消息内容;
2.取出来后直接更新消费点位,默认为此次消息消费成功;
这里跟DefaultMQPushConsumer
不同的是,DefaultLitePullConsumerImpl.poll()
默认的是消息消费一定成功,如果消费失败的话,需要开发人员自己处理,消费失败的消息不会再次发送给消费者;
那么咱们的疑问就出来了,poll()
方法光顾着从consumeRequestCache
中取消息,那消息是啥时候放进去的呢?
消息拉取入口
我们可以重新了解一下消费者重平衡过程,在MessageQueue
分配完毕后,会对比被分配的MessageQueue
是否和分配前的不一致,大部分情况下是会发生改变的,那么就会触发messageQueueChanged()
方法的调用:
@Override
public void messageQueueChanged(String topic, Set<MessageQueue> mqAll, Set<MessageQueue> mqDivided) {
// 取出所有的MessageQueueListener
MessageQueueListener messageQueueListener = this.litePullConsumerImpl.getDefaultLitePullConsumer().getMessageQueueListener();
if (messageQueueListener != null) {
try {
// 依次调用messageQueueChanged方法
messageQueueListener.messageQueueChanged(topic, mqAll, mqDivided);
} catch (Throwable e) {
log.error("messageQueueChanged exception", e);
}
}
}
MessageQueueListener
是什么时候被放进去的呢?可以看一下subscribe()
方法:
public synchronized void subscribe(String topic, String subExpression) throws MQClientException {
try {
if (topic == null || "".equals(topic)) {
throw new IllegalArgumentException("Topic can not be null or empty.");
}
setSubscriptionType(SubscriptionType.SUBSCRIBE);
SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(topic, subExpression);
this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData);
// 每个subscribe()都会设置MessageQueueListener
this.defaultLitePullConsumer.setMessageQueueListener(new MessageQueueListenerImpl());
assignedMessageQueue.setRebalanceImpl(this.rebalanceImpl);
if (serviceState == ServiceState.RUNNING) {
this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
updateTopicSubscribeInfoWhenSubscriptionChanged();
}
} catch (Exception e) {
throw new MQClientException("subscribe exception", e);
}
}
可以发现,每个subscribe()都会设置MessageQueueListener
,MessageQueueListenerImpl
里面只干了一件事情:更新MessageQueue
并且创建pullTask
;
class MessageQueueListenerImpl implements MessageQueueListener {
@Override
public void messageQueueChanged(String topic, Set<MessageQueue> mqAll, Set<MessageQueue> mqDivided) {
updateAssignQueueAndStartPullTask(topic, mqAll, mqDivided);
}
}
public void updateAssignQueueAndStartPullTask(String topic, Set<MessageQueue> mqAll, Set<MessageQueue> mqDivided) {
MessageModel messageModel = defaultLitePullConsumer.getMessageModel();
switch (messageModel) {
case BROADCASTING:
updateAssignedMessageQueue(topic, mqAll);
// 更新拉消息任务
updatePullTask(topic, mqAll);
break;
case CLUSTERING:
updateAssignedMessageQueue(topic, mqDivided);
// 更新拉消息任务
updatePullTask(topic, mqDivided);
break;
default:
break;
}
}
现在终于快找到这个消息拉取的入口了:
private void startPullTask(Collection<MessageQueue> mqSet) {
for (MessageQueue messageQueue : mqSet) {
if (!this.taskTable.containsKey(messageQueue)) {
// 创建消息拉取任务
PullTaskImpl pullTask = new PullTaskImpl(messageQueue);
this.taskTable.put(messageQueue, pullTask);
// 这个就是任务执行的入口
this.scheduledThreadPoolExecutor.schedule(pullTask, 0, TimeUnit.MILLISECONDS);
}
}
}
消息拉取的入口寻找起来还是有点困难的,但是主要思路还是从【重平衡】开始,另外就是触发了MessageQueueListener
,此时才会创建pullTask
;
虽然DefaultMQPushConsumer
也是【重平衡】触发pullRequest
的创建,但是它是将pullRequest
放进阻塞队列,另一端由消息拉取任务去取pullRequest
向Broker
发送请求;而DefaultLitePullConsumer
是直接创建pullTask
去拉消息;
PullTaskImpl拉消息
很显然,PullTaskImpl
就是一个Runnable
,那么最重要的就是它的run()
方法,这个方法就是负责从Broker
拉消息并放进consumeRequestCache
阻塞队列中,这样poll()
方法才能从consumeRequestCache
阻塞队列中取到消息;
messageQueue
暂停
if (DefaultLitePullConsumerImpl.this.assignedMessageQueue.isPaused(this.messageQueue)) {
DefaultLitePullConsumerImpl.this.scheduledThreadPoolExecutor.schedule(this, 1000L, TimeUnit.MILLISECONDS);
DefaultLitePullConsumerImpl.this.log.debug("Message Queue: {} has been paused!", this.messageQueue);
return;
}
如果messageQueue
处于暂停状态,那么延迟1秒重新执行这个任务;
ProcessQueue
被移除
ProcessQueue processQueue = DefaultLitePullConsumerImpl.this.assignedMessageQueue.getProcessQueue(this.messageQueue);
if (null == processQueue || processQueue.isDropped()) {
DefaultLitePullConsumerImpl.this.log.info("The message queue not be able to poll, because it's dropped. group={}, messageQueue={}", DefaultLitePullConsumerImpl.this.defaultLitePullConsumer.getConsumerGroup(), this.messageQueue);
return;
}
如果processQueue
不存在或者已经被移除了,那么这个任务也不用执行了;
- 流量控制
if ((long)DefaultLitePullConsumerImpl.this.consumeRequestCache.size() * (long)DefaultLitePullConsumerImpl.this.defaultLitePullConsumer.getPullBatchSize() > DefaultLitePullConsumerImpl.this.defaultLitePullConsumer.getPullThresholdForAll()) {
DefaultLitePullConsumerImpl.this.scheduledThreadPoolExecutor.schedule(this, 50L, TimeUnit.MILLISECONDS);
if (DefaultLitePullConsumerImpl.this.consumeRequestFlowControlTimes++ % 1000L == 0L) {
DefaultLitePullConsumerImpl.this.log.warn("The consume request count exceeds threshold {}, so do flow control, consume request count={}, flowControlTimes={}", DefaultLitePullConsumerImpl.this.consumeRequestCache.size(), DefaultLitePullConsumerImpl.this.consumeRequestFlowControlTimes);
}
return;
}
如果consumeRequestCache
中的消息数量超过了PullThresholdForAll
阈值,那么触发限流机制,当前任务将不会继续拉消息,并且50毫秒后才会重新执行该任务;
long cachedMessageCount = processQueue.getMsgCount().get();
long cachedMessageSizeInMiB = processQueue.getMsgSize().get() / 1048576L;
// 单个processQueue上面消息数量限制
if (cachedMessageCount > (long)DefaultLitePullConsumerImpl.this.defaultLitePullConsumer.getPullThresholdForQueue()) {
DefaultLitePullConsumerImpl.this.scheduledThreadPoolExecutor.schedule(this, 50L, TimeUnit.MILLISECONDS);
if (DefaultLitePullConsumerImpl.this.queueFlowControlTimes++ % 1000L == 0L) {
DefaultLitePullConsumerImpl.this.log.warn("The cached message count exceeds the threshold {}, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, flowControlTimes={}", new Object[]{DefaultLitePullConsumerImpl.this.defaultLitePullConsumer.getPullThresholdForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, DefaultLitePullConsumerImpl.this.queueFlowControlTimes});
}
return;
}
// 单个processQueue中消息总大小限制
if (cachedMessageSizeInMiB > (long)DefaultLitePullConsumerImpl.this.defaultLitePullConsumer.getPullThresholdSizeForQueue()) {
DefaultLitePullConsumerImpl.this.scheduledThreadPoolExecutor.schedule(this, 50L, TimeUnit.MILLISECONDS);
if (DefaultLitePullConsumerImpl.this.queueFlowControlTimes++ % 1000L == 0L) {
DefaultLitePullConsumerImpl.this.log.warn("The cached message size exceeds the threshold {} MiB, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, flowControlTimes={}", new Object[]{DefaultLitePullConsumerImpl.this.defaultLitePullConsumer.getPullThresholdSizeForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, DefaultLitePullConsumerImpl.this.queueFlowControlTimes});
}
return;
}
- 如果当前
processQueue
中消息的数量大于PullThresholdForQueue
阈值,也同样触发限流机制,当前任务不再执行,50毫秒后重新执行该任务; - 如果当前
processQueue
中消息的总大小超过PullThresholdSizeForQueue
(单位:MB
)阈值,将触发限流机制,当前任务不再执行,50毫秒后重新执行该任务;
if (processQueue.getMaxSpan() > (long)DefaultLitePullConsumerImpl.this.defaultLitePullConsumer.getConsumeMaxSpan()) {
DefaultLitePullConsumerImpl.this.scheduledThreadPoolExecutor.schedule(this, 50L, TimeUnit.MILLISECONDS);
if (DefaultLitePullConsumerImpl.this.queueMaxSpanFlowControlTimes++ % 1000L == 0L) {
DefaultLitePullConsumerImpl.this.log.warn("The queue's messages, span too long, so do flow control, minOffset={}, maxOffset={}, maxSpan={}, flowControlTimes={}", new Object[]{processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), processQueue.getMaxSpan(), DefaultLitePullConsumerImpl.this.queueMaxSpanFlowControlTimes});
}
return;
}
如果processQueue
中的maxSpan
大于消费者的ConsumeMaxSpan
,也就是第一个消息与最后一个消息的点位偏差大于ConsumeMaxSpan
(默认是2000),将触发限流机制,当前任务不执行,50毫秒后重新执行该任务;
- 计算拉取点位
long offset = 0L;
try {
offset = DefaultLitePullConsumerImpl.this.nextPullOffset(this.messageQueue);
} catch (Exception var17) {
DefaultLitePullConsumerImpl.this.log.error("Failed to get next pull offset", var17);
DefaultLitePullConsumerImpl.this.scheduledThreadPoolExecutor.schedule(this, 3000L, TimeUnit.MILLISECONDS);
return;
}
计算消息拉取的点位,如果产生异常,那么简隔3秒后再来重新开始任务;
- 拉消息
PullResult pullResult = DefaultLitePullConsumerImpl.this.pull(this.messageQueue, subscriptionData, offset, DefaultLitePullConsumerImpl.this.defaultLitePullConsumer.getPullBatchSize());
这个就是发请求给Broker
拉消息;
- 放进消息缓存区
switch(pullResult.getPullStatus()) {
case FOUND:
Object objLock = DefaultLitePullConsumerImpl.this.messageQueueLock.fetchLockObject(this.messageQueue);
synchronized(objLock) {
if (pullResult.getMsgFoundList() != null && !pullResult.getMsgFoundList().isEmpty() && DefaultLitePullConsumerImpl.this.assignedMessageQueue.getSeekOffset(this.messageQueue) == -1L) {
processQueue.putMessage(pullResult.getMsgFoundList());
DefaultLitePullConsumerImpl.this.submitConsumeRequest(DefaultLitePullConsumerImpl.this.new ConsumeRequest(pullResult.getMsgFoundList(), this.messageQueue, processQueue));
}
break;
}
找到消息的情况下,将调用submitConsumeRequest()
方法把消息放进阻塞队列中,等待poll()
方法来消费;
- 重新开启拉取任务
if (!this.isCancelled()) {
DefaultLitePullConsumerImpl.this.scheduledThreadPoolExecutor.schedule(this, pullDelayTimeMills, TimeUnit.MILLISECONDS);
} else {
DefaultLitePullConsumerImpl.this.log.warn("The Pull Task is cancelled after doPullTask, {}", this.messageQueue);
}
如果当前任务还没有被取消的话,那么重新开启下一个轮回,准备下一次消息拉取;
以上就是RocketMQ消息拉取过程详解的详细内容,更多关于RocketMQ消息拉取过程的资料请关注编程网其它相关文章!