RocketMQ Message Replay: Practice and Analysis

2024-11-29 19:42


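Because the SMS service is triggered directly by RocketMQ messages, Xiao A was unsure how to repair this data, which led to the following conversation:

Manager: Xiao A, there is a lot of data here. How do you plan to fix it?

Xiao A: It's a headache, boss. For most services we keep a local message table for idempotency, so I would only need to reset the status in the database table, pull the records out, and run them through again in a loop. But the SMS service has no local table!

Manager: So what do you have in mind?

Xiao A: The simple option is to have the upstream resend the messages, and we consume them again.

Manager: That would make things worse. Think about it: if the upstream resends, every consumer group has to consume the messages again, and then the other teams will come looking for you.

Xiao A: Then I'm stuck...

Manager: RocketMQ actually has a dedicated message replay capability. Give it a try.

Xiao A: It can do that? Let me look into it...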

2 Verification

2.1 Starting the producer

Prepare a new topic and send 10,000 messages to it.

public static void main(String[] args) throws MQClientException, InterruptedException {
        DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
        producer.setNamesrvAddr("127.0.0.1:9876");
        producer.start();
        for (int i = 0; i < 10000; i++) {
            try {
                Message msg = new Message("TopicTest" ,
                    "TagA" ,
                    ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) 
                );
                SendResult sendResult = producer.send(msg);
                System.out.printf("%s%n", sendResult);
            } catch (Exception e) {
                e.printStackTrace();
                Thread.sleep(1000);
            }
        }
        producer.shutdown();
    }

2.2 Starting the consumer

Prepare a new consumer group, consume the data under the topic, and record the total number of messages.

public static void main(String[] args) throws InterruptedException, MQClientException {
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");
    consumer.setNamesrvAddr("127.0.0.1:9876");
    consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
    consumer.subscribe("TopicTest", "*");
    
    final AtomicInteger count = new AtomicInteger();
    
    consumer.registerMessageListener(new MessageListenerConcurrently() {

        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
            ConsumeConcurrentlyContext context) {
            System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
            count.incrementAndGet();
            System.out.printf("%s Receive New Messages End: %s %n", Thread.currentThread().getName(), msgs);
            System.out.println(count.get());
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
    });

    consumer.start();
}

Figure: consumer message log

2.3 Performing the replay

Run from the command line:

mqadmin.cmd resetOffsetByTime -n 127.0.0.1:9876 -t TopicTest -g please_rename_unique_group_name_4 -s 1722240069000

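The contents of mqadmin.cmd are shown below. Because the script simply launches MQAdminStartup, the same command can also be run by calling MQAdminStartup's main method directly.

Figure: running MQAdminStartup manually

Running it from code: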

public static void main(String[] args) {
    String[] params = new String[]{"resetOffsetByTime","-n","127.0.0.1:9876","-t", "TopicTest", "-g", "please_rename_unique_group_name_4", "-s", "1722240069000"};
    MQAdminStartup.main(params);
}

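2.4 Verifying the result

Figure: client log showing that the reset succeeded

Figure: consumer log showing the messages being consumed again

2.5 Verification summary

Judging from the result, the consumer group's offset was reset to the position matching the specified timestamp. Because that timestamp is earlier than the creation time of the earliest message, every message that had not yet been deleted was consumed again.

So what exactly did RocketMQ do?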

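2.5.1 Analyzing the parameters

Action: resetOffsetByTime

Additional parameters:

-n  NameServer address

-t  topic name

-g  consumer group name

-s  timestamp to rewind to, in milliseconds since the epoch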
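
The value passed to -s in the command above is a millisecond timestamp, which is awkward to write by hand. The following is a minimal sketch of deriving it from a wall-clock time using only the JDK. The class name ReplayTimestamp and the choice of the Asia/Shanghai zone are assumptions made for illustration; they do not come from the article.

```java
import java.time.LocalDateTime;
import java.time.ZoneId;

class ReplayTimestamp {
    // Converts a wall-clock time in the given zone to epoch milliseconds,
    // the unit used by the -s argument in the command above.
    static long toEpochMillis(LocalDateTime localTime, ZoneId zone) {
        return localTime.atZone(zone).toInstant().toEpochMilli();
    }

    public static void main(String[] args) {
        // Assumption: the operator thinks in Asia/Shanghai local time.
        long ts = toEpochMillis(LocalDateTime.of(2024, 7, 29, 16, 1, 9),
                ZoneId.of("Asia/Shanghai"));
        System.out.println(ts); // prints 1722240069000
    }
}
```

Computing the value in code rather than by hand avoids off-by-a-time-zone mistakes, which would silently replay too many or too few messages.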

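2.5.2 Questions to consider

Figure: questions about message replay

3 Analysis

All source code below is taken from version 4.2.0 and has been trimmed for display.

3.1 Strategy pattern: parsing the command line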

org.apache.rocketmq.tools.command.MQAdminStartup#main


SubCommand cmd = findSubCommand(args[0]);

Options options = ServerUtil.buildCommandlineOptions(new Options());
CommandLine commandLine = ServerUtil.parseCmdLine("mqadmin " + cmd.commandName(), subargs, cmd.buildCommandlineOptions(options),
                            new PosixParser());
                            

cmd.execute(commandLine, options, rpcHook);
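
The lookup-then-execute shape of the snippet above is the strategy pattern: every sub-command implements one interface and is chosen by name at runtime. The following is a minimal self-contained sketch of that idea. The SubCommand interface here is a hypothetical, trimmed-down stand-in, and the two registered commands are toys; none of this is RocketMQ's actual implementation.

```java
import java.util.Arrays;
import java.util.List;

class SubCommandLookupSketch {
    // Hypothetical, trimmed-down stand-in for a sub-command interface.
    interface SubCommand {
        String commandName();
        String execute(String[] subargs);
    }

    // Each strategy is registered once; dispatch() never names a concrete class.
    static final List<SubCommand> SUB_COMMANDS = Arrays.asList(
            command("resetOffsetByTime"),
            command("topicList"));

    // Builds a toy command that only reports how many arguments it received.
    static SubCommand command(final String name) {
        return new SubCommand() {
            @Override public String commandName() { return name; }
            @Override public String execute(String[] subargs) {
                return name + " received " + subargs.length + " args";
            }
        };
    }

    // Finds the strategy whose name matches the first command-line token.
    static SubCommand findSubCommand(String name) {
        for (SubCommand cmd : SUB_COMMANDS) {
            if (cmd.commandName().equalsIgnoreCase(name)) {
                return cmd;
            }
        }
        return null;
    }

    // Strips the command name and hands the remaining arguments to the strategy.
    static String dispatch(String[] args) {
        SubCommand cmd = findSubCommand(args[0]);
        if (cmd == null) {
            return "unknown command: " + args[0];
        }
        return cmd.execute(Arrays.copyOfRange(args, 1, args.length));
    }

    public static void main(String[] args) {
        System.out.println(dispatch(new String[] {"resetOffsetByTime", "-t", "TopicTest"}));
    }
}
```

Adding a new admin command under this design means registering one more object; the dispatch code stays unchanged.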

3.2 Creating the client and talking to the server

org.apache.rocketmq.tools.command.offset.ResetOffsetByTimeCommand#execute

public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) throws SubCommandException {
    DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook);
    
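    String group = commandLine.getOptionValue("g").trim(); // consumer group
    String topic = commandLine.getOptionValue("t").trim(); // topic
    String timeStampStr = commandLine.getOptionValue("s").trim(); // raw timestamp argument
    long timestamp = timeStampStr.equals("now") ? System.currentTimeMillis() : Long.parseLong(timeStampStr); // timestamp to reset to
    boolean isC = false; // whether the consumer is a C/C++ client
    // Whether to force the reset. The offset found for the timestamp may be larger than the
    // current consume offset; forcing the reset in that case skips the messages in between.
    boolean force = true;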
    if (commandLine.hasOption('f')) {
        force = Boolean.valueOf(commandLine.getOptionValue("f").trim());
    }

    
    defaultMQAdminExt.start();
    
    Map<MessageQueue, Long> offsetTable = defaultMQAdminExt.resetOffsetByTimestamp(topic, group, timestamp, force, isC);
}

3.3 Looking up the topic's broker addresses and submitting the reset request

org.apache.rocketmq.tools.admin.DefaultMQAdminExtImpl#resetOffsetByTimestamp

public Map<MessageQueue, Long> resetOffsetByTimestamp(String topic, String group, long timestamp, boolean isForce,
    boolean isC)
    throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
    
    TopicRouteData topicRouteData = this.examineTopicRouteInfo(topic);
    
    List<BrokerData> brokerDatas = topicRouteData.getBrokerDatas();
    Map<MessageQueue, Long> allOffsetTable = new HashMap<MessageQueue, Long>();
    if (brokerDatas != null) {
        for (BrokerData brokerData : brokerDatas) {
            String addr = brokerData.selectBrokerAddr();
            if (addr != null) {
                
                Map<MessageQueue, Long> offsetTable =
                    this.mqClientInstance.getMQClientAPIImpl().invokeBrokerToResetOffset(addr, topic, group, timestamp, isForce,
                        timeoutMillis, isC);
                if (offsetTable != null) {
                    allOffsetTable.putAll(offsetTable);
                }
            }
        }
    }
    return allOffsetTable;
}

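3.4 Querying the NameServer for the broker addresses

org.apache.rocketmq.tools.admin.DefaultMQAdminExtImpl#examineTopicRouteInfo, which delegates to org.apache.rocketmq.client.impl.MQClientAPIImpl#getTopicRouteInfoFromNameServer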

public TopicRouteData getTopicRouteInfoFromNameServer(final String topic, final long timeoutMillis,
    boolean allowTopicNotExist) throws MQClientException, InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {
    GetRouteInfoRequestHeader requestHeader = new GetRouteInfoRequestHeader();
    requestHeader.setTopic(topic);
 
    RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ROUTEINTO_BY_TOPIC, requestHeader);

    
    RemotingCommand response = this.remotingClient.invokeSync(null, request, timeoutMillis);
    byte[] body = response.getBody();
    if (body != null) {
        return TopicRouteData.decode(body, TopicRouteData.class);
    }
}

3.4.1 The NameServer receives the request, looks up the route information, and returns it

org.apache.rocketmq.namesrv.processor.DefaultRequestProcessor#getRouteInfoByTopic

public RemotingCommand getRouteInfoByTopic(ChannelHandlerContext ctx,
    RemotingCommand request) throws RemotingCommandException {
    final RemotingCommand response = RemotingCommand.createResponseCommand(null);
    final GetRouteInfoRequestHeader requestHeader =
        (GetRouteInfoRequestHeader) request.decodeCommandCustomHeader(GetRouteInfoRequestHeader.class);

    
    TopicRouteData topicRouteData = this.namesrvController.getRouteInfoManager().pickupTopicRouteData(requestHeader.getTopic());
    byte[] content = topicRouteData.encode();
    response.setBody(content);
    response.setCode(ResponseCode.SUCCESS);
    response.setRemark(null);
    return response;
}

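3.4.2 Core fields of RouteInfoManager

// Topic route information, used for load balancing; each QueueData records the brokerName
private final HashMap<String/* topic */, List<QueueData>> topicQueueTable;
// Basic broker information: name, owning cluster, and master/slave addresses (brokerId = 0 is the master, > 0 is a slave)
private final HashMap<String/* brokerName */, BrokerData> brokerAddrTable;
// Cluster information: the names of all brokers in each cluster
private final HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;
// Live brokers and their corresponding channels
private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;
// Filter server information for each broker
private final HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;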
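
To make the relationship between the first two tables concrete, here is a minimal sketch of resolving a topic to master broker addresses. It is a simplified model under stated assumptions: plain strings replace QueueData and BrokerData, the class and method names are hypothetical, and only the master (brokerId 0) is considered, so it should not be read as the NameServer's actual lookup code.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class RouteLookupSketch {
    static final long MASTER_ID = 0L; // brokerId 0 denotes the master

    // topicToBrokerNames: topic -> names of brokers hosting its queues (simplified topicQueueTable)
    // brokerAddrTable:    brokerName -> (brokerId -> address)          (simplified brokerAddrTable)
    static List<String> masterAddresses(String topic,
            Map<String, List<String>> topicToBrokerNames,
            Map<String, Map<Long, String>> brokerAddrTable) {
        List<String> result = new ArrayList<>();
        List<String> brokerNames = topicToBrokerNames.get(topic);
        if (brokerNames == null) {
            return result; // unknown topic: nothing to route to
        }
        for (String brokerName : brokerNames) {
            Map<Long, String> addrs = brokerAddrTable.get(brokerName);
            if (addrs != null && addrs.get(MASTER_ID) != null) {
                result.add(addrs.get(MASTER_ID));
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, List<String>> topics = new HashMap<>();
        topics.put("TopicTest", Arrays.asList("broker-a"));

        Map<Long, String> brokerA = new HashMap<>();
        brokerA.put(0L, "127.0.0.1:10911"); // master
        brokerA.put(1L, "127.0.0.1:10921"); // slave
        Map<String, Map<Long, String>> brokers = new HashMap<>();
        brokers.put("broker-a", brokerA);

        System.out.println(masterAddresses("TopicTest", topics, brokers)); // [127.0.0.1:10911]
    }
}
```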

3.5 Sending the reset request to the broker

org.apache.rocketmq.client.impl.MQClientAPIImpl#invokeBrokerToResetOffset

public Map<MessageQueue, Long> invokeBrokerToResetOffset(final String addr, final String topic, final String group,
    final long timestamp, final boolean isForce, final long timeoutMillis, boolean isC)
    throws RemotingException, MQClientException, InterruptedException {
    
    ResetOffsetRequestHeader requestHeader = new ResetOffsetRequestHeader();
    requestHeader.setTopic(topic);
    requestHeader.setGroup(group);
    requestHeader.setTimestamp(timestamp);
    requestHeader.setForce(isForce);

    
    RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.INVOKE_BROKER_TO_RESET_OFFSET, requestHeader);
    if (isC) {
        request.setLanguage(LanguageCode.CPP);
    }
 
    RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr), request, timeoutMillis);
    if (response.getBody() != null) {
        ResetOffsetBody body = ResetOffsetBody.decode(response.getBody(), ResetOffsetBody.class);
        return body.getOffsetTable();
    }
}

The broker receives the request and starts processing it:

org.apache.rocketmq.broker.client.net.Broker2Client#resetOffset

public RemotingCommand resetOffset(String topic, String group, long timeStamp, boolean isForce,
    boolean isC) {
    final RemotingCommand response = RemotingCommand.createResponseCommand(null);

    TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(topic);

    
    Map<MessageQueue, Long> offsetTable = new HashMap<MessageQueue, Long>();

    for (int i = 0; i < topicConfig.getWriteQueueNums(); i++) {
        MessageQueue mq = new MessageQueue();
        mq.setBrokerName(this.brokerController.getBrokerConfig().getBrokerName());
        mq.setTopic(topic);
        mq.setQueueId(i);

        
        // offset that the consumer group has already consumed up to in this queue
        long consumerOffset =
            this.brokerController.getConsumerOffsetManager().queryOffset(group, topic, i);
        if (-1 == consumerOffset) {
            response.setCode(ResponseCode.SYSTEM_ERROR);
            response.setRemark(String.format("THe consumer group <%s> not exist", group));
            return response;
        }

        long timeStampOffset;
        if (timeStamp == -1) {
            // no timestamp specified: use the current max offset of the queue
            timeStampOffset = this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, i);
        } else {
            // look up the offset in this queue that corresponds to the timestamp
            timeStampOffset = this.brokerController.getMessageStore().getOffsetInQueueByTime(topic, i, timeStamp);
        }

        if (timeStampOffset < 0) {
            // a negative value means the message has already been deleted
            log.warn("reset offset is invalid. topic={}, queueId={}, timeStampOffset={}", topic, i, timeStampOffset);
            timeStampOffset = 0;
        }

        
        if (isForce || timeStampOffset < consumerOffset) {
            offsetTable.put(mq, timeStampOffset);
        } else {
            offsetTable.put(mq, consumerOffset);
        }
    }

    
    ResetOffsetRequestHeader requestHeader = new ResetOffsetRequestHeader();
    requestHeader.setTopic(topic);
    requestHeader.setGroup(group);
    requestHeader.setTimestamp(timeStamp);
    RemotingCommand request =
        RemotingCommand.createRequestCommand(RequestCode.RESET_CONSUMER_CLIENT_OFFSET, requestHeader);
    if (isC) {
        // c++ language
        ResetOffsetBodyForC body = new ResetOffsetBodyForC();
        List<MessageQueueForC> offsetList = convertOffsetTable2OffsetList(offsetTable);
        body.setOffsetTable(offsetList);
        request.setBody(body.encode());
    } else {
        // other language
        ResetOffsetBody body = new ResetOffsetBody();
        body.setOffsetTable(offsetTable);
        request.setBody(body.encode());
    }

    
    ConsumerGroupInfo consumerGroupInfo =
        this.brokerController.getConsumerManager().getConsumerGroupInfo(group);

    if (consumerGroupInfo != null && !consumerGroupInfo.getAllChannel().isEmpty()) {
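        // get the long-lived connection channels of the group's clients
        ConcurrentMap<Channel, ClientChannelInfo> channelInfoTable =
            consumerGroupInfo.getChannelInfoTable();
        for (Map.Entry<Channel, ClientChannelInfo> entry : channelInfoTable.entrySet()) {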
            int version = entry.getValue().getVersion();
            
            if (version >= MQVersion.Version.V3_0_7_SNAPSHOT.ordinal()) {
                try {
                    
                    this.brokerController.getRemotingServer().invokeOneway(entry.getKey(), request, 5000);
                    log.info("[reset-offset] reset offset success. topic={}, group={}, clientId={}",
                        topic, group, entry.getValue().getClientId());
                } catch (Exception e) {
                    log.error("[reset-offset] reset offset exception. topic={}, group={}",
                        new Object[] {topic, group}, e);
                }
            } else {
                response.setCode(ResponseCode.SYSTEM_ERROR);
                response.setRemark("the client does not support this feature. version="
                    + MQVersion.getVersionDesc(version));
                log.warn("[reset-offset] the client does not support this feature. version={}",
                    RemotingHelper.parseChannelRemoteAddr(entry.getKey()), MQVersion.getVersionDesc(version));
                return response;
            }
        }
    } else {
        String errorInfo =
            String.format("Consumer not online, so can not reset offset, Group: %s Topic: %s Timestamp: %d",
                requestHeader.getGroup(),
                requestHeader.getTopic(),
                requestHeader.getTimestamp());
        log.error(errorInfo);
        response.setCode(ResponseCode.CONSUMER_NOT_ONLINE);
        response.setRemark(errorInfo);
        return response;
    }
    response.setCode(ResponseCode.SUCCESS);
    ResetOffsetBody resBody = new ResetOffsetBody();
    resBody.setOffsetTable(offsetTable);
    response.setBody(resBody.encode());
    return response;
}
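
The per-queue decision inside the loop above (the negative-offset fallback followed by the isForce check) can be isolated as a small pure function. The following is a minimal sketch of that rule only. The class and method names are hypothetical, and it leaves out everything else the broker does, such as notifying clients.

```java
class ResetOffsetRule {
    // Returns the offset a queue would be reset to, following the rule shown above:
    // a negative timestamp offset falls back to 0, and without force the offset
    // is only ever moved backwards, never past the current consume offset.
    static long chooseOffset(long consumerOffset, long timeStampOffset, boolean force) {
        if (timeStampOffset < 0) {
            timeStampOffset = 0;
        }
        if (force || timeStampOffset < consumerOffset) {
            return timeStampOffset;
        }
        return consumerOffset;
    }

    public static void main(String[] args) {
        // Timestamp maps to an offset ahead of the consumer.
        System.out.println(chooseOffset(500, 900, false)); // 500: progress is kept
        System.out.println(chooseOffset(500, 900, true));  // 900: offsets 500..899 are skipped
        // Timestamp maps to an offset behind the consumer.
        System.out.println(chooseOffset(500, 100, false)); // 100: messages are replayed
    }
}
```

Since ResetOffsetByTimeCommand defaults force to true, a timestamp later than the current progress skips messages unless -f false is passed.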

3.6 The consumer client receives the request and starts processing it

org.apache.rocketmq.client.impl.factory.MQClientInstance#resetOffset

public void resetOffset(String topic, String group, Map<MessageQueue, Long> offsetTable) {
    DefaultMQPushConsumerImpl consumer = null;
    try {
        
        MQConsumerInner impl = this.consumerTable.get(group);
        if (impl != null && impl instanceof DefaultMQPushConsumerImpl) {
            consumer = (DefaultMQPushConsumerImpl) impl;
        } else {
            // a PullConsumer controls its own consume progress, so return directly
            log.info("[reset-offset] consumer dose not exist. group={}", group);
            return;
        }
        
        consumer.suspend(); // pause consumption

        
        ConcurrentMap<MessageQueue, ProcessQueue> processQueueTable = consumer.getRebalanceImpl().getProcessQueueTable();
        for (Map.Entry<MessageQueue, ProcessQueue> entry : processQueueTable.entrySet()) {
            MessageQueue mq = entry.getKey();
            if (topic.equals(mq.getTopic()) && offsetTable.containsKey(mq)) {
                ProcessQueue pq = entry.getValue();
                pq.setDropped(true);
                pq.clear();
            }
        }
  
        
        try {
            TimeUnit.SECONDS.sleep(10);
        } catch (InterruptedException e) {
        }

        Iterator<MessageQueue> iterator = processQueueTable.keySet().iterator();
        while (iterator.hasNext()) {
            MessageQueue mq = iterator.next();
            // offset that this message queue should be reset to
            Long offset = offsetTable.get(mq);
            if (topic.equals(mq.getTopic()) && offset != null) {
                try {
                    
                    consumer.updateConsumeOffset(mq, offset);
                    consumer.getRebalanceImpl().removeUnnecessaryMessageQueue(mq, processQueueTable.get(mq));
                    iterator.remove();
                } catch (Exception e) {
                    log.warn("reset offset failed. group={}, {}", group, mq, e);
                }
            }
        }
    } finally {
        if (consumer != null) {
            
            consumer.resume();
        }
    }
}
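
The order of operations in the method above matters: local queues are dropped and emptied first, then the offsets are overwritten and the queues are removed from the table. The following toy model sketches that sequence with plain collections. LocalQueue is a hypothetical stand-in for ProcessQueue, queue names are plain strings, and the suspend, sleep, and resume steps are omitted, so this only illustrates the bookkeeping and is not the client's real logic.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

class ClientResetSketch {
    // Hypothetical stand-in for ProcessQueue: locally cached messages plus a dropped flag.
    static class LocalQueue {
        boolean dropped;
        final List<String> cached = new ArrayList<>();
    }

    static void reset(Map<String, LocalQueue> processQueueTable,
                      Map<String, Long> committedOffsets,
                      Map<String, Long> resetTable) {
        // Step 1: mark affected queues as dropped and discard their cached messages,
        // so messages pulled under the old offset are not handed to listeners.
        for (Map.Entry<String, LocalQueue> entry : processQueueTable.entrySet()) {
            if (resetTable.containsKey(entry.getKey())) {
                entry.getValue().dropped = true;
                entry.getValue().cached.clear();
            }
        }
        // (The real client waits at this point before continuing.)
        // Step 2: overwrite the committed offset and forget the local queue.
        Iterator<String> iterator = processQueueTable.keySet().iterator();
        while (iterator.hasNext()) {
            String mq = iterator.next();
            Long offset = resetTable.get(mq);
            if (offset != null) {
                committedOffsets.put(mq, offset);
                iterator.remove();
            }
        }
    }

    public static void main(String[] args) {
        Map<String, LocalQueue> queues = new HashMap<>();
        queues.put("TopicTest-0", new LocalQueue());
        Map<String, Long> committed = new HashMap<>();
        committed.put("TopicTest-0", 500L);
        Map<String, Long> resetTable = new HashMap<>();
        resetTable.put("TopicTest-0", 0L);

        reset(queues, committed, resetTable);
        System.out.println(committed); // {TopicTest-0=0}
        System.out.println(queues.isEmpty()); // true
    }
}
```

As far as I understand the client, removing the queue from the table is what lets a later rebalance re-create it and start pulling from the newly stored offset; treat that as an interpretation rather than something the trimmed source above proves.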

4 Core flow

Figure: end-to-end message replay flow

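5 Summary

Message replay is the safety net RocketMQ gives business teams: when a problem cannot be recovered any other way, they can replay messages to restore the service or correct the data. This matters most in stream and batch computing, where rerunning data is routine.

RocketMQ can offer message replay thanks to its simple offset management, which makes it easy to reset offsets with the mqadmin tool. Note, however, that a topic's messages are physically stored on the brokers and are subject to a deletion mechanism. Before replaying, confirm that no broker node in the cluster has been taken offline and that the target point in time is not earlier than data that has already been deleted; otherwise some of the messages you expect to replay will be missing.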
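
The retention caveat can be turned into a simple pre-flight check. The following is a minimal sketch under stated assumptions: the earliest retained store time of each queue is taken as given input (how it is obtained from the cluster is left out), and the class and method names are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class RetentionCheckSketch {
    // Returns the queues whose earliest retained message is newer than the replay target.
    // For those queues, messages between the target and the earliest retained message
    // may already have been deleted (or may never have existed), so a replay cannot
    // be assumed to cover the whole requested period.
    static Map<String, Long> queuesNotCovering(Map<String, Long> earliestStoreTimeByQueue,
                                               long targetTimestamp) {
        Map<String, Long> result = new LinkedHashMap<>();
        for (Map.Entry<String, Long> entry : earliestStoreTimeByQueue.entrySet()) {
            if (entry.getValue() > targetTimestamp) {
                result.put(entry.getKey(), entry.getValue());
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, Long> earliest = new LinkedHashMap<>();
        earliest.put("TopicTest-0", 1722240000000L); // hypothetical values
        earliest.put("TopicTest-1", 1722240100000L);
        // Only TopicTest-1 starts after the requested replay point.
        System.out.println(queuesNotCovering(earliest, 1722240069000L).keySet()); // [TopicTest-1]
    }
}
```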

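6 Extension

Message replay lets us move the offset backward or forward at will. But what if we want to consume only a specific range? For example, when consumption is falling behind we may choose to skip ahead by moving the offset forward, which leaves a stretch of messages unconsumed. Those messages should still be consumed later, during idle time, and that requires consuming a specified range.

In fact, the org.apache.rocketmq.client.impl.factory.MQClientInstance#resetOffset code above shows that an offset reset has no effect on consumers of type DefaultMQPullConsumerImpl, because their consume progress is controlled entirely by the consumer itself. We can therefore use pull mode to consume the range.

Sample code: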

public class PullConsumerLocalTest {
    private static final Map<MessageQueue, Long> OFFSE_TABLE = new HashMap<MessageQueue, Long>();
    private static final Map<MessageQueue, Pair<Long, Long>> QUEUE_OFFSE_SECTION_TABLE = new HashMap<>();
    private static final Long MIN_TIMESTAMP = 1722240069000L; // lower bound of the time range
    private static final Long MAX_TIMESTAMP = 1722240160000L; // upper bound of the time range

    public static void main(String[] args) throws MQClientException {
        DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("please_rename_unique_group_name_5");
        consumer.setNamesrvAddr("127.0.0.1:9876");
        consumer.start();

        
        String topic = "TopicTest";
        init(consumer, topic);

        Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues(topic);
        for (MessageQueue mq : mqs) {
            System.out.printf("Consume from the queue: %s%n", mq);
            SINGLE_MQ:
            while (true) {
                try {
                    PullResult pullResult =
                        consumer.pullBlockIfNotFound(mq, null, getMessageQueueOffset(mq), 32);
                    System.out.printf("%s%n", pullResult);
                    putMessageQueueOffset(mq, pullResult.getNextBeginOffset());
                    switch (pullResult.getPullStatus()) {
                        case FOUND:
                            // check against the max offset of the range, then process the messages...
                            break;
                        case NO_MATCHED_MSG:
                            break;
                        case NO_NEW_MSG:
                            break SINGLE_MQ;
                        case OFFSET_ILLEGAL:
                            break;
                        default:
                            break;
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }

        consumer.shutdown();
    }

    private static void init(DefaultMQPullConsumer consumer, String topic) throws MQClientException {
        Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues(topic);
        for (MessageQueue mq : mqs) {
            long minOffset = consumer.searchOffset(mq, MIN_TIMESTAMP);
            long maxOffset = consumer.searchOffset(mq, MAX_TIMESTAMP);
            // record the min and max offsets that bound the range for this queue
            QUEUE_OFFSE_SECTION_TABLE.put(mq, new Pair<>(minOffset, maxOffset));
            // use the min offset as the starting offset for the next pull
            OFFSE_TABLE.put(mq, minOffset);
        }
    }

    private static long getMessageQueueOffset(MessageQueue mq) {
        Long offset = OFFSE_TABLE.get(mq);
        if (offset != null)
            return offset;

        return 0;
    }

    private static void putMessageQueueOffset(MessageQueue mq, long offset) {
        OFFSE_TABLE.put(mq, offset);
    }

}
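
The sample above leaves the upper-bound check as a comment in the FOUND branch. The following is a minimal sketch of what that check could look like, written against plain offsets so it runs without a broker. The class and method names are hypothetical, and treating the max offset as an exclusive bound is an assumption, not something the article specifies.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class OffsetRangeSketch {
    // Keeps only the queue offsets inside [minOffset, maxOffset).
    // Assumption: the upper bound is exclusive.
    static List<Long> withinRange(List<Long> pulledOffsets, long minOffset, long maxOffset) {
        List<Long> kept = new ArrayList<>();
        for (long offset : pulledOffsets) {
            if (offset >= minOffset && offset < maxOffset) {
                kept.add(offset);
            }
        }
        return kept;
    }

    // True once the next pull would start at or beyond the upper bound,
    // which is the point where the per-queue loop can stop.
    static boolean finished(long nextBeginOffset, long maxOffset) {
        return nextBeginOffset >= maxOffset;
    }

    public static void main(String[] args) {
        List<Long> batch = Arrays.asList(98L, 99L, 100L, 101L);
        System.out.println(withinRange(batch, 99L, 101L)); // [99, 100]
        System.out.println(finished(102L, 101L));          // true
    }
}
```

In the pull loop, the filter would be applied to the queue offsets of each pulled batch, and finished would be evaluated against pullResult.getNextBeginOffset() to break out of the SINGLE_MQ loop instead of waiting for NO_NEW_MSG.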

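7 Comparison

Approach | Advantages | Disadvantages
Consumer-side local message table | Fully under the business's control | Extra storage overhead; re-consumption has to be developed separately
Offset reset (message replay) | No business changes required; supports broadcast and clustering modes, ordered and unordered messages (consumers with idempotency handling must reset their state first) | Not supported by versions earlier than 3.0.7
Manual control with a pull consumer | Consume progress is fully controllable | Offsets must be maintained by the application; higher complexity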

Source: 转转技术 (Zhuanzhuan Tech)
