文章详情

短信预约-IT技能 免费直播动态提醒

请输入下面的图形验证码

提交验证

短信预约提醒成功

顺序消息的实现-RocketMQ知识体系(五)

2024-12-03 02:06

关注

我们知道,kafka 如果要保证顺序消费,必须保证消息保存到同一个patition上,而且为了有序性,只能有一个消费者进行消费。这种情况下,Kafka 就退化成了单一队列,毫无并发性可言,极大降低系统性能。那么对于对业务比较友好的RocketMQ 是如何实现的呢?首先,我们循序渐进的来了解下顺序消息的实现。

顺序消息业务使用场景

电商场景中传递订单状态。

同步mysql 的binlong 日志,数据库的操作是有顺序的。

其他消息之间有先后的依赖关系,后一条消息需要依赖于前一条消息的处理结果的情况。

等等。。。

消息中间件中的顺序消息

顺序消息(FIFO 消息)是 MQ 提供的一种严格按照顺序进行发布和消费的消息类型。顺序消息由两个部分组成:顺序发布和顺序消费。

顺序消息包含两种类型:

分区顺序:一个Partition(queue)内所有的消息按照先进先出的顺序进行发布和消费

全局顺序:一个Topic内所有的消息按照先进先出的顺序进行发布和消费.但是全局顺序极大的降低了系统的吞吐量,不符合mq的设计初衷。

那么折中的办法就是选择分区顺序。

【局部顺序消费】

如何保证顺序

在MQ的模型中,顺序需要由3个阶段去保障:

  1. 消息被发送时保持顺序
  2. 消息被存储时保持和发送的顺序一致
  3. 消息被消费时保持和存储的顺序一致

发送时保持顺序意味着对于有顺序要求的消息,用户应该在同一个线程中采用同步的方式发送。存储保持和发送的顺序一致则要求在同一线程中被发送出来的消息A和B,存储时在空间上A一定在B之前。而消费保持和存储一致则要求消息A、B到达Consumer之后必须按照先A后B的顺序被处理。

第一点,消息顺序发送,多线程发送的消息无法保证有序性,因此,需要业务方在发送时,针对同一个业务编号(如同一笔订单)的消息需要保证在一个线程内顺序发送,在上一个消息发送成功后,在进行下一个消息的发送。对应到mq中,消息发送方法就得使用同步发送,异步发送无法保证顺序性。

第二点,消息顺序存储,mq的topic下会存在多个queue,要保证消息的顺序存储,同一个业务编号的消息需要被发送到一个queue中。对应到mq中,需要使用MessageQueueSelector来选择要发送的queue,即对业务编号进行hash,然后根据队列数量对hash值取余,将消息发送到一个queue中。

第三点,消息顺序消费,要保证消息顺序消费,同一个queue就只能被一个消费者所消费,因此对broker中消费队列加锁是无法避免的。同一时刻,一个消费队列只能被一个消费者消费,消费者内部,也只能有一个消费线程来消费该队列。即,同一时刻,一个消费队列只能被一个消费者中的一个线程消费。

RocketMQ中顺序的实现

【Producer端】

Producer端确保消息顺序唯一要做的事情就是将消息路由到特定的分区,在RocketMQ中,通过MessageQueueSelector来实现分区的选择。

  1.  
  2. public interface MessageQueueSelector { 
  3.  
  4.      
  5.     MessageQueue select(final List mqs, final Message msg, final Object arg); 

比如如下实现就可以保证相同的订单的消息被路由到相同的分区:

  1. long orderId = ((Order) object).getOrderId; 
  2. return mqs.get(orderId % mqs.size()); 

【Consumer端】

尝试锁定锁定MessageQueue。

首先我们如何保证一个队列只被一个消费者消费?

消费队列存在于broker端,如果想保证一个队列被一个消费者消费,那么消费者在进行消息拉取消费时就必须向mq服务器申请队列锁,消费者申请队列锁的代码存在于RebalanceService消息队列负载的实现代码中。

消费者重新负载,并且分配完消费队列后,需要向mq服务器发起消息拉取请求,代码实现在RebalanceImpl#updateProcessQueueTableInRebalance中,针对顺序消息的消息拉取,mq做了如下判断:

  1. // 增加 不在processQueueTable && 存在于mqSet 里的消息队列。 
  2.        List pullRequestList = new ArrayList<>(); // 拉消息请求数组 
  3.        for (MessageQueue mq : mqSet) { 
  4.            if (!this.processQueueTable.containsKey(mq)) { 
  5.                if (isOrder && !this.lock(mq)) { // 顺序消息锁定消息队列 
  6.                    log.warn("doRebalance, {}, add a new mq failed, {}, because lock failed", consumerGroup, mq); 
  7.                    continue
  8.                } 
  9.  
  10.                this.removeDirtyOffset(mq); 
  11.                ProcessQueue pq = new ProcessQueue(); 
  12.                long nextOffset = this.computePullFromWhere(mq); 
  13.                if (nextOffset >= 0) { 
  14.                    ProcessQueue pre = this.processQueueTable.putIfAbsent(mq, pq); 
  15.                    if (pre != null) { 
  16.                        log.info("doRebalance, {}, mq already exists, {}", consumerGroup, mq); 
  17.                    } else { 
  18.                        log.info("doRebalance, {}, add a new mq, {}", consumerGroup, mq); 
  19.                        PullRequest pullRequest = new PullRequest(); 
  20.                        pullRequest.setConsumerGroup(consumerGroup); 
  21.                        pullRequest.setNextOffset(nextOffset); 
  22.                        pullRequest.setMessageQueue(mq); 
  23.                        pullRequest.setProcessQueue(pq); 
  24.                        pullRequestList.add(pullRequest); 
  25.                        changed = true
  26.                    } 
  27.                } else { 
  28.                    log.warn("doRebalance, {}, add new mq failed, {}", consumerGroup, mq); 
  29.                } 
  30.            } 
  31.        } 
  32.  
  33.        // 发起消息拉取请求 
  34.        this.dispatchPullRequest(pullRequestList); 

核心思想就是,消费客户端先向broker端发起对messageQueue的加锁请求,只有加锁成功时才创建pullRequest进行消息拉取,下面看下lock加锁请求方法:

  1.  
  2.    public boolean lock(final MessageQueue mq) { 
  3.        FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(), MixAll.MASTER_ID, true); 
  4.        if (findBrokerResult != null) { 
  5.            LockBatchRequestBody requestBody = new LockBatchRequestBody(); 
  6.            requestBody.setConsumerGroup(this.consumerGroup); 
  7.            requestBody.setClientId(this.mQClientFactory.getClientId()); 
  8.            requestBody.getMqSet().add(mq); 
  9.  
  10.            try { 
  11.                // 请求Broker获得指定消息队列的分布式锁 
  12.                Set lockedMq = 
  13.                    this.mQClientFactory.getMQClientAPIImpl().lockBatchMQ(findBrokerResult.getBrokerAddr(), requestBody, 1000); 
  14.  
  15.                // 设置消息处理队列锁定成功。锁定消息队列成功,可能本地没有消息处理队列,设置锁定成功会在lockAll()方法。 
  16.                for (MessageQueue mmqq : lockedMq) { 
  17.                    ProcessQueue processQueue = this.processQueueTable.get(mmqq); 
  18.                    if (processQueue != null) { 
  19.                        processQueue.setLocked(true); 
  20.                        processQueue.setLastLockTimestamp(System.currentTimeMillis()); 
  21.                    } 
  22.                } 
  23.  
  24.                boolean lockOK = lockedMq.contains(mq); 
  25.                log.info("the message queue lock {}, {} {}"
  26.                    lockOK ? "OK" : "Failed"
  27.                    this.consumerGroup, 
  28.                    mq); 
  29.                return lockOK; 
  30.            } catch (Exception e) { 
  31.                log.error("lockBatchMQ exception, " + mq, e); 
  32.            } 
  33.        } 
  34.  
  35.        return false
  36.    } 

代码实现逻辑比较清晰,就是调用lockBatchMQ方法发送了一个加锁请求,那么broker端收到加锁请求后的处理逻辑又是怎么样?

【broker端实现】

broker端收到加锁请求的处理逻辑在RebalanceLockManager#tryLockBatch方法中,RebalanceLockManager中关键属性如下:

  1.  
  2.     private final static long REBALANCE_LOCK_MAX_LIVE_TIME = Long.parseLong(System.getProperty( 
  3.         "rocketmq.broker.rebalance.lockMaxLiveTime""60000")); 
  4.      
  5.     private final Lock lock = new ReentrantLock(); 
  6.      
  7.     private final ConcurrentHashMap> mqLockTable = 
  8.             new ConcurrentHashMap<>(1024); 

LockEntry对象中关键属性如下:

  1.  
  2.    static class LockEntry { 
  3.         
  4.        private String clientId; 
  5.         
  6.        private volatile long lastUpdateTimestamp = System.currentTimeMillis(); 
  7.  
  8.        public String getClientId() { 
  9.            return clientId; 
  10.        } 
  11.  
  12.        public void setClientId(String clientId) { 
  13.            this.clientId = clientId; 
  14.        } 
  15.  
  16.        public long getLastUpdateTimestamp() { 
  17.            return lastUpdateTimestamp; 
  18.        } 
  19.  
  20.        public void setLastUpdateTimestamp(long lastUpdateTimestamp) { 
  21.            this.lastUpdateTimestamp = lastUpdateTimestamp; 
  22.        } 
  23.  
  24.         
  25.        public boolean isLocked(final String clientId) { 
  26.            boolean eq = this.clientId.equals(clientId); 
  27.            return eq && !this.isExpired(); 
  28.        } 
  29.  
  30.         
  31.        public boolean isExpired() { 
  32.            boolean expired = 
  33.                (System.currentTimeMillis() - this.lastUpdateTimestamp) > REBALANCE_LOCK_MAX_LIVE_TIME; 
  34.  
  35.            return expired; 
  36.        } 
  37.    } 

broker端通过对ConcurrentMap> mqLockTable的维护来达到messageQueue加锁的目的,使得同一时刻,一个messageQueue只能被一个消费者消费。

【再次回到Consumer端,拿到锁后】

消费者对messageQueue的加锁已经成功,那么就进入到了第二个步骤,创建pullRequest进行消息拉取,消息拉取部分的代码实现在PullMessageService中,消息拉取完后,需要提交到ConsumeMessageService中进行消费,顺序消费的实现为ConsumeMessageOrderlyService,提交消息进行消费的方法为ConsumeMessageOrderlyService#submitConsumeRequest,具体实现如下:

  1. @Override 
  2.  public void submitConsumeRequest(// 
  3.      final List msgs, // 
  4.      final ProcessQueue processQueue, // 
  5.      final MessageQueue messageQueue, // 
  6.      final boolean dispathToConsume) { 
  7.      if (dispathToConsume) { 
  8.          ConsumeRequest consumeRequest = new ConsumeRequest(processQueue, messageQueue); 
  9.          this.consumeExecutor.submit(consumeRequest); 
  10.      } 
  11.  } 

构建了一个ConsumeRequest对象,并提交给了ThreadPoolExecutor来并行消费,看下顺序消费的ConsumeRequest的run方法实现:

  1. public void run() { 
  2.            if (this.processQueue.isDropped()) { 
  3.                log.warn("run, the message queue not be able to consume, because it's dropped. {}", this.messageQueue); 
  4.                return
  5.            } 
  6.  
  7.            // 获得 Consumer 消息队列锁 
  8.            final Object objLock = messageQueueLock.fetchLockObject(this.messageQueue); 
  9.            synchronized (objLock) { 
  10.                // (广播模式) 或者 (集群模式 && Broker消息队列锁有效) 
  11.                if (MessageModel.BROADCASTING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel()) 
  12.                    || (this.processQueue.isLocked() && !this.processQueue.isLockExpired())) { 
  13.                    final long beginTime = System.currentTimeMillis(); 
  14.                    // 循环 
  15.                    for (boolean continueConsume = true; continueConsume; ) { 
  16.                        if (this.processQueue.isDropped()) { 
  17.                            log.warn("the message queue not be able to consume, because it's dropped. {}", this.messageQueue); 
  18.                            break; 
  19.                        } 
  20.  
  21.                        // 消息队列分布式锁未锁定,提交延迟获得锁并消费请求 
  22.                        if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel()) 
  23.                            && !this.processQueue.isLocked()) { 
  24.                            log.warn("the message queue not locked, so consume later, {}", this.messageQueue); 
  25.                            ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 10); 
  26.                            break; 
  27.                        } 
  28.                        // 消息队列分布式锁已经过期,提交延迟获得锁并消费请求 
  29.                        if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel()) 
  30.                            && this.processQueue.isLockExpired()) { 
  31.                            log.warn("the message queue lock expired, so consume later, {}", this.messageQueue); 
  32.                            ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 10); 
  33.                            break; 
  34.                        } 
  35.  
  36.                        // 当前周期消费时间超过连续时长,默认:60s,提交延迟消费请求。默认情况下,每消费1分钟休息10ms。 
  37.                        long interval = System.currentTimeMillis() - beginTime; 
  38.                        if (interval > MAX_TIME_CONSUME_CONTINUOUSLY) { 
  39.                            ConsumeMessageOrderlyService.this.submitConsumeRequestLater(processQueue, messageQueue, 10); 
  40.                            break; 
  41.                        } 
  42.  
  43.                        // 获取消费消息。此处和并发消息请求不同,并发消息请求已经带了消费哪些消息。 
  44.                        final int consumeBatchSize = ConsumeMessageOrderlyService.this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize(); 
  45.                        List msgs = this.processQueue.takeMessags(consumeBatchSize); 
  46.                        if (!msgs.isEmpty()) { 
  47.                            final ConsumeOrderlyContext context = new ConsumeOrderlyContext(this.messageQueue); 
  48.  
  49.                            ConsumeOrderlyStatus status = null
  50.  
  51.                            // Hook:before 
  52.                            ConsumeMessageContext consumeMessageContext = null
  53.                            if (ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.hasHook()) { 
  54.                                consumeMessageContext = new ConsumeMessageContext(); 
  55.                                consumeMessageContext 
  56.                                    .setConsumerGroup(ConsumeMessageOrderlyService.this.defaultMQPushConsumer.getConsumerGroup()); 
  57.                                consumeMessageContext.setMq(messageQueue); 
  58.                                consumeMessageContext.setMsgList(msgs); 
  59.                                consumeMessageContext.setSuccess(false); 
  60.                                // init the consume context type 
  61.                                consumeMessageContext.setProps(new HashMap()); 
  62.                                ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.executeHookBefore(consumeMessageContext); 
  63.                            } 
  64.  
  65.                            // 执行消费 
  66.                            long beginTimestamp = System.currentTimeMillis(); 
  67.                            ConsumeReturnType returnType = ConsumeReturnType.SUCCESS; 
  68.                            boolean hasException = false
  69.                            try { 
  70.                                this.processQueue.getLockConsume().lock(); // 锁定队列消费锁 
  71.  
  72.                                if (this.processQueue.isDropped()) { 
  73.                                    log.warn("consumeMessage, the message queue not be able to consume, because it's dropped. {}"
  74.                                        this.messageQueue); 
  75.                                    break; 
  76.                                } 
  77.  
  78.                                status = messageListener.consumeMessage(Collections.unmodifiableList(msgs), context); 
  79.                            } catch (Throwable e) { 
  80.                                log.warn("consumeMessage exception: {} Group: {} Msgs: {} MQ: {}", // 
  81.                                    RemotingHelper.exceptionSimpleDesc(e), // 
  82.                                    ConsumeMessageOrderlyService.this.consumerGroup, // 
  83.                                    msgs, // 
  84.                                    messageQueue); 
  85.                                hasException = true
  86.                            } finally { 
  87.                                this.processQueue.getLockConsume().unlock(); // 锁定队列消费锁 
  88.                            } 
  89.  
  90.                            if (null == status // 
  91.                                || ConsumeOrderlyStatus.ROLLBACK == status// 
  92.                                || ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT == status) { 
  93.                                log.warn("consumeMessage Orderly return not OK, Group: {} Msgs: {} MQ: {}", // 
  94.                                    ConsumeMessageOrderlyService.this.consumerGroup, // 
  95.                                    msgs, // 
  96.                                    messageQueue); 
  97.                            } 
  98.  
  99.                            // 解析消费结果状态 
  100.                            long consumeRT = System.currentTimeMillis() - beginTimestamp; 
  101.                            if (null == status) { 
  102.                                if (hasException) { 
  103.                                    returnType = ConsumeReturnType.EXCEPTION; 
  104.                                } else { 
  105.                                    returnType = ConsumeReturnType.RETURNNULL; 
  106.                                } 
  107.                            } else if (consumeRT >= defaultMQPushConsumer.getConsumeTimeout() * 60 * 1000) { 
  108.                                returnType = ConsumeReturnType.TIME_OUT; 
  109.                            } else if (ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT == status) { 
  110.                                returnType = ConsumeReturnType.FAILED; 
  111.                            } else if (ConsumeOrderlyStatus.SUCCESS == status) { 
  112.                                returnType = ConsumeReturnType.SUCCESS; 
  113.                            } 
  114.  
  115.                            if (ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.hasHook()) { 
  116.                                consumeMessageContext.getProps().put(MixAll.CONSUME_CONTEXT_TYPE, returnType.name()); 
  117.                            } 
  118.  
  119.                            if (null == status) { 
  120.                                status = ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT; 
  121.                            } 
  122.  
  123.                            // Hook:after 
  124.                            if (ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.hasHook()) { 
  125.                                consumeMessageContext.setStatus(status.toString()); 
  126.                                consumeMessageContext 
  127.                                    .setSuccess(ConsumeOrderlyStatus.SUCCESS == status || ConsumeOrderlyStatus.COMMIT == status); 
  128.                                ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.executeHookAfter(consumeMessageContext); 
  129.                            } 
  130.  
  131.                            ConsumeMessageOrderlyService.this.getConsumerStatsManager() 
  132.                                .incConsumeRT(ConsumeMessageOrderlyService.this.consumerGroup, messageQueue.getTopic(), consumeRT); 
  133.  
  134.                            // 处理消费结果 
  135.                            continueConsume = ConsumeMessageOrderlyService.this.processConsumeResult(msgs, status, context, this); 
  136.                        } else { 
  137.                            continueConsume = false
  138.                        } 
  139.                    } 
  140.                } else { 
  141.                    if (this.processQueue.isDropped()) { 
  142.                        log.warn("the message queue not be able to consume, because it's dropped. {}", this.messageQueue); 
  143.                        return
  144.                    } 
  145.  
  146.                    ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 100); 
  147.                } 
  148.            } 
  149.        } 

获取到锁对象后,使用synchronized尝试申请线程级独占锁。

如果加锁成功,同一时刻只有一个线程进行消息消费。

如果加锁失败,会延迟100ms重新尝试向broker端申请锁定messageQueue,锁定成功后重新提交消费请求

至此,第三个关键点的解决思路也清晰了,基本上就两个步骤。

创建消息拉取任务时,消息客户端向broker端申请锁定MessageQueue,使得一个MessageQueue同一个时刻只能被一个消费客户端消费。

消息消费时,多线程针对同一个消息队列的消费先尝试使用synchronized申请独占锁,加锁成功才能进行消费,使得一个MessageQueue同一个时刻只能被一个消费客户端中一个线程消费。

【顺序消费问题拆解】

  1. broke 上要保证一个队列只有一个进程消费,即一个队列同一时间只有一个consumer 消费
  2. broker 给consumer 的消息顺序应该保持一致,这个通过 rpc传输,序列化后消息顺序不变,所以很容易实现
  3. consumer 上的队列消息要保证同一个时间只有一个线程消费

通过问题的拆分,问题变成同一个共享资源串行处理了,要解决这个问题,通常的做法都是访问资源的时候加锁,即broker 上一个队列消息在被consumer 访问的必须加锁,单个consumer 端多线程并发处理消息的时候需要加锁;这里还需要考虑broker 锁的异常情况,假如一个broke 队列上的消息被consumer 锁住了,万一consumer 崩溃了,这个锁就释放不了,所以broker 上的锁需要加上锁的过期时间。

实际上 RocketMQ 消费端也就是照着上面的思路做:

RocketMQ中顺序消息注意事项

实际项目中并不是所有情况都需要用到顺序消息,但这也是设计方案的时候容易忽略的一点

顺序消息是生产者和消费者配合协调作用的结果,但是消费端保证顺序消费,是保证不了顺序消息的

消费端并行方式消费,只设置一次拉取消息的数量为 1(即配置参数 consumeBatchSize ),是否可以实现顺序消费 ?这里实际是不能的,并发消费在消费端有多个线程同时消费,consumeBatchSize 只是一个线程一次拉取消息的数量,对顺序消费没有意义,这里大家有兴趣可以看 ConsumeMessageConcurrentlyService 的代码,并发消费的逻辑都在哪里。

在使用顺序消息时,一定要注意其异常情况的出现,对于顺序消息,当消费者消费消息失败后,消息队列 RocketMQ 版会自动不断地进行消息重试(每次间隔时间为 1 秒),重试最大值是Integer.MAX_VALUE.这时,应用会出现消息消费被阻塞的情况。因此,建议您使用顺序消息时,务必保证应用能够及时监控并处理消费失败的情况,避免阻塞现象的发生。

重要的事再强调一次:在使用顺序消息时,一定要注意其异常情况的出现!防止资源不释放!

小结

通过以上的了解,我们知道了实现顺序消息所必要的条件:顺序发送、顺序存储、顺序消费。RocketMQ的设计中考虑到了这些,我们只需要简单的使用API,不需要额外使用代码来约束业务,使得实现顺序消息更加简单。

 

来源:小汪哥写代码内容投诉

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

软考中级精品资料免费领

  • 历年真题答案解析
  • 备考技巧名师总结
  • 高频考点精准押题
  • 2024年上半年信息系统项目管理师第二批次真题及答案解析(完整版)

    难度     813人已做
    查看
  • 【考后总结】2024年5月26日信息系统项目管理师第2批次考情分析

    难度     354人已做
    查看
  • 【考后总结】2024年5月25日信息系统项目管理师第1批次考情分析

    难度     318人已做
    查看
  • 2024年上半年软考高项第一、二批次真题考点汇总(完整版)

    难度     435人已做
    查看
  • 2024年上半年系统架构设计师考试综合知识真题

    难度     224人已做
    查看

相关文章

发现更多好内容

猜你喜欢

AI推送时光机
位置:首页-资讯-后端开发
咦!没有更多了?去看看其它编程学习网 内容吧
首页课程
资料下载
问答资讯