消息中间件是我们工作中使用最频繁的一类中间件,它具有低耦合、可靠投递、广播、流量控制、最终一致性等一系列功能,成为异步RPC的主要手段之一。当今市面上有很多主流的消息中间件,如老牌的ActiveMQ、RabbitMQ,炙手可热的Kafka,阿里巴巴自主开发RocketMQ等。今天,指北君就来详细讲讲RocketMQ生产者和消费者在使用时的一些注意事项。
一. 生产者
1.1 发送消息注意事项
1)消息大小
建议消息大小不要超过512K。
2)异步发送
默认的发送为同步发送,send方法会一直阻塞,等待broker端的响应。如果你关注性能问题,可以通过send(msg, callback)来发起异步调用。
3)生产者组
正常情况下生产者组是没有作用的,但是在发送事务消息时,如果producer中途意外宕机,broker会主动回调producer group 内的任意一台机器来确认事务的状态。(目前开源版本还不支持事务消息)。
4)线程安全问题
生产者实例是线程安全的,在应用中只需要实例化一次即可。
5)性能问题
如果你希望在一个jvm进程内使用多个producer实例来提高发送能,我们建议:
使用异步发送,并且producer实例只需要3 ~ 5个即可 对每一个producer 调用 setInstanceName,区别不同的生产者。
6)发送超时时间
当客户端向broker发送请求超时时,客户端会抛出 RemotingTimeoutException,默认的超时时间是3秒。通过调用send(msg, timeout) 可以设置超时时间。超时时间建议不要设置过小,因为 broker 可能需要时间刷盘或向 slave 同步数据。
7)对于同一个应用最好只使用一个Topic,消息的子类型可以使用 tags 来标识,tags 可以由应用自由设置。当发送的消息设置了 tags 时,消费方在订阅消息时可以使用 tags 在 broker 做消息过滤。注意这里的命名虽然是复数,但是一条消息只能有一个tag。
8)消息在业务层面的唯一标识可以设置到 keys 字段,方便根据 keys 来定位消息。broker 会为每个消息创建索引(哈希索引),应用可以通过topic 、key 查询这条消息的内容(MessageExt),以及消息被谁消费(MessageTrack,精确到consumer group)。由于是哈希索引,请尽量保证key 的唯一,这样可以避免潜在的哈希冲突。
9)消息发送不管是成功还是失败都要打印消息日志,日志内容务必包含 sendResult 和 key 字段。
10)对于消息不可丢失的应用,务必要有消息重发机制。例如如果消息发送失败,可以将消息存储到数据库,然后通过定时程序或者人工的方式触发重发。
11)调用send 同步发送消息时,假定此时设置了 isWaitStoreMsgOK=true(default is true),只要不抛出异常就代表发送成功,但当 isWaitStoreMsgOK = false 时,发送永远返回 SEND_OK。但是对于发送“成功”会有多个状态,在 SendStatus 中定义如下:
FLUSH_DISK_TIMEOUT
如果 broker 设置的 FlushDiskType = SYNC_FLUSH,当 broker 的在刷盘超时时(MessageStoreConfig.syncFlushTimeout,默认5秒)会返回该状态。此时消息任然保存在内存中,只有broker 宕机时消息才会丢失。
FLUSH_SLAVE_TIMEOU
如果 broker 的 role 是 SYNC_MASTER,当 slave 同步数据的时间超过了 MessageStoreConfig.syncFlushTimeout (默认5秒) 时会返回此状态。此时只有主从都宕机,并且主也没有刷盘时,消息才会丢失。
SLAVE_NOT_AVAILABLE
如果 broker 的 role 是 SYNC_MASTER,并且此时 slave 不可用时会返回该状态。
SEND_OK
发送成功。为了保证消息不丢失还需要配置 SYNC_MASTER or SYNC_FLUSH。
12)消息重复
当发送消息时返回 FLUSH_DISK_TIMEOUT/FLUSH_SLAVE_TIMEOUT,若非常不幸的 broker 也宕机了,消息将会丢失。此时如果什么都不做,消息可能会丢失,如果重发消息,消息可能会出现重复。
通常我们建议发送端重发消息,由消费方来保证消息消费的幂等性。
1.2 消息发送失败如何处理
Producer 的 send 方法本生支持内部重试,重试逻辑如下:
至多重试3次 如果发送失败,则轮转到下一个broker 这个方法的总耗时时间不超过 sendMsgTimeout,默认3秒 所以发送消息已经产生超时异常的话就不会再重试。以上策略仍不能保证消息发送一定成功,为保证消息发送一定成功,建议应用这么做:如果调用 send 同步发送失败,则尝试将消息存储到db,由后台线程定时重试,保证消息一定到达 Broker。
1.3 oneway 的发送形式
对于可靠性要求不高的应用,可以采用 oneway 的发送形式,oneway 形式不等待应答。
1.4 发送顺序消息
顺序消息分为分区有序和全局有序。
分区有序要求 producer 在send 时传入 MessageQueueSelector 的实现类,最终将某一类消息发送到同一队列。但是一旦发生通信异常、broker 重启等,由于队列总数发生变化,哈希取模后定位的队列会变化,会产生短暂的顺序不一致。如果业务能容忍在集群异常情况下(如某个 broker 宕机或者重启)消息短暂的乱序,使用分区有序比较合适。
全局严格有序的消息即便在异常情况下也能保证消息的有序性,但是却牺牲了分布式的 failover 特性,即 broker 集群中只有要一台机器不可用,则整个集群都不可用,服务可用性会大大降低。
顺序消息的缺点:
发送顺序消息无法利用集群的 FailOver 特性 消费顺序消息的并行度依赖于队列数量 队列热点问题,个别队列由于哈希不均导致消息过多,消费速度跟不上,产生消费堆积问题 遇到消费失败的消息,无法跳过,当前队列需要暂停 5.发送事务消息 目前暂不支持。
二. 消费者
2.1 消费者组和订阅
不同的消费者组可以独立消费相同的topic,这点类似于ActiveMQ的虚拟 topic. 另外对于相同的消费者组,需要确保组内的消费者订阅消息的规则是一致的!
MQ 里的一个Consumer Group 代表一个 Consumer 实例群组。对于大多数分布式应用来说,一个 Consumer Group 下通常会挂载多个 Consumer 实例。订阅关系一致指的是同一个 Consumer Group 下所有 Consumer 实例的处理逻辑必须完全一致。一旦订阅关系不一致,消息消费的逻辑就会混乱,甚至导致消息丢失。
由于 MQ 的订阅关系主要由 Topic+Tag 共同组成,因此,保持订阅关系一致意味着同一个 Consumer Group 下所有的实例需在以下两方面均保持一致:
订阅的 Topic 必须一致;订阅的 Topic 中的 Tag 必须一致。
技术架构 > Consumer 最佳实践 > image2017-11-15 15:50:13.png
2.2 MessageListener
1)顺序消费 MessageListenerOrderly
顺序消费时消费者会锁定队列,以确保消息被顺序消费,但是这样也会造成一定的性能损耗。当消费出现异常的时候,建议不要抛出异常,而是返回 ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT,让消费暂停一会,暂停时间由 context.setSuspendCurrentQueueTimeMillis 方法指定。
2)并发消费
并发消费是推荐的消费方式,在此种模式下,消息将被并发的消费。消费出现异常时不建议抛出异常,只需要返回 ConsumeConcurrentlyStatus.RECONSUME_LATER 即可。为了保证消息肯定被至少消费一次,消息将会被重发回 broker (topic不是原topic而是这个消费组的RETRY topic),在延迟的某个时间点(默认是10秒,业务可设置,通过 delayLevelWhenNextConsume 和 MessageStoreConfig.messageDelayLevel 设置)后,再次投递到这个 ConsumerGroup,而如果一直这样重复消费都持续失败到一定次数(默认是16次,DefaultMQPushConsumer.maxReconsumeTimes),就会投递到DLQ队列。应用可以监控死信队列来做人工干预。
3)返回状态
在并行消费时可以通过返回 RECONSUME_LATER 来告诉 Consumer 当前无法消费该消息,等延时一段时间再重新消费,但是此时消费不会停止,你可以继续消费其他消息。但在顺序消费时,因为要保证消费的顺序性,所以你不能跳过失败的消息,此时你可以通过返回 SUSPEND_CURRENT_QUEUE_A_MOMENT 来告诉 Consumer 先暂停一会。
4)阻塞
不建议阻塞Listener,因为这会阻塞住线程池,同时也有可能造成消费者线程终止。
2.3 线程数
consumer 内部通过一个 ThreadPoolExecutor 来消费消息,可以通过 setConsumeThreadMin 和 setConsumeThreadMax 来改变线程池的大小。
2.4 ConsumeFromWhere
当新实例启动的时候,PushConsumer会拿到本消费组broker已经记录好的消费进度(consumer offset),按照这个进度发起自己的第一次Pull请求。
如果这个消费进度在Broker并没有存储起来,证明这个是一个全新的消费组,这时候客户端有几个策略可以选择:
CONSUME_FROM_LAST_OFFSET //默认策略,从该队列最尾开始消费,即跳过历史消息。
CONSUME_FROM_FIRST_OFFSET //从队列最开始开始消费,即历史消息(还储存在broker的)全部消费一遍。
CONSUME_FROM_TIMESTAMP//从某个时间点开始消费,和setConsumeTimestamp()配合使用,默认是半个小时以前 注意:这些配置只对全新的消费组有效,老的消费组都是按已经存储过的消费进度继续消费。
对于老消费组想跳过历史消息可以采用以下几种方法:
1)判断消息的发送时间,太老的消息直接返回 CONSUME_SUCCESS。
2)判断消息的 offset 和 MAX_OFFSET 的差距,如果落后太多,可以直接。返回 CONSUME_SUCCESS。
3)消费者启动前,先调整该消费组的消费进度,再开始消费。可以人工使用命令 resetOffsetByTimeStamp,详见 ResetOffsetByTimeCommand.java。
2.5 消息幂等
由于 RocketMQ 无法避免消费重复,所以如果业务对消息重复非常敏感,务必在业务层面去重。
2.6 消费速度慢处理方式
1)提高消费并行度
大部分消息消费行为都属于 IO 密集型业务,适当的提高并发度可以显著的改善消费的吞吐量。
2)批量方式消费
默认情况下 consumer 的 consumeMessageBatchMaxSize 为1,即一次只消费一个消息,如果应用可以批量消费消息,则可以很大程度上提高消费吞吐量。
3)跳过非重要消息
当消堆积严重时可以丢弃不重要的消息。
4)优化消息消费过程
2.7 打印消费日志
建议在消费入口方法打印消息,方便后续排查问题,消费失败时也打印失败日志。
2.8 利用broker过滤消息,避免多余的消息传输
三. 小结
好了,RocketMQ生产者与消费者的使用事项就总结完毕了,相信大家对RocketMQ的使用应该会更有信心了。