文章详情

短信预约-IT技能 免费直播动态提醒

请输入下面的图形验证码

提交验证

短信预约提醒成功

消息队列选型看这一篇就够了

2024-11-29 18:30

关注

消息队列是重要的分布式系统组件,在高性能、高可用、低耦合等系统架构中扮演着重要作用。可用于异步通信、削峰填谷、解耦系统、数据缓存等多种业务场景。本文是关于消息队列(MQ)选型和常见问题的精心整理。在这篇文章中,我们将详细介绍消息队列的概念、作用以及如何选择适合自己需求的消息队列系统。

一、概述

消息队列是分布式系统中重要的中间件,在高性能、高可用、低耦合等系统架构中扮演着重要作用。分布式系统可以借助消息队列的能力,轻松实现以下功能:

二、架构简介

1. Kafka

(1) 系统框架

一个 Kafka 集群由多个 Broker 和一个 ZooKeeper 集群组成,Broker 作为 Kafka 节点的服务器。同一个消息主题 Topic 可以由多个分区 Partition 组成,分区物理存储在 Broker 上。负载均衡考虑,同一个 Topic 的多个分区存储在多个不同的 Broker 上,为了提高可靠性,每个分区在不同的 Broker 会存在副本。

ZookKeeper 是一个分布式开源的应用程序协调服务,可以实现统一命名服务、状态同步服务、集群管理、分布式应用配置项的管理等工作。Kafka 里的 ZooKeeper 主要有一下几个作用:

(2) 基本术语

2. Pulsar

(1) 系统框架

Pulsar 有三个重要的组件,Broker、BookKeeper 和ZooKeeper,Broker 是无状态服务,客户端需要连接到 Broker 上进行消息的传递。BookKeeper 与 ZooKeeper 是有状态服务。BookKeeper 的节点叫 Bookie,负责存储消息和游标,ZooKeeper 存储 Broker 和 Bookie 的元数据。Pulsar 以这种架构,实现存储和计算分离,Broker 负责计算,Bookie 负责有状态存储。

Pulsar 的多层架构影响了存储数据的方式。Pulsar 将 Topic 分区划分为分片(Segment),然后将这些分片存储在 Apache BookKeeper 的存储节点上,以提高性能、可伸缩性和可用性。Pulsar 的分布式日志以分片为中心,借助扩展日志存储(通过 Apache BookKeeper)实现,内置分层存储支持,因此分片可以均匀地分布在存储节点上。由于与任一给定 Topic 相关的数据都不会与特定存储节点进行捆绑,因此很容易替换存储节点或缩扩容。另外,集群中最小或最慢的节点也不会成为存储或带宽的短板。

(2) 基本术语

3. RocketMQ

(1) 系统框架

RocketMQ 是阿里开源的消息中间件,它是一个开源的分布式消息传递和流式数据平台。总共有四大部分:NameServer,Broker,Producer,Consumer。

NameServer 主要用来管理 brokers 以及路由信息。broker 服务器启动时会注册到 NameServer 上,并且两者之间保持心跳监测机制,以此来保证 NameServer 知道 broker 的存活状态。而且,每一台 NameServer 都存有全部的 broker 集群信息和生产者/消费者客户端的请求信息。

Broker 负责管理消息存储分发,主从数据同步,为消息建立索引,提供消息查询等能力。

(2) 基本术语

4. RabbitMQ

(1) 系统框架

RabbitMQ 基于 AMQP 协议来实现,主要由 Exchange 和 Queue 两部分组成,然后通过 RoutingKey 关联起来,消息投递到 Exchange 然后通过 Queue 接收。

(2) 基本术语

5. NSQ

(1) 系统框架

NSQ 主要有 nsqlookup、nsqd 两部分组成:

NSQ 由 3 个守护进程组成:

三、选型要点

1. 选型参考

2. 消息队列对比

注:作为 LShift 和 CohesiveFT 于 2007 年成立的合资企业,RabbitMQ 于 2010 年 4 月被 VMware 旗下的 SpringSource 收购。

四、功能剖析

1. 消费推拉模式

客户端消费者获取消息的方式,Kafka 和 RocketMQ 是通过长轮询 Pull 的方式拉取消息,RabbitMQ、Pulsar、NSQ 都是通过 Push 的方式。

pull 类型的消息队列更适合高吞吐量的场景,允许消费者自己进行流量控制,根据消费者实际的消费能力去获取消息。而 push 类型的消息队列,实时性更好,但需要有一套良好的流控策略(backpressure)当消费者消费能力不足时,减少 push 的消费数量,避免压垮消费端。

2. 延迟队列

消息延迟投递,当消息产生送达消息队列时,有些业务场景并不希望消费者立刻收到消息,而是等待特定时间后,消费者才能拿到这个消息进行消费。延迟队列一般分为两种,基于消息的延迟和基于队列的延迟。基于消息的延迟指为每条消息设置不同的延迟时间,当队列有新消息进入的时候根据延迟时间排序,当然这样会对性能造成较大影响。另一种基于队列的延迟指的是设置不同延迟级别的队列,队列中每个消息的延迟时间都是相同的,这样免去了基于延迟时间排序对性能带来的损耗,通过一定的扫描策略即可投递超时的消息。

延迟消息的使用场景比如异常检测重试,订单超时取消等,例如:

Kafka 不支持延迟消息。Pulsar 支持秒级的延迟消息,所有延迟投递的消息会被 Delayed Message Tracker 记录对应的 index,consumer 在消费时,会先去 Delayed Message Tracker 检查,是否有到期需要投递的消息,如果有到期的消息,则从 Tracker 中拿出对应的 index,找到对应的消息进行消费,如果没有到期的消息,则直接消费正常的消息。对于长时间的延迟消息,会被存储在磁盘中,当快到延迟间隔时才被加载到内存里。

RocketMQ 开源版本延迟消息临时存储在一个内部主题中,不支持任意时间精度,支持特定的 level,例如定时 5s,10s,1m 等。

RabbitMQ 需要安装一个 rabbitmq_delayed_message_exchange 插件。

NSQ 通过内存中的优先级队列来保存延迟消息,支持秒级精度,最多 2 个小时延迟。

3. 死信队列

由于某些原因消息无法被正确的投递,为了确保消息不会被无故的丢弃,一般将其置于一个特殊角色的队列,这个队列一般称之为死信队列。与此对应的还有一个“回退队列”的概念,试想如果消费者在消费时发生了异常,那么就不会对这一次消费进行确认(Ack), 进而发生回滚消息的操作之后消息始终会放在队列的顶部,然后不断被处理和回滚,导致队列陷入死循环。为了解决这个问题,可以为每个队列设置一个回退队列,它和死信队列都是为异常的处理提供的一种机制保障。实际情况下,回退队列的角色可以由死信队列和重试队列来扮演。

NSQ 没有死信队列。

4. 优先级队列

有一些业务场景下,我们需要优先处理一些消息,比如银行里面的金卡客户、银卡客户优先级高于普通客户,他们的业务需要优先处理。如下图:

优先级队列不同于先进先出队列,优先级高的消息具备优先被消费的特权,这样可以为下游提供不同消息级别的保证。不过这个优先级也是需要有一个前提的:如果消费者的消费速度大于生产者的速度,并且消息中间件服务器(一般简单的称之为 Broker)中没有消息堆积,那么对于发送的消息设置优先级也就没有什么实质性的意义了,因为生产者刚发送完一条消息就被消费者消费了,那么就相当于 Broker 中至多只有一条消息,对于单条消息来说优先级是没有什么意义的。

Kafka、RocketMQ、Pulsar、NSQ 不支持优先级队列,可以通过不同的队列来实现消息优先级。

RabbitMQ 支持优先级消息。

5. 消息回溯

一般消息在消费完成之后就被处理了,之后再也不能消费到该条消息。消息回溯正好相反,是指消息在消费完成之后,还能消费到之前被消费掉的消息。对于消息而言,经常面临的问题是“消息丢失”,至于是真正由于消息中间件的缺陷丢失还是由于使用方的误用而丢失一般很难追查,如果消息中间件本身具备消息回溯功能的话,可以通过回溯消费复现“丢失的”消息进而查出问题的源头之所在。消息回溯的作用远不止与此,比如还有索引恢复、本地缓存重建,有些业务补偿方案也可以采用回溯的方式来实现。

6. 消息持久化

流量削峰是消息中间件的一个非常重要的功能,而这个功能其实得益于其消息堆积能力。从某种意义上来讲,如果一个消息中间件不具备消息堆积的能力,那么就不能把它看做是一个合格的消息中间件。消息堆积分内存式堆积和磁盘式堆积。一般来说,磁盘的容量会比内存的容量要大得多,对于磁盘式的堆积其堆积能力就是整个磁盘的大小。从另外一个角度讲,消息堆积也为消息中间件提供了冗余存储的功能。

Kafka 和 RocketMQ 直接将消息刷入磁盘文件中进行持久化,所有的消息都存储在磁盘中。只要磁盘容量够,可以做到无限消息堆积。

RabbitMQ 是典型的内存式堆积,但这并非绝对,在某些条件触发后会有换页动作来将内存中的消息换页到磁盘(换页动作会影响吞吐),或者直接使用惰性队列来将消息直接持久化至磁盘中。

Pulsar 消息是存储在 BookKeeper 存储集群上,也是磁盘文件。

NSQ 通过 nsq_to_file 工具,将消息写入到文件。

7. 消息确认机制

消息队列需要管理消费进度,确认消费者是否成功处理消息,使用 push 的方式的消息队列组件往往是对单条消息进行确认,对于未确认的消息,进行延迟重新投递或者进入死信队列。

Kafka通过 Offset 的方式确认消息:

RocketMQ与 Kafka 类似也会提交 Offset,区别在于消费者对于消费失败的消息,可以标记为消息消费失败,Broker 会重试投递,如果累计多次消费失败,会投递到死信队列。

RabbitMQ和 NSQ 类似,消费者确认单条消息,否则会重新放回队列中等待下次投递:

Pulsar使用专门的 Cursor 管理。累积确认和 Kafka 效果一样;提供单条或选择性确认。

8. 消息 TTL

消息 TTL 表示一条消息的生存时间,如果消息发出来后,在 TTL 的时间内没有消费者进行消费,消息队列会将消息删除或者放入死信队列中。

Kafka 根据设置的保留期来删除消息。有可能消息没被消费,过期后被删除。不支持 TTL。

Pulsar 支持 TTL,如果消息未在配置的 TTL 时间段内被任何消费者使用,则消息将自动标记为已确认。消息保留期与消息 TTL 之间的区别在于:消息保留期作用于标记为已确认并设置为已删除的消息,而 TTL 作用于未 ack 的消息。上面的图例中说明了 Pulsar 中的 TTL。例如,如果订阅 B 没有活动消费者,则在配置的 TTL 时间段过后,消息 M10 将自动标记为已确认,即使没有消费者实际读取该消息。

RocketMQ 提及到消息 TTL 的资料比较少,不过看接口似乎是支持的。

RabbitMQ 有两种方式,一个是声明队列的时候在队列属性中设置,整个队列中的消息都有相同的有效期。还可以发送消息的时候给消息设置属性,可以位每条消息都设置不同的 TTL。

NSQ 似乎还没支持,有一个 Feature Request 的 Issue 处于 Open 状态。

9. 多租户隔离

多租户是指通过一个软件实例为多个租户提供服务的能力。租户是指对系统有着相同“视图”的一组用户。不支持多租户的系统里边,往往要为不同用户或者不同集群创建多个消息队列实例实现物理隔离,这样会带来较高的运维成本。作为一种企业级的消息系统,Pulsar 的多租户能力按照设计可满足下列需求:

Pulsar 通过下列方式满足了上述需求:

10. 消息顺序性

消息顺序性是指保证消息有序。消息消费顺序跟生产的顺序保持一致。

Kafka 保证了分区内的消息有序。

Pulsar 支持两种消费模式,独占订阅的流模式只保证了消息的顺序性,共享订阅队列模型不保证有序性。

RocketMQ 需要用到锁来保证一个队列同时只有一个消费者线程进行消费,保证消息的有序性。

RabbitMQ 顺序性的条件比较苛刻,需要单线程发送、单线程消费,并且不采用延迟队列、优先级队列等高级功能。

NSQ 是利用了 golang 自身的 case/select 实现的消息分发,本身不提供有序性保障,不能够把特性消息和消费者对应起来,无法实现消息的有序性。

11. 消息查询

在实际开发中,经常要查看 MQ 中消息的内容,比如通过某个 MessageKey/ID,查询到 MQ 的具体消息。或者是对消息进行链路追踪,知道消息从哪里来,发送到哪里去,进而快速对问题进行排查定位。

Kafka 存储层是以分布式提交日志的形式实现,每次写操作都顺序追加到日志的末尾。读也是顺序读。不支持检索功能。

Pulsar 可以通过消息 ID,查询到具体某条消息的消息内容、消息参数和消息轨迹。

RocketMQ 支持按 Message Key、Unique Key、Message Id 对消息进行查询。

RabbitMQ 使用基于索引的存储系统。这些将数据保存在树结构中,以提供确认单个消息所需的快速访问。由于 RabbitMQ 的消息在确认后会被删除,因此只能查询未确认的消息。

NSQ 自身不支持消息持久化和消息检索,不过可以使用 nsq_to_http 等工具将消息写入可支持索引的存储里。

12. 消费模式

Kafka 有两种消费模式,最终都会保证一个分区只有 1 个消费者在消费:

Pulsar 有以下四种消费模式,其中独占模式和灾备模式跟 Kafka 类似,为流模型,每个分区只有 1 个消费者消费,能保证消息有序性。共享模式和 Key 共享模式为队列模型,多个消费者能提高消费速度,但不能保证有序性。

RocketMQ 有两种消费模式,BROADCASTING 广播模式,CLUSTERING 集群模式。

RabbitMQ 和 NSQ 的消费比较类似,都是跟 Pulsar 共享模式类似的,队列的形式,增加一个消费者组里的消费者数量能提高消费速度。

13. 消息可靠性

消息丢失是使用消息中间件时所不得不面对的一个同点,其背后消息可靠性也是衡量消息中间件好坏的一个关键因素。尤其是在金融支付领域,消息可靠性尤为重要。比如当服务出现故障时,一些对于生产者来说已经生产成功的消息,是否会在高可用切换时丢失。同步刷盘是增强一个组件可靠性的有效方式,消息中间件也不例外,Kafka 和 RabbitMQ 都可以支持同步刷盘,但绝大多数情景下,一个组件的可靠性不应该由同步刷盘这种极其损耗性能的操作来保障,而是采用多副本的机制来保证。

Kafka 可以通过配置 request.required.acks 参数设置可靠级别,表示一条消息有多少个副本确认接收成功后,才被任务发送成功。

Pulsar 有跟 Kafka 类似的概念,叫 Ack Quorum Size(Qa),Qa 是每次写请求发送完毕后需要回复确认的 Bookie 的个数,其数值越大则需要确认写成功的时间越长,其值上限是副本数 Qw。为了一致性,Qa 应该是:(Qw+1)/2 或者更,即为了确保数据安全性,Qa 下限是  (Qw+1)/2。

RocketMQ 与 Kafka 类似。

14. 负载均衡

Kafka:支持负载均衡。一个 broker 通常就是一台服务器节点。对于同一个 Topic 的不同分区,Kafka 会尽力将这些分区分布到不同的 Broker 服务器上,zookeeper 保存了 broker、主题和分区的元数据信息。分区首领会处理来自客户端的生产请求,kafka 分区首领会被分配到不同的 broker 服务器上,让不同的 broker 服务器共同分担任务。

每一个 broker 都缓存了元数据信息,客户端可以从任意一个 broker 获取元数据信息并缓存起来,根据元数据信息知道要往哪里发送请求。

kafka 的消费者组订阅同一个 topic,会尽可能地使得每一个消费者分配到相同数量的分区,分摊负载。

当消费者加入或者退出消费者组的时候,还会触发再均衡,为每一个消费者重新分配分区,分摊负载。

kafka 的负载均衡大部分是自动完成的,分区的创建也是 kafka 完成的,隐藏了很多细节,避免了繁琐的配置和人为疏忽造成的负载问题。

发送端由 topic 和 key 来决定消息发往哪个分区,如果 key 为 null,那么会使用轮询算法将消息均衡地发送到同一个 topic 的不同分区中。如果 key 不为 null,那么会根据 key 的 hashcode 取模计算出要发往的分区。

rabbitmq:对负载均衡的支持不好。消息被投递到哪个队列是由交换器和 key 决定的,交换器、路由键、队列都需要手动创建。

rabbitmq 客户端发送消息要和 broker 建立连接,需要事先知道 broker 上有哪些交换器,有哪些队列。通常要声明要发送的目标队列,如果没有目标队列,会在 broker 上创建一个队列,如果有,就什么都不处理,接着往这个队列发送消息。假设大部分繁重任务的队列都创建在同一个 broker 上,那么这个 broker 的负载就会过大。(可以在上线前预先创建队列,无需声明要发送的队列,但是发送时不会尝试创建队列,可能出现找不到队列的问题,rabbitmq 的备份交换器会把找不到队列的消息保存到一个专门的队列中,以便以后查询使用)

使用镜像队列机制建立 rabbitmq 集群可以解决这个问题,形成 master-slave 的架构,master 节点会均匀分布在不同的服务器上,让每一台服务器分摊负载。slave 节点只是负责转发,在 master 失效时会选择加入时间最长的 slave 成为 master。

当新节点加入镜像队列的时候,队列中的消息不会同步到新的 slave 中,除非调用同步命令,但是调用命令后,队列会阻塞,不能在生产环境中调用同步命令。

当 rabbitmq 队列拥有多个消费者的时候,队列收到的消息将以轮询的分发方式发送给消费者。每条消息只会发送给订阅列表里的一个消费者,不会重复。

这种方式非常适合扩展,而且是专门为并发程序设计的。

如果某些消费者的任务比较繁重,那么可以设置 basicQos 限制信道上消费者能保持的最大未确认消息的数量,在达到上限时,rabbitmq 不再向这个消费者发送任何消息。

对于 rabbitmq 而言,客户端与集群建立的 TCP 连接不是与集群中所有的节点建立连接,而是挑选其中一个节点建立连接。

但是 rabbitmq 集群可以借助 HAProxy、LVS 技术,或者在客户端使用算法实现负载均衡,引入负载均衡之后,各个客户端的连接可以分摊到集群的各个节点之中。

客户端均衡算法:

zeromq:去中心化,不支持负载均衡。本身只是一个多线程网络库。

rocketmq:支持负载均衡。一个 broker 通常是一个服务器节点,broker 分为 master 和 slave,master 和 slave 存储的数据一样,slave 从 master 同步数据。

nameserver 与每个集群成员保持心跳,保存着 Topic-Broker 路由信息,同一个 topic 的队列会分布在不同的服务器上。

发送消息通过轮询队列的方式发送,每个队列接收平均的消息量。发送消息指定 topic、tags、keys,无法指定投递到哪个队列(没有意义,集群消费和广播消费跟消息存放在哪个队列没有关系)。

tags 选填,类似于 Gmail 为每封邮件设置的标签,方便服务器过滤使用。目前只支 持每个消息设置一个 tag,所以也可以类比为 Notify 的 MessageType 概念。

keys 选填,代表这条消息的业务关键词,服务器会根据 keys 创建哈希索引,设置后, 可以在 Console 系统根据 Topic、Keys 来查询消息,由于是哈希索引,请尽可能 保证 key 唯一,例如订单号,商品 Id 等。

rocketmq 的负载均衡策略规定:Consumer 数量应该小于等于 Queue 数量,如果 Consumer 超过 Queue 数量,那么多余的 Consumer 将不能消费消息。这一点和 kafka 是一致的,rocketmq 会尽可能地为每一个 Consumer 分配相同数量的队列,分摊负载。

activemq:支持负载均衡。可以基于 zookeeper 实现负载均衡。

15. 集群方式

Kafka:天然的‘Leader-Slave’无状态集群,每台服务器既是 Master 也是 Slave。分区首领均匀地分布在不同的 kafka 服务器上,分区副本也均匀地分布在不同的 kafka 服务器上,所以每一台 kafka 服务器既含有分区首领,同时又含有分区副本,每一台 kafka 服务器是某一台 kafka 服务器的 Slave,同时也是某一台 kafka 服务器的 leader。

kafka 的集群依赖于 zookeeper,zookeeper 支持热扩展,所有的 broker、消费者、分区都可以动态加入移除,而无需关闭服务,与不依靠 zookeeper 集群的 mq 相比,这是最大的优势。

rabbitmq:支持简单集群,'复制'模式,对高级集群模式支持不好。

rabbitmq 的每一个节点,不管是单一节点系统或者是集群中的一部分,要么是内存节点,要么是磁盘节点,集群中至少要有一个是磁盘节点。

在 rabbitmq 集群中创建队列,集群只会在单个节点创建队列进程和完整的队列信息(元数据、状态、内容),而不是在所有节点上创建。

引入镜像队列,可以避免单点故障,确保服务的可用性,但是需要人为地为某些重要的队列配置镜像。

Name Server 是一个几乎无状态节点,可集群部署,节点之间无任何信息同步。

Broker 部署相对复杂,Broker 分为 Master 与 Slave,一个 Master 可以对应多个 Slave,但是一个 Slave 只能对应一个 Master,Master 与 Slave 的对应关系通过指定相同的 BrokerName,不同的 BrokerId 来定义,BrokerId 为 0 表示 Master,非 0 表示 Slave。Master 也可以部署多个。每个 Broker 与 Name Server 集群中的所有节点建立长连接,定时注册 Topic 信息到所有 Name Server。

Producer 与 Name Server 集群中的其中一个节点(随机选择)建立长连接,定期从 Name Server 取 Topic 路由信息,并向提供 Topic 服务的 Master 建立长连接,且定时向 Master 发送心跳。Producer 完全无状态,可集群部署。

Consumer 与 Name Server 集群中的其中一个节点(随机选择)建立长连接,定期从 Name Server 取 Topic 路由信息,并向提供 Topic 服务的 Master、Slave 建立长连接,且定时向 Master、Slave 发送心跳。Consumer 既可以从 Master 订阅消息,也可以从 Slave 订阅消息,订阅规则由 Broker 配置决定。

客户端先找到 NameServer, 然后通过 NameServer 再找到 Broker。

一个 topic 有多个队列,这些队列会均匀地分布在不同的 broker 服务器上。rocketmq 队列的概念和 kafka 的分区概念是基本一致的,kafka 同一个 topic 的分区尽可能地分布在不同的 broker 上,分区副本也会分布在不同的 broker 上。

rocketmq 集群的 slave 会从 master 拉取数据备份,master 分布在不同的 broker 上。

activemq:支持简单集群模式,比如'主-备',对高级集群模式支持不好。

五、性能

Kafka 的公司 Confluent 在 2020 年 8 月发了一篇 Benchmarking Apache Kafka, Apache Pulsar, and RabbitMQ: Which is the Fastest?文章,并且提出了一个开源的 MQ Benchmark 框架 THE OPENMESSAGING BENCHMARK FRAMEWORK,在这个文档里,对比了 Kafka、Pulsar、RabbitMQ 的吞吐量、端到端延迟等性能数据。最后得出结论 Kafka 相对来说性能最好。

但接下来 StreamNative 在 2020 年 12 月指出了 Confluence 的基准测试的一些问题,并对 Pulsar 进行了参数调优之后重新执行了一遍结果,测试报告展示 Pulsar 能达到跟 Kafka 同样的吞吐量,在某些场景下,Pulsar 的延迟显著低于 Kafka。

而且在性能测试上,有很多客户端、服务端参数设置、机器性能配置等影响,比如消息可靠性级别,压缩算法等,很难做到“完全”控制变量公平的测试。而且 OpenMessaging Benchmark 的开源 Github 的 Readme 上也提到了。

不过有几个关注点:

Pulsar 和 Kafka 都被广泛用于各个企业,也各有优势,都能通过数量基本相同的硬件处理大流量。部分用户误以为 Pulsar 使用了很多组件,因此需要很多服务器来实现与 Kafka 相匹敌的性能。这种想法适用于一些特定硬件配置,但在多数资源配置相同的情况中,Pulsar 的优势更加明显,可以用相同的资源实现更好的性能。举例来说,Splunk 最近分享了他们选择 Pulsar 放弃 Kafka 的原因,其中提到“由于分层架构,Pulsar 帮助他们将成本降低了 30%-50%,延迟降低了 80%-98%,运营成本降低了 33%-50%”。Splunk 团队发现 Pulsar 可以更好地利用磁盘 IO,降低 CPU 利用率,同时更好地控制内存。

在分布式系统里,单机性能指标虽然也很重要,分布式系统整体的性能以及灵活扩缩容、高可用容灾等能力也会是评估的一个重要参考。MQ 中间件具体的性能指标,也需要我们自己根据实际的情况,根据实际购买的集群配置和客户端参数,进行压测调优来评估。

六、运维

在使用过程中难免会出现各种异常情况,比如宕机、网络抖动、扩容等。消息队列具备异地容灾,高可用架构等能力,能避免一些计算节点、网络等基础设施不可用导致的故障。

1. 高可用

Kafka 通过分区多副本的方式解决高可用问题。

Pulsar 的计算集群 Broker 是无状态的,可以灵活扩缩容,存储节点 Bookie 上通过消息分区分片副本的方式,每个分片都有一个或多个副本,保证在某一个 Bookie 挂掉后,有其他分片可以提供服务。

RocketMQ 和 RabbitMQ 都是主从架构,当 master 挂掉后,由原来的从节点继续提供服务。备机提供消费服务,保证消息不丢,但不提供写服务。

NSQ 是类似分布式架构,不过由于消息存储是在节点本地磁盘上,如果一个节点离线,堆积在节点磁盘上的消息会丢失。

2. 跨地域容灾

Pulsar 原生支持跨地域容灾功能,在这个图中,每当 P1、P2 和 P3 的生产者分别向 Cluster-A、Cluster-B 和 Cluster-C 中的 T1 topic 发送消息时,这些消息很快在不同的集群中复制。一旦消息完成复制,消费者 C1 和 C2 会从各自的集群消费到这个消息。

在这个跨地域容灾的设计支撑下,其一,我们可以比较容易的将服务分散到多个机房;其二,可以应对机房级别的故障,即在一个机房不可用的情况下,服务可以转接到其它的机房来继续对外提供服务。

一句话概括,Pulsar 的跨地域复制,其实就是在一个本地集群中创建一个 Producer,把异地的集群作为这个 Producer 的发送地址,将本地集群的消息发送过去,并且在本地维护一个 Cusor 来保证消息可靠性和幂等性。

3. 集群扩容

当消息量突然上涨,消息队列集群到达瓶颈的时候,需要对集群进行扩容,扩容一般分为水平扩容和垂直扩容两种方式,水平扩容指的是往往集群中增加节点,垂直扩容指的是把集群中部分节点的配置调高,增加处理能力。

Kafka 集群由于主题分区是物理存储在 Broker 节点上的,新加入的集群的节点并没有存储分区分片,也就无法提供马上提供服务,因此需要把一些 Topic 的分区分配到新加入的节点里,这里会涉及到一个分区数据均衡的过程,将某些分区的数据复制到新节点上。这个过程跟分区当前堆积的数据量、Broker 性能有关,有可能会出现由于源 Broker 负载过高,堆积数据过大,导致数据均衡的时间变长。

Pulsar 的无限分布式日志以分片为中心,借助扩展日志存储(通过 Apache BookKeeper)实现,内置分层存储支持,因此分片可以均匀地分布在存储节点上。由于与任一给定 topic 相关的数据都不会与特定存储节点进行捆绑,因此很容易替换存储节点或缩扩容。另外,集群中最小或最慢的节点也不会成为存储或带宽的短板。

RocketMQ 新节点直接加入到集群中,在新的 broker 创建新 topic 并且分配队列,或者在已有 topic 基础上分配队列。与 Kafka 的区别是,Kafka 的分区是在不同的物理机器上,而 Rocketmq 是逻辑分区,用的队列形式,因此不存在出现数据不均衡的情况。

RabbitMQ 和 NSQ 类似,由于不涉及过多的消息持久化,直接往集群中增加节点。

4. 使用成本

Kafka/Pulsar/RocketMQ/RabbitMQ 在腾讯云上都上线了标准产品,可以直接购买创建实例(产品选型),能大大降低部署运维成本。而 NSQ 目前暂时还没有上线,需要自行部署。

七、常见问题 & 使用场景

1. Kafka

日志收集:大量的日志消息先写入 kafka,数据服务通过消费 kafka 消息将数据落地;

2. RocketMQ

为金融互联网领域而生,对于可靠性要求很高的场景。

3. 普通消息

消息队列最基础的功能就是生产者发送消息、Broker 保存消息,消费者来消费消息,以此实现系统解耦、削峰填谷的作用。

普通消息是消息队列必备的消息类型,也是系统使用场景最多的一种消息。

4. 顺序消息

顺序消息是指生产者发送消息的顺序和消费者消费消息的顺序是一致的。比如在一个电商场景,同一个用户提交订单、订单支付、订单出库,这三个消息消费者需要按照顺序来进行消费。如下图:

顺序消息的实现并不容易,原因如下:

要保证消息有序,需要满足两个条件:

如下图:

上面第二个条件是比较容易实现的,一个分区绑定一个消费者就可以,主要是第一个条件。

在主流消息队列的实现中,Kafka 和 Pulsar 的实现方式类似,生产者给消息赋值一个 key,对 key 做 Hash 运算来指定消息发送到哪一个分区。比如上面电商的例子,对同一个用户的一笔订单,提交订单、订单支付、订单出库这三个消息赋值同一个 key,就可以把这三条消息发送到同一个分区。

对于 RocketMQ,生产者在发送消息的时候,可以通过 MessageQueueSelector 指定把消息投递到那个 MessageQueue,如下图:

5. 延时消息

或者也叫定时消息,是指消息发送后不会立即被消费,而是指定一个时间,到时间后再消费。经典的场景比如电商购物时,30 分钟未支付订单,让订单自动失效。

(1) RocketMQ 实现

RocketMQ 定义了 18 个延时级别,每个延时级别对应一个延时时间。下面如果延迟级别是 3,则消息会延迟 10s 才会拉取。

RocketMQ 的延时消息如下图:

生产者把消费发送到 Broker 后,Broker 首先把消息保存到 SCHEDULE_TOPIC_XXXX 这个 Topic,然后调度任务会判断是否到期,如果到期,会把消息从 SCHEDULE_TOPIC_XXXX 取出投递到原始的 queue,这样消费者就可以消费到了。

RocketMQ 的延时消息只支持最大两个小时的延时,不过 RocketMQ5.0 基于时间轮算法实现了定时消息,解决了这个问题。

(2) Pulsar 实现

Pulsar 的实现如下图:

Pulsar 的延时消息首先会写入一个 Delayed Message Tracker 的数据结构中,Delayed Message Tracker 根据延时时间构建 delayed index 优先级队列。消费者拉取消息时,首先去 Delayed Message Tracker 检查是否有到期的消息。如果有则直接拉取进行消费。

(3) RabbitMQ 实现

RabbitMQ 的实现方式有两种,一种是投递到普通队列都不消费,等消息过期后被投递到死信队列,消费者消费死信队列。如下图:

第二种方式是生产者发送消息时,先发送到本地 Mnesia 数据库,消息到期后定时器再将消息投递到 broker。

(4) Kafka 实现

Kafka 本身并没有延时队列,不过可以通过生产者拦截器来实现消息延时发送,也可以定义延时 Topic,利用类似 RocketMQ 的方案来实现延时消息。

6. 事务消息

事务消息是指生产消息和消费消息满足事务的特性。

RabbitMQ 和 Kafka 的事务消息都是只支持生产消息的事务特性,即一批消息要不全部发送成功,要不全部发送失败。

RabbitMQ 通过 Channel 来开启事务消息,代码如下:

ConnectionFactory factory=new ConnectionFactory();
connection=factory.newConnection();
Channel channel=connection.createChannel();
//开启事务
channel.txSelect();
channel.basicPublish("directTransactionExchange","transactionRoutingKey",null,message.getBytes("utf-8"));
//提交事务 或者 channel.txRollback()回滚事务
channel.txCommit();

Kafka 可以给多个生产者设置同一个事务 ID ,从而把多个 Topic 、多个 Partition 放在一个事务中,实现原子性写入。

Pulsar 的事务消息对于事务语义的定义是:允许事件流应用将消费、处理、生产消息整个过程定义为一个原子操作。可见,Pulsar 的事务消息可以覆盖消息流整个过程。

RocketMQ 的事务消息是通过 half 消息来实现的。以电商购物场景来看,账户服务扣减账户金额后,发送消息给 Broker,库存服务来消费这条消息进行扣减库存。如下图:

可见,RocketMQ 只能保证生产者发送消息和本地事务的原子性,并不能保证消费消息的原子性。

7. 轨迹消息

轨迹消息主要用于跟踪消息的生命周期,当消息丢失时可以很方便地找出原因。

轨迹消息也跟普通消息一样,也需要存储和查询,也会占用消息队列的资源,所以选择轨迹消息要考虑下面几点:

RabbitMQ Broker 实现了轨迹消息的功能,打开 Trace 开关,就可以把轨迹消息发送到 amq.rabbitmq.trace 这个 exchange,但是要考虑轨迹消息会不会给 Broker 造成 压力进而导致消息积压。RabbitMQ 的生产者和消费者都没有实现轨迹消息,需要开发者自己来实现。

RocketMQ 生产者、Broker 和消费者都实现了轨迹消息,不过默认是关闭的,需要手工开启。

使用轨迹消息,需要考虑记录哪些节点、存储介质、性能、查询方式等问题。

8. Kafka 是否会消息丢失?

(1)只对“已提交”的消息做有限度的持久化保证

(2)生产者丢失数据

(3) 消费者程序丢失数据

9. Kafka 如何持久化?

(1)消息日志(Log)保存数据,磁盘追加写(Append-only)

(2)定期删除消息(日志段)

10. Kafka 文件存储机制

(1)每个 partition 相当于一个巨型文件 → 多个大小相等 segment 数据文件中

(2)每个 partition 只需要顺序读写就行了,segment 文件生命周期由配置决定

(3)segment file 组成:

(4)segment file 文件命名规则:

一对 segment file

message 物理结构

11. Kafka 分区

为什么分区?

分区策略?

12. MQ 消息堆积问题处理

消息堆积可能的原因: 队列中消息不能被及时的消费,导致大量堆积在队列里面 rocketMq Kafka RabbitMq 都会有这样的问题 产生消息堆积的可以从 mq 的生产消费模型去考虑,从生产者到消息中间件、再到消费者,都会发生堆积。

处理消息堆积问题: 

(1)消费者:

(2)MQ 消息队列: 增加 MQ 的服务器资源,cpu、内存、磁盘,提高 mq 处理能力 也可以通过分区队列将消息分散到多个队列中,提高整体的处理能力。(这个则是 Kafka、Rocket 采用的)

控制队列容量,避免堆积过多,设置持久化策略。rabbitMQ 的懒加载队列,兼顾了持久化和堆积上限

(3)监控告警(重要) 设置监控系统,比如普罗米修斯,监控消息数量,消费者处理速度,队列状态等等,在堆积发生前,即使的告警,及时采取措施。

But 上面的策略是通用的一些解决方案,不同的 MQ,生产消费模型是不一样的,导致需要针对不同 mq 的消息堆积解决方案不一样。

RabbitMq、Kafka、RocketMq 发生消息堆积,分别该如何去解决?

这里先点一下,增加消费者数量,并不是通用的,只适合 RabbitMq。

总结

Kafka 与 Pulsar 都是腾讯云主打的消息队列中间件,都具有高性能,高可靠,支持多种场景。Kafka 推出的时间较早,各种场景比如日志、大数据处理等都有较成熟的解决方案。而 Pulsar 作为一个新秀,支持的功能比 CKafka 更丰富,而且跨地域容灾,多租户等功能,解决了很多 Kafka 设计缺陷和运维成本问题,整体稳定性更强。很多国内外大公司也有很多 Pulsar 的实践案例。因此,一些传统的日志、大数据处理等场景,对高吞吐量有要求的,对消息可靠性的要求没那么高的,可以选用 Kafka,有很多优秀的文档说明怎么参数调优提高性能。而一些对消息可靠性、容灾要求更好,或者有高分区、延迟队列等需求的场景,可以选用 Pulsar。

我们后台的技术栈是基于 Golang 的,在上文的对比中,还挑了一个基于 Golang 开发的消息队列 NSQ,如果有一些定制化需求或者需要二次开发的,可以选用 NSQ。也可以通过阅读 NSQ 的源码,学习一些优秀高性能消息队列中间件的实现方式,比如里边 diskqueue 组件,一个基于磁盘的消息队列,在某些场景下可能也可以进行二次利用。

来源:腾讯技术工程内容投诉

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

软考中级精品资料免费领

  • 历年真题答案解析
  • 备考技巧名师总结
  • 高频考点精准押题
  • 2024年上半年信息系统项目管理师第二批次真题及答案解析(完整版)

    难度     813人已做
    查看
  • 【考后总结】2024年5月26日信息系统项目管理师第2批次考情分析

    难度     354人已做
    查看
  • 【考后总结】2024年5月25日信息系统项目管理师第1批次考情分析

    难度     318人已做
    查看
  • 2024年上半年软考高项第一、二批次真题考点汇总(完整版)

    难度     435人已做
    查看
  • 2024年上半年系统架构设计师考试综合知识真题

    难度     224人已做
    查看

相关文章

发现更多好内容

猜你喜欢

AI推送时光机
位置:首页-资讯-后端开发
咦!没有更多了?去看看其它编程学习网 内容吧
首页课程
资料下载
问答资讯