消息队列是重要的分布式系统组件,在高性能、高可用、低耦合等系统架构中扮演着重要作用。可用于异步通信、削峰填谷、解耦系统、数据缓存等多种业务场景。本文是关于消息队列(MQ)选型和常见问题的精心整理。在这篇文章中,我们将详细介绍消息队列的概念、作用以及如何选择适合自己需求的消息队列系统。
一、概述
消息队列是分布式系统中重要的中间件,在高性能、高可用、低耦合等系统架构中扮演着重要作用。分布式系统可以借助消息队列的能力,轻松实现以下功能:
- 解耦:将一个流程的上下游拆解开,上游专注于生产消息,下游专注于处理消息;
- 广播:上游生产的消息可以轻松被多个下游服务处理;
- 缓冲:应对突发流量,消息队列扮演缓冲器的作用,保护下游服务,使其可以根据自身的实际消费能力处理消息;
- 异步:上游发送消息后可以马上返回,下游可以异步处理消息;
- 冗余:保留历史消息,处理失败或当出现异常时可以进行重试或者回溯,防止丢失;
二、架构简介
1. Kafka
(1) 系统框架
一个 Kafka 集群由多个 Broker 和一个 ZooKeeper 集群组成,Broker 作为 Kafka 节点的服务器。同一个消息主题 Topic 可以由多个分区 Partition 组成,分区物理存储在 Broker 上。负载均衡考虑,同一个 Topic 的多个分区存储在多个不同的 Broker 上,为了提高可靠性,每个分区在不同的 Broker 会存在副本。
ZookKeeper 是一个分布式开源的应用程序协调服务,可以实现统一命名服务、状态同步服务、集群管理、分布式应用配置项的管理等工作。Kafka 里的 ZooKeeper 主要有一下几个作用:
- Broker 注册,当有 Broker 故障的时候能及时感知。
- Topic 注册,维护 Topic 各分区的个副本所在的 Broker 节点,以及对应 leader/follower 的角色。
- Consumer 注册,维护消费者组的 offset 以及消费者与分区的对应关系,实现负载均衡。
(2) 基本术语
- Producer:消息生产者。一般情况下,一条消息会被发送到特定的主题上。通常情况下,写入的消息会通过轮询将消息写入各分区。生产者也可以通过设定消息 key 值将消息写入指定分区。写入分区的数据越均匀 Kafka 的性能才能更好发挥。
- Topic:Topic 是个抽象的虚拟概念,一个集群可以有多个 Topic,作为一类消息的标识。一个生产者将消息发送到 topic,消费者通过订阅 Topic 获取分区消息。
- Partition:Partition 是个物理概念,一个 Topic 对应一个或多个 Partition。新消息会以追加的方式写入分区里,在同一个 Partition 里消息是有序的。Kafka 通过分区,实现消息的冗余和伸缩性,以及支持物理上的并发读、写,大大提高了吞吐量。
- Replicas:一个 Partition 有多个 Replicas 副本。这些副本保存在 broker,每个 broker 存储着成百上千个不同主题和分区的副本,存储的内容分为两种:master 副本,每个 Partition 都有一个 master 副本,所有内容的写入和消费都会经过 master 副本;follower 副本不处理任何客户端的请求,只同步 master 的内容进行复制。如果 master 发生了异常,很快会有一个 follower 成为新的 master。
- Consumer:消息读取者。消费者订阅主题,并按照一定顺序读取消息。Kafka 保证每个分区只能被一个消费者使用。
- Offset:偏移量是一种元数据,是不断递增的整数。在消息写入时 Kafka 会把它添加到消息里。在分区内偏移量是唯一的。消费过程中,会将最后读取的偏移量存储在 Kafka 中,消费者关闭偏移量不会丢失,重启会继续从上次位置开始消费。
- Broker:独立的 Kafka 服务器。一个 Topic 有 N 个 Partition,一个集群有 N 个 Broker,那么每个 Broker 都会存储一个这个 Topic 的 Partition。如果某 topic 有 N 个 partition,集群有(N+M)个 broker,那么其中有 N 个 broker 存储该 topic 的一个 partition,剩下的 M 个 broker 不存储该 topic 的 partition 数据。如果某 topic 有 N 个 partition,集群中 broker 数目少于 N 个,那么一个 broker 存储该 topic 的一个或多个 partition。在实际生产环境中,尽量避免这种情况的发生,这种情况容易导致 Kafka 集群数据不均衡。
2. Pulsar
(1) 系统框架
Pulsar 有三个重要的组件,Broker、BookKeeper 和ZooKeeper,Broker 是无状态服务,客户端需要连接到 Broker 上进行消息的传递。BookKeeper 与 ZooKeeper 是有状态服务。BookKeeper 的节点叫 Bookie,负责存储消息和游标,ZooKeeper 存储 Broker 和 Bookie 的元数据。Pulsar 以这种架构,实现存储和计算分离,Broker 负责计算,Bookie 负责有状态存储。
Pulsar 的多层架构影响了存储数据的方式。Pulsar 将 Topic 分区划分为分片(Segment),然后将这些分片存储在 Apache BookKeeper 的存储节点上,以提高性能、可伸缩性和可用性。Pulsar 的分布式日志以分片为中心,借助扩展日志存储(通过 Apache BookKeeper)实现,内置分层存储支持,因此分片可以均匀地分布在存储节点上。由于与任一给定 Topic 相关的数据都不会与特定存储节点进行捆绑,因此很容易替换存储节点或缩扩容。另外,集群中最小或最慢的节点也不会成为存储或带宽的短板。
(2) 基本术语
- Property:代表租户,每个 property 都可以代表一个团队、一个功能、一个产品线。一个 property 可包含多个 namesapce,多租户是一种资源隔离手段,可以提高资源利用率;
- Namespace:Pulsar 的基本管理单元,在 namaspace 级别可设置权限、消息 TTL、Retention 策略等。一个 namaspace 里的所有 topic 都继承相同的设置。命名空间分为两种:本地命名空间,只在集群内可见、全局命名空间对多个集群可见集群命名空间;
- Producer:数据生产方,负责创建消息并将消息投递到 Pulsar 中;
- Consumer:数据消费方,连接到 Pulsar 接收消息并进行相应的处理;
- Broker:无状态 Proxy 服务,负责接收消息、传递消息、集群负载均衡等操作,它对 client 屏蔽了服务端读写流程的复杂性,是保证数据一致性与数据负载均衡的重要角色。Broker 不会持久化保存元数据。可以扩容但不能缩容;
- BookKeeper:有状态,负责持久化存储消息。当集群扩容时,Pulsar 会在新增 BookKeeper 和 Segment(即 Bookeeper 的 Ledger),不需要像 kafka 一样在扩容时进行 Rebalance。扩容结果是 Fragments 跨多个 Bookies 以带状分布,同一个 Ledger 的 Fragments 分布在多个 Bookie 上,导致读取和写入会在多个 Bookies 之间跳跃;
- ZooKeeper:存储 Pulsar 、 BookKeeper 的元数据,集群配置等信息,负责集群间的协调、服务发现等;
- Topic:用作从 producer 到 consumer 传输消息。Pulsar 在 Topic 级别拥有一个 leader Broker,称之为拥有 Topic 的所有权,针对该 Topic 所有的 R/W 都经过该 Broker 完成。Topic 的 Ledger 和 Fragment 之间映射关系等元数据存储在 Zookeeper 中,Pulsar Broker 需要实时跟踪这些关系进行读写流程;
- Ledger:即 Segment,Pulsar 底层数据以 Ledger 的形式存储在 BookKeeper 上。是 Pulsar 删除的最小单位;
- Fragment :每个 Ledger 由若干 Fragment 组成。
3. RocketMQ
(1) 系统框架
RocketMQ 是阿里开源的消息中间件,它是一个开源的分布式消息传递和流式数据平台。总共有四大部分:NameServer,Broker,Producer,Consumer。
NameServer 主要用来管理 brokers 以及路由信息。broker 服务器启动时会注册到 NameServer 上,并且两者之间保持心跳监测机制,以此来保证 NameServer 知道 broker 的存活状态。而且,每一台 NameServer 都存有全部的 broker 集群信息和生产者/消费者客户端的请求信息。
Broker 负责管理消息存储分发,主从数据同步,为消息建立索引,提供消息查询等能力。
(2) 基本术语
- Topic:一个 Topic 可以有 0 个、1 个、多个生产者向其发送消息,一个生产者也可以同时向不同的 Topic 发送消息。一个 Topic 也可以被 0 个、1 个、多个消费者订阅;
- Tag:消息二级类型,可以为用户提供额外的灵活度,一条消息可以没有 tag;
- Producer:消息生产者;
- Broker:存储消息,以 Topic 为纬度轻量级的队列;转发消息,单个 Broker 节点与所有的 NameServer 节点保持长连接及心跳,会定时将 Topic 信息注册到 NameServer;
- Consumer:消息消费者,负责接收并消费消息;
- MessageQueue:消息的物理管理单位,一个 Topic 可以有多个 Queue,Queue 的引入实现了水平扩展的能力;
- NameServer:负责对原数据的管理,包括 Topic 和路由信息,每个 NameServer 之间是没有通信的;
- Group:一个组可以订阅多个 Topic,ProducerGroup、ConsumerGroup 分别是一类生产者和一类消费者;
- Offset:通过 Offset 访问存储单元,RocketMQ 中所有消息都是持久化的,且存储单元定长。Offset 为 Java Long 类型,理论上 100 年内不会溢出,所以认为 Message Queue 是无限长的数据,Offset 是下标;
- Consumer:支持 PUSH 和 PULL 两种消费模式,支持集群消费和广播消费。
4. RabbitMQ
(1) 系统框架
RabbitMQ 基于 AMQP 协议来实现,主要由 Exchange 和 Queue 两部分组成,然后通过 RoutingKey 关联起来,消息投递到 Exchange 然后通过 Queue 接收。
(2) 基本术语
- Broker:接收客户端链接实体,实现 AMQP 消息队列和路由功能;
- Virtual Host:是一个虚拟概念,权限控制的最小单位。一个 Virtual Host 里包含多个 Exchange 和 Queue;
- Exchange:接收消息生产者的消息并将消息转发到队列。发送消息时根据不同 ExchangeType 的决定路由规则,ExchangeType 常用的有:direct、fanout 和 topic 三种;
- Message Queue:消息队列,存储为被消费的消息;
- Message:由 Header 和 Body 组成,Header 是生产者添加的各种属性,包含 Message 是否持久化、哪个 MessageQueue 接收、优先级。Body 是具体的消息内容;
- Binding:Binding 连接起了 Exchange 和 Message Queue。在服务器运行时,会生成一张路由表,这张路由表上记录着 MessageQueue 的条件和 BindingKey 值。当 Exchange 收到消息后,会解析消息中的 Header 得到 BindingKey,并根据路由表和 ExchangeType 将消息发送到对应的 MessageQueue。最终的匹配模式是由 ExchangeType 决定;
- Connection:在 Broker 和客户端之间的 TCP 连接;
- Channel:信道。Broker 和客户端只有 tcp 连接是不能发送消息的,必须创建信道。AMQP 协议规定只有通过 Channel 才能执行 AMQP 命令。一个 Connection 可以包含多个 Channel。之所以需要建立 Channel,是因为每个 TCP 连接都是很宝贵的。如果每个客户端、每个线程都需要和 Broker 交互,都需要维护一个 TCP 连接的话是机器耗费资源的,一般建议共享 Connection。RabbitMQ 不建议客户端线程之前共享 Channel,至少保证同一 Channel 发小消息是穿行的;
- Command:AMQP 命令,客户端通过 Command 来完成和 AMQP 服务器的交互。
5. NSQ
(1) 系统框架
NSQ 主要有 nsqlookup、nsqd 两部分组成:
- Nsqlookup 为守护进程,负责管理拓扑信息并提供发现服务。客户端通过查询 nsqlookupd 获取指定 Topic 所在的 nsqd 节点。nsqd 往 nsqlookup 上注册和广播自身 topic 和 channel 的信息。
- nsqd 在服务端运行的守护进程,负责接收,排队,投递消息给客户端。
NSQ 由 3 个守护进程组成:
- nsqd 是接收、队列和传送消息到客户端的守护进程。
- nsqlookupd 是管理的拓扑信息,并提供了最终一致发现服务的守护进程。客户端通过查询 nsqlookupd 获取指定 Topic 所在的 nsqd 节点。nsqd 往 nsqlookup 上注册和广播自身 topic 和 channel 的信息。
- nsqadmin是一个 Web UI 来实时监控集群(和执行各种管理任务)。
三、选型要点
1. 选型参考
- 消息顺序:发送到队列的消息,消费时是否可以保证消费的顺序;
- 伸缩:当消息队列性能有问题,比如消费太慢,是否可以快速支持扩容;当消费队列过多,浪费系统资源,是否可以支持缩容。
- 消息留存:消息消费成功后,是否还会继续保留在消息队列;
- 容错性:当一条消息消费失败后,是否有一些机制,保证这条消息一定能成功,比如异步第三方退款消息,需要保证这条消息消费掉,才能确定给用户退款成功,所以必须保证这条消息消费成功的准确性;
- 消息可靠性:是否会存在丢消息的情况,比如有 A/B 两个消息,最后只有 B 消息能消费,A 消息丢失;
- 消息时序:主要包括“消息存活时间”和“延迟消息”;
- 吞吐量:支持的最高并发数;
- 消息路由:根据路由规则,只订阅匹配路由规则的消息,比如有 A/B 两者规则的消息,消费者可以只订阅 A 消息,B 消息不会消费。
2. 消息队列对比
注:作为 LShift 和 CohesiveFT 于 2007 年成立的合资企业,RabbitMQ 于 2010 年 4 月被 VMware 旗下的 SpringSource 收购。
四、功能剖析
1. 消费推拉模式
客户端消费者获取消息的方式,Kafka 和 RocketMQ 是通过长轮询 Pull 的方式拉取消息,RabbitMQ、Pulsar、NSQ 都是通过 Push 的方式。
pull 类型的消息队列更适合高吞吐量的场景,允许消费者自己进行流量控制,根据消费者实际的消费能力去获取消息。而 push 类型的消息队列,实时性更好,但需要有一套良好的流控策略(backpressure)当消费者消费能力不足时,减少 push 的消费数量,避免压垮消费端。
2. 延迟队列
消息延迟投递,当消息产生送达消息队列时,有些业务场景并不希望消费者立刻收到消息,而是等待特定时间后,消费者才能拿到这个消息进行消费。延迟队列一般分为两种,基于消息的延迟和基于队列的延迟。基于消息的延迟指为每条消息设置不同的延迟时间,当队列有新消息进入的时候根据延迟时间排序,当然这样会对性能造成较大影响。另一种基于队列的延迟指的是设置不同延迟级别的队列,队列中每个消息的延迟时间都是相同的,这样免去了基于延迟时间排序对性能带来的损耗,通过一定的扫描策略即可投递超时的消息。
延迟消息的使用场景比如异常检测重试,订单超时取消等,例如:
- 服务请求异常,需要将异常请求放到单独的队列,隔 5 分钟后进行重试;
- 用户购买商品,但一直处于未支付状态,需要定期提醒用户支付,超时则关闭订单;
- 面试或者会议预约,在面试或者会议开始前半小时,发送通知再次提醒。
Kafka 不支持延迟消息。Pulsar 支持秒级的延迟消息,所有延迟投递的消息会被 Delayed Message Tracker 记录对应的 index,consumer 在消费时,会先去 Delayed Message Tracker 检查,是否有到期需要投递的消息,如果有到期的消息,则从 Tracker 中拿出对应的 index,找到对应的消息进行消费,如果没有到期的消息,则直接消费正常的消息。对于长时间的延迟消息,会被存储在磁盘中,当快到延迟间隔时才被加载到内存里。
RocketMQ 开源版本延迟消息临时存储在一个内部主题中,不支持任意时间精度,支持特定的 level,例如定时 5s,10s,1m 等。
RabbitMQ 需要安装一个 rabbitmq_delayed_message_exchange 插件。
NSQ 通过内存中的优先级队列来保存延迟消息,支持秒级精度,最多 2 个小时延迟。
3. 死信队列
由于某些原因消息无法被正确的投递,为了确保消息不会被无故的丢弃,一般将其置于一个特殊角色的队列,这个队列一般称之为死信队列。与此对应的还有一个“回退队列”的概念,试想如果消费者在消费时发生了异常,那么就不会对这一次消费进行确认(Ack), 进而发生回滚消息的操作之后消息始终会放在队列的顶部,然后不断被处理和回滚,导致队列陷入死循环。为了解决这个问题,可以为每个队列设置一个回退队列,它和死信队列都是为异常的处理提供的一种机制保障。实际情况下,回退队列的角色可以由死信队列和重试队列来扮演。
- Kafka 没有死信队列,通过 Offset 的方式记录当前消费的偏移量。
- Pulsar 有重试机制,当某些消息第一次被消费者消费后,没有得到正常的回应,则会进入重试 Topic 中,当重试达到一定次数后,停止重试,投递到死信 Topic 中。
- RocketMQ 通过 DLQ 来记录所有消费失败的消息。
- RabbitMQ 是利用类似于延迟队列的形式实现死信队列。
NSQ 没有死信队列。
4. 优先级队列
有一些业务场景下,我们需要优先处理一些消息,比如银行里面的金卡客户、银卡客户优先级高于普通客户,他们的业务需要优先处理。如下图:
优先级队列不同于先进先出队列,优先级高的消息具备优先被消费的特权,这样可以为下游提供不同消息级别的保证。不过这个优先级也是需要有一个前提的:如果消费者的消费速度大于生产者的速度,并且消息中间件服务器(一般简单的称之为 Broker)中没有消息堆积,那么对于发送的消息设置优先级也就没有什么实质性的意义了,因为生产者刚发送完一条消息就被消费者消费了,那么就相当于 Broker 中至多只有一条消息,对于单条消息来说优先级是没有什么意义的。
Kafka、RocketMQ、Pulsar、NSQ 不支持优先级队列,可以通过不同的队列来实现消息优先级。
RabbitMQ 支持优先级消息。
5. 消息回溯
一般消息在消费完成之后就被处理了,之后再也不能消费到该条消息。消息回溯正好相反,是指消息在消费完成之后,还能消费到之前被消费掉的消息。对于消息而言,经常面临的问题是“消息丢失”,至于是真正由于消息中间件的缺陷丢失还是由于使用方的误用而丢失一般很难追查,如果消息中间件本身具备消息回溯功能的话,可以通过回溯消费复现“丢失的”消息进而查出问题的源头之所在。消息回溯的作用远不止与此,比如还有索引恢复、本地缓存重建,有些业务补偿方案也可以采用回溯的方式来实现。
- Kafka 支持消息回溯,可以根据时间戳或指定 Offset,重置 Consumer 的 Offset 使其可以重复消费。
- Pulsar 支持按时间对消息进行回溯。
- RocketMQ 支持按时间回溯,实现的原理跟 Kafka 一致。
- RabbitMQ 不支持回溯,消息一旦标记确认就会被标记删除。
- NSQ 一般消息是不可回溯的,但可以通过 nsq_to_file 工具,将消息写入到文件,然后从文件里重放消息。
6. 消息持久化
流量削峰是消息中间件的一个非常重要的功能,而这个功能其实得益于其消息堆积能力。从某种意义上来讲,如果一个消息中间件不具备消息堆积的能力,那么就不能把它看做是一个合格的消息中间件。消息堆积分内存式堆积和磁盘式堆积。一般来说,磁盘的容量会比内存的容量要大得多,对于磁盘式的堆积其堆积能力就是整个磁盘的大小。从另外一个角度讲,消息堆积也为消息中间件提供了冗余存储的功能。
Kafka 和 RocketMQ 直接将消息刷入磁盘文件中进行持久化,所有的消息都存储在磁盘中。只要磁盘容量够,可以做到无限消息堆积。
RabbitMQ 是典型的内存式堆积,但这并非绝对,在某些条件触发后会有换页动作来将内存中的消息换页到磁盘(换页动作会影响吞吐),或者直接使用惰性队列来将消息直接持久化至磁盘中。
Pulsar 消息是存储在 BookKeeper 存储集群上,也是磁盘文件。
NSQ 通过 nsq_to_file 工具,将消息写入到文件。
7. 消息确认机制
消息队列需要管理消费进度,确认消费者是否成功处理消息,使用 push 的方式的消息队列组件往往是对单条消息进行确认,对于未确认的消息,进行延迟重新投递或者进入死信队列。
Kafka通过 Offset 的方式确认消息:
- 发送方确认机制 ack=0,不管消息是否成功写入分区 ack=1,消息成功写入首领分区后,返回成功 ack=all,消息成功写入所有分区后,返回成功。
- 接收方确认机制 自动或者手动提交分区偏移量,早期版本的 kafka 偏移量是提交给 Zookeeper 的,这样使得 zookeeper 的压力比较大,更新版本的 kafka 的偏移量是提交给 kafka 服务器的,不再依赖于 zookeeper 群组,集群的性能更加稳定。
RocketMQ与 Kafka 类似也会提交 Offset,区别在于消费者对于消费失败的消息,可以标记为消息消费失败,Broker 会重试投递,如果累计多次消费失败,会投递到死信队列。
RabbitMQ和 NSQ 类似,消费者确认单条消息,否则会重新放回队列中等待下次投递:
- 发送方确认机制,消息被投递到所有匹配的队列后,返回成功。如果消息和队列是可持久化的,那么在写入磁盘后,返回成功。支持批量确认和异步确认。
- 接收方确认机制,设置 autoAck 为 false,需要显式确认,设置 autoAck 为 true,自动确认。当 autoAck 为 false 的时候,rabbitmq 队列会分成两部分,一部分是等待投递给 consumer 的消息,一部分是已经投递但是没收到确认的消息。如果一直没有收到确认信号,并且 consumer 已经断开连接,rabbitmq 会安排这个消息重新进入队列,投递给原来的消费者或者下一个消费者。未确认的消息不会有过期时间,如果一直没有确认,并且没有断开连接,rabbitmq 会一直等待,rabbitmq 允许一条消息处理的时间可以很久很久。
Pulsar使用专门的 Cursor 管理。累积确认和 Kafka 效果一样;提供单条或选择性确认。
8. 消息 TTL
消息 TTL 表示一条消息的生存时间,如果消息发出来后,在 TTL 的时间内没有消费者进行消费,消息队列会将消息删除或者放入死信队列中。
Kafka 根据设置的保留期来删除消息。有可能消息没被消费,过期后被删除。不支持 TTL。
Pulsar 支持 TTL,如果消息未在配置的 TTL 时间段内被任何消费者使用,则消息将自动标记为已确认。消息保留期与消息 TTL 之间的区别在于:消息保留期作用于标记为已确认并设置为已删除的消息,而 TTL 作用于未 ack 的消息。上面的图例中说明了 Pulsar 中的 TTL。例如,如果订阅 B 没有活动消费者,则在配置的 TTL 时间段过后,消息 M10 将自动标记为已确认,即使没有消费者实际读取该消息。
RocketMQ 提及到消息 TTL 的资料比较少,不过看接口似乎是支持的。
RabbitMQ 有两种方式,一个是声明队列的时候在队列属性中设置,整个队列中的消息都有相同的有效期。还可以发送消息的时候给消息设置属性,可以位每条消息都设置不同的 TTL。
NSQ 似乎还没支持,有一个 Feature Request 的 Issue 处于 Open 状态。
9. 多租户隔离
多租户是指通过一个软件实例为多个租户提供服务的能力。租户是指对系统有着相同“视图”的一组用户。不支持多租户的系统里边,往往要为不同用户或者不同集群创建多个消息队列实例实现物理隔离,这样会带来较高的运维成本。作为一种企业级的消息系统,Pulsar 的多租户能力按照设计可满足下列需求:
- 确保严苛的 SLA 可顺利满足。
- 保证不同租户之间的隔离。
- 针对资源利用率强制实施配额。
- 提供每租户和系统级的安全性。
- 确保低成本运维以及尽可能简单的管理。
Pulsar 通过下列方式满足了上述需求:
- 通过为每个租户进行身份验证、授权和 ACL(访问控制列表)获得所需安全性。
- 为每个租户强制实施存储配额。
- 以策略的方式定义所有隔离机制,策略可在运行过程中更改,借此降低运维成本并简化管理工作。
10. 消息顺序性
消息顺序性是指保证消息有序。消息消费顺序跟生产的顺序保持一致。
Kafka 保证了分区内的消息有序。
Pulsar 支持两种消费模式,独占订阅的流模式只保证了消息的顺序性,共享订阅队列模型不保证有序性。
RocketMQ 需要用到锁来保证一个队列同时只有一个消费者线程进行消费,保证消息的有序性。
RabbitMQ 顺序性的条件比较苛刻,需要单线程发送、单线程消费,并且不采用延迟队列、优先级队列等高级功能。
NSQ 是利用了 golang 自身的 case/select 实现的消息分发,本身不提供有序性保障,不能够把特性消息和消费者对应起来,无法实现消息的有序性。
11. 消息查询
在实际开发中,经常要查看 MQ 中消息的内容,比如通过某个 MessageKey/ID,查询到 MQ 的具体消息。或者是对消息进行链路追踪,知道消息从哪里来,发送到哪里去,进而快速对问题进行排查定位。
Kafka 存储层是以分布式提交日志的形式实现,每次写操作都顺序追加到日志的末尾。读也是顺序读。不支持检索功能。
Pulsar 可以通过消息 ID,查询到具体某条消息的消息内容、消息参数和消息轨迹。
RocketMQ 支持按 Message Key、Unique Key、Message Id 对消息进行查询。
RabbitMQ 使用基于索引的存储系统。这些将数据保存在树结构中,以提供确认单个消息所需的快速访问。由于 RabbitMQ 的消息在确认后会被删除,因此只能查询未确认的消息。
NSQ 自身不支持消息持久化和消息检索,不过可以使用 nsq_to_http 等工具将消息写入可支持索引的存储里。
12. 消费模式
Kafka 有两种消费模式,最终都会保证一个分区只有 1 个消费者在消费:
- subscribe 方式:当主题分区数量变化或者 consumer 数量变化时,会进行 rebalance;注册 rebalance 监听器,可以手动管理 offset 不注册监听器,kafka 自动管理。
- assign 方式:手动将 consumer 与 partition 进行对应,kafka 不会进行 rebanlance。
Pulsar 有以下四种消费模式,其中独占模式和灾备模式跟 Kafka 类似,为流模型,每个分区只有 1 个消费者消费,能保证消息有序性。共享模式和 Key 共享模式为队列模型,多个消费者能提高消费速度,但不能保证有序性。
- Exclusive 独占模式(默认模式):一个 Subscription 只能与一个 Consumer 关联,只有这个 Consumer 可以接收到 Topic 的全部消息,如果该 Consumer 出现故障了就会停止消费。
- 灾备模式(Failover):当存在多个 consumer 时,将会按字典顺序排序,第一个 consumer 被初始化为唯一接受消息的消费者。当第一个 consumer 断开时,所有的消息(未被确认和后续进入的)将会被分发给队列中的下一个 consumer。
- 共享模式(Shared):消息通过 round robin 轮询机制(也可以自定义)分发给不同的消费者,并且每个消息仅会被分发给一个消费者。当消费者断开连接,所有被发送给他,但没有被确认的消息将被重新安排,分发给其它存活的消费者。
- KEY 共享模式(Key_Shared):当存在多个 consumer 时,将根据消息的 key 进行分发,key 相同的消息只会被分发到同一个消费者。
RocketMQ 有两种消费模式,BROADCASTING 广播模式,CLUSTERING 集群模式。
- 广播消费指的是:一条消息被多个 consumer 消费,即使这些 consumer 属于同一个 ConsumerGroup,消息也会被 ConsumerGroup 中的每个 Consumer 都消费一次,广播消费中 ConsumerGroup 概念可以认为在消息划分方面无意义。
- 集群消费模式:一个 ConsumerGroup 中的 Consumer 实例平均分摊消费消息。例如某个 Topic 有 9 条消息,其中一个 ConsumerGroup 有 3 个实例(可能是 3 个进程,或者 3 台机器),那么每个实例只消费其中部分,消费完的消息不能被其他实例消费。
RabbitMQ 和 NSQ 的消费比较类似,都是跟 Pulsar 共享模式类似的,队列的形式,增加一个消费者组里的消费者数量能提高消费速度。
13. 消息可靠性
消息丢失是使用消息中间件时所不得不面对的一个同点,其背后消息可靠性也是衡量消息中间件好坏的一个关键因素。尤其是在金融支付领域,消息可靠性尤为重要。比如当服务出现故障时,一些对于生产者来说已经生产成功的消息,是否会在高可用切换时丢失。同步刷盘是增强一个组件可靠性的有效方式,消息中间件也不例外,Kafka 和 RabbitMQ 都可以支持同步刷盘,但绝大多数情景下,一个组件的可靠性不应该由同步刷盘这种极其损耗性能的操作来保障,而是采用多副本的机制来保证。
Kafka 可以通过配置 request.required.acks 参数设置可靠级别,表示一条消息有多少个副本确认接收成功后,才被任务发送成功。
- request.required.acks=-1 (全量同步确认,强可靠性保证)
- request.required.acks=1(leader 确认收到,默认)
- request.required.acks=0 (不确认,但是吞吐量大)
Pulsar 有跟 Kafka 类似的概念,叫 Ack Quorum Size(Qa),Qa 是每次写请求发送完毕后需要回复确认的 Bookie 的个数,其数值越大则需要确认写成功的时间越长,其值上限是副本数 Qw。为了一致性,Qa 应该是:(Qw+1)/2 或者更,即为了确保数据安全性,Qa 下限是 (Qw+1)/2。
RocketMQ 与 Kafka 类似。
- RabbitMQ 是主从架构,通过镜像环形队列实现多副本及强一致性语义的。多副本可以保证在 master 节点宕机异常之后可以提升 slave 作为新的 master 而继续提供服务来保障可用性。
- NSQ 会通过 go-diskqueue 组件将消息落盘到本地文件中,通过 mem-queue-size 参数控制内存中队列大小,如果 mem-queue-size=0 每条消息都会存储到磁盘里,不用担心节点重启引起的消息丢失。但由于是存储在本地磁盘中,如果节点离线,堆积在节点磁盘里的消息会丢失。
14. 负载均衡
Kafka:支持负载均衡。一个 broker 通常就是一台服务器节点。对于同一个 Topic 的不同分区,Kafka 会尽力将这些分区分布到不同的 Broker 服务器上,zookeeper 保存了 broker、主题和分区的元数据信息。分区首领会处理来自客户端的生产请求,kafka 分区首领会被分配到不同的 broker 服务器上,让不同的 broker 服务器共同分担任务。
每一个 broker 都缓存了元数据信息,客户端可以从任意一个 broker 获取元数据信息并缓存起来,根据元数据信息知道要往哪里发送请求。
kafka 的消费者组订阅同一个 topic,会尽可能地使得每一个消费者分配到相同数量的分区,分摊负载。
当消费者加入或者退出消费者组的时候,还会触发再均衡,为每一个消费者重新分配分区,分摊负载。
kafka 的负载均衡大部分是自动完成的,分区的创建也是 kafka 完成的,隐藏了很多细节,避免了繁琐的配置和人为疏忽造成的负载问题。
发送端由 topic 和 key 来决定消息发往哪个分区,如果 key 为 null,那么会使用轮询算法将消息均衡地发送到同一个 topic 的不同分区中。如果 key 不为 null,那么会根据 key 的 hashcode 取模计算出要发往的分区。
rabbitmq:对负载均衡的支持不好。消息被投递到哪个队列是由交换器和 key 决定的,交换器、路由键、队列都需要手动创建。
rabbitmq 客户端发送消息要和 broker 建立连接,需要事先知道 broker 上有哪些交换器,有哪些队列。通常要声明要发送的目标队列,如果没有目标队列,会在 broker 上创建一个队列,如果有,就什么都不处理,接着往这个队列发送消息。假设大部分繁重任务的队列都创建在同一个 broker 上,那么这个 broker 的负载就会过大。(可以在上线前预先创建队列,无需声明要发送的队列,但是发送时不会尝试创建队列,可能出现找不到队列的问题,rabbitmq 的备份交换器会把找不到队列的消息保存到一个专门的队列中,以便以后查询使用)
使用镜像队列机制建立 rabbitmq 集群可以解决这个问题,形成 master-slave 的架构,master 节点会均匀分布在不同的服务器上,让每一台服务器分摊负载。slave 节点只是负责转发,在 master 失效时会选择加入时间最长的 slave 成为 master。
当新节点加入镜像队列的时候,队列中的消息不会同步到新的 slave 中,除非调用同步命令,但是调用命令后,队列会阻塞,不能在生产环境中调用同步命令。
当 rabbitmq 队列拥有多个消费者的时候,队列收到的消息将以轮询的分发方式发送给消费者。每条消息只会发送给订阅列表里的一个消费者,不会重复。
这种方式非常适合扩展,而且是专门为并发程序设计的。
如果某些消费者的任务比较繁重,那么可以设置 basicQos 限制信道上消费者能保持的最大未确认消息的数量,在达到上限时,rabbitmq 不再向这个消费者发送任何消息。
对于 rabbitmq 而言,客户端与集群建立的 TCP 连接不是与集群中所有的节点建立连接,而是挑选其中一个节点建立连接。
但是 rabbitmq 集群可以借助 HAProxy、LVS 技术,或者在客户端使用算法实现负载均衡,引入负载均衡之后,各个客户端的连接可以分摊到集群的各个节点之中。
客户端均衡算法:
- 轮询法。按顺序返回下一个服务器的连接地址。
- 加权轮询法。给配置高、负载低的机器配置更高的权重,让其处理更多的请求;而配置低、负载高的机器,给其分配较低的权重,降低其系统负载。
- 随机法。随机选取一个服务器的连接地址。
- 加权随机法。按照概率随机选取连接地址。
- 源地址哈希法。通过哈希函数计算得到的一个数值,用该数值对服务器列表的大小进行取模运算。
- 最小连接数法。动态选择当前连接数最少的一台服务器的连接地址。
zeromq:去中心化,不支持负载均衡。本身只是一个多线程网络库。
rocketmq:支持负载均衡。一个 broker 通常是一个服务器节点,broker 分为 master 和 slave,master 和 slave 存储的数据一样,slave 从 master 同步数据。
nameserver 与每个集群成员保持心跳,保存着 Topic-Broker 路由信息,同一个 topic 的队列会分布在不同的服务器上。
发送消息通过轮询队列的方式发送,每个队列接收平均的消息量。发送消息指定 topic、tags、keys,无法指定投递到哪个队列(没有意义,集群消费和广播消费跟消息存放在哪个队列没有关系)。
tags 选填,类似于 Gmail 为每封邮件设置的标签,方便服务器过滤使用。目前只支 持每个消息设置一个 tag,所以也可以类比为 Notify 的 MessageType 概念。
keys 选填,代表这条消息的业务关键词,服务器会根据 keys 创建哈希索引,设置后, 可以在 Console 系统根据 Topic、Keys 来查询消息,由于是哈希索引,请尽可能 保证 key 唯一,例如订单号,商品 Id 等。
rocketmq 的负载均衡策略规定:Consumer 数量应该小于等于 Queue 数量,如果 Consumer 超过 Queue 数量,那么多余的 Consumer 将不能消费消息。这一点和 kafka 是一致的,rocketmq 会尽可能地为每一个 Consumer 分配相同数量的队列,分摊负载。
activemq:支持负载均衡。可以基于 zookeeper 实现负载均衡。
15. 集群方式
Kafka:天然的‘Leader-Slave’无状态集群,每台服务器既是 Master 也是 Slave。分区首领均匀地分布在不同的 kafka 服务器上,分区副本也均匀地分布在不同的 kafka 服务器上,所以每一台 kafka 服务器既含有分区首领,同时又含有分区副本,每一台 kafka 服务器是某一台 kafka 服务器的 Slave,同时也是某一台 kafka 服务器的 leader。
kafka 的集群依赖于 zookeeper,zookeeper 支持热扩展,所有的 broker、消费者、分区都可以动态加入移除,而无需关闭服务,与不依靠 zookeeper 集群的 mq 相比,这是最大的优势。
rabbitmq:支持简单集群,'复制'模式,对高级集群模式支持不好。
rabbitmq 的每一个节点,不管是单一节点系统或者是集群中的一部分,要么是内存节点,要么是磁盘节点,集群中至少要有一个是磁盘节点。
在 rabbitmq 集群中创建队列,集群只会在单个节点创建队列进程和完整的队列信息(元数据、状态、内容),而不是在所有节点上创建。
引入镜像队列,可以避免单点故障,确保服务的可用性,但是需要人为地为某些重要的队列配置镜像。
- zeromq:去中心化,不支持集群。
- rocketmq:常用 多对'Master-Slave' 模式,开源版本需手动切换 Slave 变成 Master
Name Server 是一个几乎无状态节点,可集群部署,节点之间无任何信息同步。
Broker 部署相对复杂,Broker 分为 Master 与 Slave,一个 Master 可以对应多个 Slave,但是一个 Slave 只能对应一个 Master,Master 与 Slave 的对应关系通过指定相同的 BrokerName,不同的 BrokerId 来定义,BrokerId 为 0 表示 Master,非 0 表示 Slave。Master 也可以部署多个。每个 Broker 与 Name Server 集群中的所有节点建立长连接,定时注册 Topic 信息到所有 Name Server。
Producer 与 Name Server 集群中的其中一个节点(随机选择)建立长连接,定期从 Name Server 取 Topic 路由信息,并向提供 Topic 服务的 Master 建立长连接,且定时向 Master 发送心跳。Producer 完全无状态,可集群部署。
Consumer 与 Name Server 集群中的其中一个节点(随机选择)建立长连接,定期从 Name Server 取 Topic 路由信息,并向提供 Topic 服务的 Master、Slave 建立长连接,且定时向 Master、Slave 发送心跳。Consumer 既可以从 Master 订阅消息,也可以从 Slave 订阅消息,订阅规则由 Broker 配置决定。
客户端先找到 NameServer, 然后通过 NameServer 再找到 Broker。
一个 topic 有多个队列,这些队列会均匀地分布在不同的 broker 服务器上。rocketmq 队列的概念和 kafka 的分区概念是基本一致的,kafka 同一个 topic 的分区尽可能地分布在不同的 broker 上,分区副本也会分布在不同的 broker 上。
rocketmq 集群的 slave 会从 master 拉取数据备份,master 分布在不同的 broker 上。
activemq:支持简单集群模式,比如'主-备',对高级集群模式支持不好。
五、性能
Kafka 的公司 Confluent 在 2020 年 8 月发了一篇 Benchmarking Apache Kafka, Apache Pulsar, and RabbitMQ: Which is the Fastest?文章,并且提出了一个开源的 MQ Benchmark 框架 THE OPENMESSAGING BENCHMARK FRAMEWORK,在这个文档里,对比了 Kafka、Pulsar、RabbitMQ 的吞吐量、端到端延迟等性能数据。最后得出结论 Kafka 相对来说性能最好。
但接下来 StreamNative 在 2020 年 12 月指出了 Confluence 的基准测试的一些问题,并对 Pulsar 进行了参数调优之后重新执行了一遍结果,测试报告展示 Pulsar 能达到跟 Kafka 同样的吞吐量,在某些场景下,Pulsar 的延迟显著低于 Kafka。
而且在性能测试上,有很多客户端、服务端参数设置、机器性能配置等影响,比如消息可靠性级别,压缩算法等,很难做到“完全”控制变量公平的测试。而且 OpenMessaging Benchmark 的开源 Github 的 Readme 上也提到了。
不过有几个关注点:
- RabbitMQ 的延迟是微秒级的,其他组件的延迟都是毫秒级,RabbitMQ 应该是 MQ 组件里相对来说较低的。
- Kafka 单实例在主题/分区数比较多的情况下,性能会明显降低。
- kafka 是一个分区一个文件,当 topic 过多,分区的总量也会增加,kafka 中存在过多的文件,当对消息刷盘时,就会出现文件竞争磁盘,出现性能的下降。
- 还有 Kafka 每个消费者加入或退出都会进行重平衡,当分区数比较多时重平衡可能耗时较久,在重平衡的阶段消费者是不能消费消息的。
- 而 Pulsar 由于存储与计算分离的架构,使得它可以支持百万级别的 Topic 数量。
Pulsar 和 Kafka 都被广泛用于各个企业,也各有优势,都能通过数量基本相同的硬件处理大流量。部分用户误以为 Pulsar 使用了很多组件,因此需要很多服务器来实现与 Kafka 相匹敌的性能。这种想法适用于一些特定硬件配置,但在多数资源配置相同的情况中,Pulsar 的优势更加明显,可以用相同的资源实现更好的性能。举例来说,Splunk 最近分享了他们选择 Pulsar 放弃 Kafka 的原因,其中提到“由于分层架构,Pulsar 帮助他们将成本降低了 30%-50%,延迟降低了 80%-98%,运营成本降低了 33%-50%”。Splunk 团队发现 Pulsar 可以更好地利用磁盘 IO,降低 CPU 利用率,同时更好地控制内存。
在分布式系统里,单机性能指标虽然也很重要,分布式系统整体的性能以及灵活扩缩容、高可用容灾等能力也会是评估的一个重要参考。MQ 中间件具体的性能指标,也需要我们自己根据实际的情况,根据实际购买的集群配置和客户端参数,进行压测调优来评估。
六、运维
在使用过程中难免会出现各种异常情况,比如宕机、网络抖动、扩容等。消息队列具备异地容灾,高可用架构等能力,能避免一些计算节点、网络等基础设施不可用导致的故障。
1. 高可用
Kafka 通过分区多副本的方式解决高可用问题。
Pulsar 的计算集群 Broker 是无状态的,可以灵活扩缩容,存储节点 Bookie 上通过消息分区分片副本的方式,每个分片都有一个或多个副本,保证在某一个 Bookie 挂掉后,有其他分片可以提供服务。
RocketMQ 和 RabbitMQ 都是主从架构,当 master 挂掉后,由原来的从节点继续提供服务。备机提供消费服务,保证消息不丢,但不提供写服务。
NSQ 是类似分布式架构,不过由于消息存储是在节点本地磁盘上,如果一个节点离线,堆积在节点磁盘上的消息会丢失。
2. 跨地域容灾
Pulsar 原生支持跨地域容灾功能,在这个图中,每当 P1、P2 和 P3 的生产者分别向 Cluster-A、Cluster-B 和 Cluster-C 中的 T1 topic 发送消息时,这些消息很快在不同的集群中复制。一旦消息完成复制,消费者 C1 和 C2 会从各自的集群消费到这个消息。
在这个跨地域容灾的设计支撑下,其一,我们可以比较容易的将服务分散到多个机房;其二,可以应对机房级别的故障,即在一个机房不可用的情况下,服务可以转接到其它的机房来继续对外提供服务。
一句话概括,Pulsar 的跨地域复制,其实就是在一个本地集群中创建一个 Producer,把异地的集群作为这个 Producer 的发送地址,将本地集群的消息发送过去,并且在本地维护一个 Cusor 来保证消息可靠性和幂等性。
3. 集群扩容
当消息量突然上涨,消息队列集群到达瓶颈的时候,需要对集群进行扩容,扩容一般分为水平扩容和垂直扩容两种方式,水平扩容指的是往往集群中增加节点,垂直扩容指的是把集群中部分节点的配置调高,增加处理能力。
Kafka 集群由于主题分区是物理存储在 Broker 节点上的,新加入的集群的节点并没有存储分区分片,也就无法提供马上提供服务,因此需要把一些 Topic 的分区分配到新加入的节点里,这里会涉及到一个分区数据均衡的过程,将某些分区的数据复制到新节点上。这个过程跟分区当前堆积的数据量、Broker 性能有关,有可能会出现由于源 Broker 负载过高,堆积数据过大,导致数据均衡的时间变长。
Pulsar 的无限分布式日志以分片为中心,借助扩展日志存储(通过 Apache BookKeeper)实现,内置分层存储支持,因此分片可以均匀地分布在存储节点上。由于与任一给定 topic 相关的数据都不会与特定存储节点进行捆绑,因此很容易替换存储节点或缩扩容。另外,集群中最小或最慢的节点也不会成为存储或带宽的短板。
RocketMQ 新节点直接加入到集群中,在新的 broker 创建新 topic 并且分配队列,或者在已有 topic 基础上分配队列。与 Kafka 的区别是,Kafka 的分区是在不同的物理机器上,而 Rocketmq 是逻辑分区,用的队列形式,因此不存在出现数据不均衡的情况。
RabbitMQ 和 NSQ 类似,由于不涉及过多的消息持久化,直接往集群中增加节点。
4. 使用成本
Kafka/Pulsar/RocketMQ/RabbitMQ 在腾讯云上都上线了标准产品,可以直接购买创建实例(产品选型),能大大降低部署运维成本。而 NSQ 目前暂时还没有上线,需要自行部署。
七、常见问题 & 使用场景
1. Kafka
日志收集:大量的日志消息先写入 kafka,数据服务通过消费 kafka 消息将数据落地;
2. RocketMQ
为金融互联网领域而生,对于可靠性要求很高的场景。
3. 普通消息
消息队列最基础的功能就是生产者发送消息、Broker 保存消息,消费者来消费消息,以此实现系统解耦、削峰填谷的作用。
普通消息是消息队列必备的消息类型,也是系统使用场景最多的一种消息。
4. 顺序消息
顺序消息是指生产者发送消息的顺序和消费者消费消息的顺序是一致的。比如在一个电商场景,同一个用户提交订单、订单支付、订单出库,这三个消息消费者需要按照顺序来进行消费。如下图:
顺序消息的实现并不容易,原因如下:
- 生产者集群中,有多个生产者发送消息,网络延迟不一样,很难保证发送到 Broker 的消息落盘顺序是一致的;
- 如果 Broker 有多个分区或队列,生产者发送的消息会进入多个分区,也无法保证顺序消费;
- 如果有多个消费者来异步消费同一个分区,很难保证消费顺序跟生产者发送顺序一致。
要保证消息有序,需要满足两个条件:
- 同一个生产者必须同步发送消息到同一个分区;
- 一个分区只能给同一个消费者消费。
如下图:
上面第二个条件是比较容易实现的,一个分区绑定一个消费者就可以,主要是第一个条件。
在主流消息队列的实现中,Kafka 和 Pulsar 的实现方式类似,生产者给消息赋值一个 key,对 key 做 Hash 运算来指定消息发送到哪一个分区。比如上面电商的例子,对同一个用户的一笔订单,提交订单、订单支付、订单出库这三个消息赋值同一个 key,就可以把这三条消息发送到同一个分区。
对于 RocketMQ,生产者在发送消息的时候,可以通过 MessageQueueSelector 指定把消息投递到那个 MessageQueue,如下图:
5. 延时消息
或者也叫定时消息,是指消息发送后不会立即被消费,而是指定一个时间,到时间后再消费。经典的场景比如电商购物时,30 分钟未支付订单,让订单自动失效。
(1) RocketMQ 实现
RocketMQ 定义了 18 个延时级别,每个延时级别对应一个延时时间。下面如果延迟级别是 3,则消息会延迟 10s 才会拉取。
RocketMQ 的延时消息如下图:
生产者把消费发送到 Broker 后,Broker 首先把消息保存到 SCHEDULE_TOPIC_XXXX 这个 Topic,然后调度任务会判断是否到期,如果到期,会把消息从 SCHEDULE_TOPIC_XXXX 取出投递到原始的 queue,这样消费者就可以消费到了。
RocketMQ 的延时消息只支持最大两个小时的延时,不过 RocketMQ5.0 基于时间轮算法实现了定时消息,解决了这个问题。
(2) Pulsar 实现
Pulsar 的实现如下图:
Pulsar 的延时消息首先会写入一个 Delayed Message Tracker 的数据结构中,Delayed Message Tracker 根据延时时间构建 delayed index 优先级队列。消费者拉取消息时,首先去 Delayed Message Tracker 检查是否有到期的消息。如果有则直接拉取进行消费。
(3) RabbitMQ 实现
RabbitMQ 的实现方式有两种,一种是投递到普通队列都不消费,等消息过期后被投递到死信队列,消费者消费死信队列。如下图:
第二种方式是生产者发送消息时,先发送到本地 Mnesia 数据库,消息到期后定时器再将消息投递到 broker。
(4) Kafka 实现
Kafka 本身并没有延时队列,不过可以通过生产者拦截器来实现消息延时发送,也可以定义延时 Topic,利用类似 RocketMQ 的方案来实现延时消息。
6. 事务消息
事务消息是指生产消息和消费消息满足事务的特性。
RabbitMQ 和 Kafka 的事务消息都是只支持生产消息的事务特性,即一批消息要不全部发送成功,要不全部发送失败。
RabbitMQ 通过 Channel 来开启事务消息,代码如下:
ConnectionFactory factory=new ConnectionFactory();
connection=factory.newConnection();
Channel channel=connection.createChannel();
//开启事务
channel.txSelect();
channel.basicPublish("directTransactionExchange","transactionRoutingKey",null,message.getBytes("utf-8"));
//提交事务 或者 channel.txRollback()回滚事务
channel.txCommit();
Kafka 可以给多个生产者设置同一个事务 ID ,从而把多个 Topic 、多个 Partition 放在一个事务中,实现原子性写入。
Pulsar 的事务消息对于事务语义的定义是:允许事件流应用将消费、处理、生产消息整个过程定义为一个原子操作。可见,Pulsar 的事务消息可以覆盖消息流整个过程。
RocketMQ 的事务消息是通过 half 消息来实现的。以电商购物场景来看,账户服务扣减账户金额后,发送消息给 Broker,库存服务来消费这条消息进行扣减库存。如下图:
可见,RocketMQ 只能保证生产者发送消息和本地事务的原子性,并不能保证消费消息的原子性。
7. 轨迹消息
轨迹消息主要用于跟踪消息的生命周期,当消息丢失时可以很方便地找出原因。
轨迹消息也跟普通消息一样,也需要存储和查询,也会占用消息队列的资源,所以选择轨迹消息要考虑下面几点:
- 消息生命周期的关键节点一定要记录;
- 不能影响正常消息的发送和消费性能;
- 不能影响 Broker 的消息存储性能;
- 要考虑消息查询维度和性能。
RabbitMQ Broker 实现了轨迹消息的功能,打开 Trace 开关,就可以把轨迹消息发送到 amq.rabbitmq.trace 这个 exchange,但是要考虑轨迹消息会不会给 Broker 造成 压力进而导致消息积压。RabbitMQ 的生产者和消费者都没有实现轨迹消息,需要开发者自己来实现。
RocketMQ 生产者、Broker 和消费者都实现了轨迹消息,不过默认是关闭的,需要手工开启。
使用轨迹消息,需要考虑记录哪些节点、存储介质、性能、查询方式等问题。
8. Kafka 是否会消息丢失?
(1)只对“已提交”的消息做有限度的持久化保证
- 已提交的消息:消息写入日志文件
- 有限度的持久化保证:N 个 broker 至少一个存活
(2)生产者丢失数据
- producer.send(msg) 异步发送消息,不保证数据到达 Kafka
- producer.send(msg, callback) 判断回调
(3) 消费者程序丢失数据
- 应该「先消费消息,后更新位移的顺序」
- 新问题:消息的重复处理
- 多线程异步处理消息,Consumer 不要开启自动提交位移,应用程序手动提交位移
9. Kafka 如何持久化?
(1)消息日志(Log)保存数据,磁盘追加写(Append-only)
- 避免缓慢的随机 I/O 操作
- 高吞吐
(2)定期删除消息(日志段)
10. Kafka 文件存储机制
(1)每个 partition 相当于一个巨型文件 → 多个大小相等 segment 数据文件中
(2)每个 partition 只需要顺序读写就行了,segment 文件生命周期由配置决定
(3)segment file 组成:
- index file:索引文件
- data file:数据文件
(4)segment file 文件命名规则:
- 全局第一个 segment 是 0
- 后序每个加上全局 partition 的最大 offset
一对 segment file
message 物理结构
11. Kafka 分区
为什么分区?
- Kafka 的消息组织方式:主题-分区-消息
- 一条消息,仅存在某一个分区中
- 提高伸缩性,不同分区可以放到不同机器,读写操作也是以分区粒度
分区策略?
- 轮询
- 随机
- 按 key 保序,单分区有序
12. MQ 消息堆积问题处理
消息堆积可能的原因: 队列中消息不能被及时的消费,导致大量堆积在队列里面 rocketMq Kafka RabbitMq 都会有这样的问题 产生消息堆积的可以从 mq 的生产消费模型去考虑,从生产者到消息中间件、再到消费者,都会发生堆积。
- 消费者:消费者处理速度过慢,或者消费者故障、延迟,无法即使的处理消息,导致消息堆积
- 生产者:生产者产生速度过快,消费者无法即使处理
- MQ 消息队列:Mq 服务器的性能不足,比如它所在的机器,cpu、内存、磁盘等超载,无法即使的处理消息,导致消息堆积
- 其他:其他方面也会有这样的问题, 比如网络故障,连接问题,消息在传递过程中过慢,从而导致消息堆积 业务方面,消息消费失败重试,不断的重试,没有设置重试次数,导致消息堆积。
处理消息堆积问题:
(1)消费者:
- 增加消费者的数量,提高消费的处理速度;(注意这个不通用,只适合 RabbitMq) 需要注意不能一味的水平扩展消费者 因为其他关键链路性能是否抗的住大量的水平扩展,比如 mysq、redis,详细见下方 rabbitmq 消息堆积解决方案
- 或者提高消费者的处理能力,比如通过并发处理、异步处理提高消费者吞吐量。这个则要注意通过线程池、队列,把 mq 拉到程序的队列中,要承担对应的宕机导致消息丢失风险。
(2)MQ 消息队列: 增加 MQ 的服务器资源,cpu、内存、磁盘,提高 mq 处理能力 也可以通过分区队列将消息分散到多个队列中,提高整体的处理能力。(这个则是 Kafka、Rocket 采用的)
控制队列容量,避免堆积过多,设置持久化策略。rabbitMQ 的懒加载队列,兼顾了持久化和堆积上限
(3)监控告警(重要) 设置监控系统,比如普罗米修斯,监控消息数量,消费者处理速度,队列状态等等,在堆积发生前,即使的告警,及时采取措施。
But 上面的策略是通用的一些解决方案,不同的 MQ,生产消费模型是不一样的,导致需要针对不同 mq 的消息堆积解决方案不一样。
RabbitMq、Kafka、RocketMq 发生消息堆积,分别该如何去解决?
这里先点一下,增加消费者数量,并不是通用的,只适合 RabbitMq。
总结
Kafka 与 Pulsar 都是腾讯云主打的消息队列中间件,都具有高性能,高可靠,支持多种场景。Kafka 推出的时间较早,各种场景比如日志、大数据处理等都有较成熟的解决方案。而 Pulsar 作为一个新秀,支持的功能比 CKafka 更丰富,而且跨地域容灾,多租户等功能,解决了很多 Kafka 设计缺陷和运维成本问题,整体稳定性更强。很多国内外大公司也有很多 Pulsar 的实践案例。因此,一些传统的日志、大数据处理等场景,对高吞吐量有要求的,对消息可靠性的要求没那么高的,可以选用 Kafka,有很多优秀的文档说明怎么参数调优提高性能。而一些对消息可靠性、容灾要求更好,或者有高分区、延迟队列等需求的场景,可以选用 Pulsar。
我们后台的技术栈是基于 Golang 的,在上文的对比中,还挑了一个基于 Golang 开发的消息队列 NSQ,如果有一些定制化需求或者需要二次开发的,可以选用 NSQ。也可以通过阅读 NSQ 的源码,学习一些优秀高性能消息队列中间件的实现方式,比如里边 diskqueue 组件,一个基于磁盘的消息队列,在某些场景下可能也可以进行二次利用。