文章详情

短信预约-IT技能 免费直播动态提醒

请输入下面的图形验证码

提交验证

短信预约提醒成功

阿里二面:RocketMQ 消费者拉取一批消息,其中部分消费失败了,偏移量怎样更新?

2024-12-13 15:17

关注

最近有读者参加面试时被问了一个问题,如果消费者拉取了一批消息,比如 100 条,第 100 条消息消费成功了,但是第 50 条消费失败,偏移量会怎样更新?就着这个问题,今天来聊一下,如果一批消息有消费失败的情况时,偏移量怎么保存。

1 拉取消息

1.1 封装拉取请求

以 RocketMQ 推模式为例,RocketMQ 消费者启动代码如下:

public static void main(String[] args) throws InterruptedException, MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_JODIE_1");

consumer.subscribe("TopicTest", "*");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

consumer.setConsumeTimestamp("20181109221800");
consumer.registerMessageListener(new MessageListenerConcurrently() {

@Override
public ConsumeConcurrentlyStatus consumeMessage(List msgs, ConsumeConcurrentlyContext context){
try{
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
}catch (Exception e){
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
}

上面的 DefaultMQPushConsumer 是一个推模式的消费者,启动方法是 start。消费者启动后会触发重平衡线程(RebalanceService),这个线程的任务是在死循环中不停地进行重平衡,最终封装拉取消息的请求到 pullRequestQueue。这个过程涉及到的 UML 类图如下:

1.2 处理拉取请求

封装好拉取消息的请求 PullRequest 后,RocketMQ 就会不停地从 pullRequestQueue 获取消息拉取请求进行处理。UML 类图如下:

拉取消息的入口方法是一个死循环,代码如下:

//PullMessageService
public void run(){
log.info(this.getServiceName() + " service started");

while (!this.isStopped()) {
try {
PullRequest pullRequest = this.pullRequestQueue.take();
this.pullMessage(pullRequest);
} catch (InterruptedException ignored) {
} catch (Exception e) {
log.error("Pull Message Service Run Method exception", e);
}
}

log.info(this.getServiceName() + " service end");
}

这里拉取到消息后,提交给 PullCallback 这个回调函数进行处理。

拉取到的消息首先被 put 到 ProcessQueue 中的 msgTreeMap 上,然后被封装到 ConsumeRequest 这个线程类来处理。把代码精简后,ConsumeRequest 处理逻辑如下:

//ConsumeMessageConcurrentlyService.java
public void run(){
MessageListenerConcurrently listener = ConsumeMessageConcurrentlyService.this.messageListener;
ConsumeConcurrentlyContext context = new ConsumeConcurrentlyContext(messageQueue);
ConsumeConcurrentlyStatus status = null;
try {
//1.执行消费逻辑,这里的逻辑是在文章开头的代码中定义的
status = listener.consumeMessage(Collections.unmodifiableList(msgs), context);
} catch (Throwable e) {
}
if (!processQueue.isDropped()) {
//2.处理消费结果
ConsumeMessageConcurrentlyService.this.processConsumeResult(status, context, this);
} else {
log.warn("processQueue is dropped without process consume result. messageQueue={}, msgs={}", messageQueue, msgs);
}
}

2 处理消费结果

2.1 并发消息

并发消息处理消费结果的代码做精简后如下:

//ConsumeMessageConcurrentlyService.java
public void processConsumeResult(
final ConsumeConcurrentlyStatus status,
final ConsumeConcurrentlyContext context,
final ConsumeRequest consumeRequest
){
int ackIndex = context.getAckIndex();
switch (status) {
case CONSUME_SUCCESS:
if (ackIndex >= consumeRequest.getMsgs().size()) {
ackIndex = consumeRequest.getMsgs().size() - 1;
}
int ok = ackIndex + 1;
int failed = consumeRequest.getMsgs().size() - ok;
break;
case RECONSUME_LATER:
break;
default:
break;
}

switch (this.defaultMQPushConsumer.getMessageModel()) {
case BROADCASTING:
for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
}
break;
case CLUSTERING:
List msgBackFailed = new ArrayList(consumeRequest.getMsgs().size());
for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
MessageExt msg = consumeRequest.getMsgs().get(i);
boolean result = this.sendMessageBack(msg, context);
if (!result) {
msg.setReconsumeTimes(msg.getReconsumeTimes() + 1);
msgBackFailed.add(msg);
}
}

if (!msgBackFailed.isEmpty()) {
consumeRequest.getMsgs().removeAll(msgBackFailed);
}
break;
default:
break;
}

long offset = consumeRequest.getProcessQueue().removeMessage(consumeRequest.getMsgs());
if (offset >= 0 && !consumeRequest.getProcessQueue().isDropped()) {
this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), offset, true);
}
}

从上面的代码可以看出,如果处理消息的逻辑是串行的,比如文章开头的代码使用 for 循环来处理消息,那如果在某一条消息处理失败了,直接退出循环,给 ConsumeConcurrentlyContext 的 ackIndex 变量赋值为消息列表中失败消息的位置,这样这条失败消息后面的消息就不再处理了,发送给 Broker 等待重新拉取。代码如下:

public static void main(String[] args) throws InterruptedException, MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_JODIE_1");

consumer.subscribe("TopicTest", "*");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

consumer.setConsumeTimestamp("20181109221800");
consumer.registerMessageListener(new MessageListenerConcurrently() {

@Override
public ConsumeConcurrentlyStatus consumeMessage(List msgs, ConsumeConcurrentlyContext context){
for (int i = 0; i < msgs.size(); i++) {
try{
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
}catch (Exception e){
context.setAckIndex(i);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
}

消费成功的消息则从 ProcessQueue 中的 msgTreeMap 中移除,并且返回 msgTreeMap 中最小的偏移量(firstKey)去更新。注意:集群模式偏移量保存在 Broker 端,更新偏移量需要发送消息到 Broker,而广播模式偏移量保存在 Consumer 端,只需要更新本地偏移量就可以。

如果处理消息的逻辑是并行的,处理消息失败后给 ackIndex 赋值是没有意义的,因为可能有多条消息失败,给 ackIndex 变量赋值并不准确。最好的方法就是给 ackIndex 赋值 0,整批消息全部重新消费,这样又可能带来冥等问题。

2.2 顺序消息

对于顺序消息,从 msgTreeMap 取出消息后,先要放到 consumingMsgOrderlyTreeMap 上面,更新偏移量时,是从 consumingMsgOrderlyTreeMap 上取最大的消息偏移量(lastKey)。

3 总结

回到开头的问题,如果一批消息按照顺序消费,是不可能出现第 100 条消息消费成功了,但第 50 条消费失败的情况,因为第 50 条消息失败的时候,应该退出循环,不再继续进行消费。

如果是并发消费,如果出现了这种情况,建议是整批消息全部重新消费,也就是给 ackIndex 赋值 0,这样必须考虑冥等问题。

来源:君哥聊技术内容投诉

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

软考中级精品资料免费领

  • 历年真题答案解析
  • 备考技巧名师总结
  • 高频考点精准押题
  • 2024年上半年信息系统项目管理师第二批次真题及答案解析(完整版)

    难度     813人已做
    查看
  • 【考后总结】2024年5月26日信息系统项目管理师第2批次考情分析

    难度     354人已做
    查看
  • 【考后总结】2024年5月25日信息系统项目管理师第1批次考情分析

    难度     318人已做
    查看
  • 2024年上半年软考高项第一、二批次真题考点汇总(完整版)

    难度     435人已做
    查看
  • 2024年上半年系统架构设计师考试综合知识真题

    难度     224人已做
    查看

相关文章

发现更多好内容

猜你喜欢

AI推送时光机
位置:首页-资讯-后端开发
咦!没有更多了?去看看其它编程学习网 内容吧
首页课程
资料下载
问答资讯