什么是Kafka
Kafka是最初由Linkedin公司开发,Linkedin于2010年贡献给了Apache基金会并成为顶级开源项目,也是一个开源【分布式流处理平台】,由Scala和Java编写,(也当做MQ系统,但不是纯粹的消息系统)
目前 Kafka 已经定位为一个分布式流式处理平台,它以高吞吐、可持久化、可水平扩展、支持流数据处理等多种特性而被广泛使用。目前越来越多的开源分布式处理系统如 Cloudera、Storm、Spark、Flink 等都支持与 Kafka 集成
生产者与消费者机制
在Kafka中,生产者(producer)将消息发送给Broker,Broker将生产者发送的消息存储到磁盘当中,而消费者(Consumer)负责从Broker订阅并且消费消息,消费者(Consumer)使用pull这种模式从服务端拉取消息。而zookeeper是负责整个集群的元数据管理与控制器的选举。具体如下图所示。
Kafka的producer生产者发送到Broker分区策略
发布订阅的对象是主题(Topic),生产者将消息发送到指定的主题,消费者再对负责订阅的主题来进行消费。在Kafka里的分区机制是怎么样的呢?它是将主题划分成了多个分区(partition),每一个分区又有多个副本,在同一个主题下的不同分区里的消息也是不一样的。在生产者生产出来的每一条消息都只会发送到一个分区里,Kafka里的分区编号都是从0开始的,如果生产者向两个分区的主题发送一条消息,那么这条不是在分区0里,就是在分区1里。
那么如何指定消息到指定的分区里呢?
这时候就可以看看生产者的发送逻辑了,在此之前我们需要知道一个叫ProducerRecord的玩意,这个是什么?
ProducerRecord就是发送给Broker的Key/value键值对,封装基础数据信息,简称为PR。
内部结构
- Topic(名字)
- PartitionID(可选)
- Key(可选)
- Value
生产者发送逻辑
如果指定了Partition ID的话,那么PR就会被发送到指定的Partition里。
如果没有指定Partition ID,但是指定了Key,那么PR就会按照hash(key)发送到相对应的Partition里
如果没有指定Partition ID,也没有指定Key,PR就会使用默认的round-robin轮训发送到每一个Partition里(消费者消费partition分区默认是range模式)
如果同时指定了Partition ID与Key的话,PR只会发送到指定的Partition(这时候的Key不起作用,代码逻辑决定)
注意:Partition有多个副本,但是的话只有一个replicationLeader来负责这个Partition和生产者消费者交互
生产者到Broker的发送流程
kafka的客户端发送数据到服务器里(并不是来一条发一条),会经过内存的缓冲区,在通过KafkaProducer发送出去的消息都是先进入到客户端的本地缓存里,然后再把消息收集到Batch里,再一次性的发送到Broker上去的,这样的性能才可能提高。
生产者常见的配置
- #kafka地址,即broker地址
- bootstrap.servers
-
- #当producer向leader发送数据时,可以通过request.required.acks参数来设置数据可靠性的级别,分别是0, 1,all。
- acks
-
- #请求失败,生产者会自动重试,指定是0次,如果启用重试,则会有重复消息的可能性
- retries
-
- #每个分区未发送消息总字节大小,单位:字节,超过设置的值就会提交数据到服务端,默认值是16KB
- batch.size
-
- # 默认值就是0,消息是立刻发送的,即便batch.size缓冲空间还没有满,如果想减少请求的数量,可以设置 linger.ms 大于#0,即消息在缓冲区保留的时间,超过设置的值就会被提交到服务端
- # 通俗解释是,本该早就发出去的消息被迫至少等待了linger.ms时间,相对于这时间内积累了更多消息,批量发送 减少请求
- #如果batch被填满或者linger.ms达到上限,满足其中一个就会被发送
- linger.ms
-
- # buffer.memory的用来约束Kafka Producer能够使用的内存缓冲的大小的,默认值32MB。
- # 如果buffer.memory设置的太小,可能导致消息快速的写入内存缓冲里,但Sender线程来不及把消息发送到Kafka服务器
- # 会造成内存缓冲很快就被写满,而一旦被写满,就会阻塞用户线程,不让继续往Kafka写消息了
- # buffer.memory要大于batch.size,否则会报申请内存不足的错误,不要超过物理内存,根据实际情况调整
- buffer.memory
-
- # key的序列化器,将用户提供的 key和value对象ProducerRecord 进行序列化处理,key.serializer必须被设置,即使
- #消息中没有指定key,序列化器必须是一个实现org.apache.kafka.common.serialization.Serializer接口的类,将#key序列化成字节数组。
- key.serializer
- value.serializer
Kafka的Consumer消费者机制和分区策略讲解
消费者根据什么模式从broker获取数据的?为什么是pull模式,而不是broker主动push?
答案可以看文章一开始的图,消费者是采用Pull拉取方式从broker的partition获取数据,那为什么是pull模式而不是push呢?pull模式可以根据消费者的消费能力来进行自己调整,不同的消费者性能不一样。如果broker没有数据的话,消费者可以配置timeout的世界,进行阻塞等待一段时间后再返回。但如果是broker主动Push,push的优点是可以快速的处理消息,但是容易对消费者处理不过来,造成消息的堆积和延迟。
消费者从哪个分区进行消费?
我们知道一个topic有多个partition,一个消费者组里面就有多个消费者,那是怎么分配的呢?一个主题topic可以有多个消费者,因为里面有多个partition分区(leader分区),一个partition leader可以由一个消费组里的一个消费者来消费。
那么消费者从哪个分区来进行消费呢?
策略一、round-robin (RoundRobinAssignor非默认策略)轮训,按照消费者组来进行轮训分配,同个消费者组监听不同的主题也是一样,是把所有的partition和所有的consumer都列出来,所以的话消费者组里面的订阅主题是一样的才可以,主题不一样的话会出现分配不均匀的问题。比如下面这个例子:
- #有七个分区,同组里有两个消费者
- topic-p0/topic-p1/topic-p2/topic-p3/topic-p4/topic-p5/topic-p6 (分区)
-
- c-1: topic-p0/topic-p2/topic-p4/topic-p6 (消费者1)
-
- c-2:topic-p1/topic-p3/topic-p5 (消费者2)
这样会有什么弊端,如果是同一消费者组里,所订阅的消息是不相同的,在执行分区的时候分配不是轮询分配,这样可能会导致分区分配的不均匀。例如现在有三个消费者C0、C1、C2,它们共订阅了3个主题:t0、t1、t2。这时候t0有1个分区(p0),t1有2个分区(p0,p1),t2有3个分区(p0,p1,p2)。消费者C0订阅了主题t0,消费者C1订阅主题t0和t1,消费者C2订阅的是t0,t1,t2。因为是轮询的机制,当C0订阅到T0后,C1就订阅不了到T0了,但是可以订阅到T1,C2也一样的订阅不了T0,但是T1和T2都能订阅到,这时候T2也就只有C2订阅,其他的C0与C1是不可见的,这时候T2的的消息也就给C2这个消费者来消费了。这个情况就是分配不均的问题。
策略二、range(RangeAssignor默认策略)范围,按照主题来进行分配,如果不平均分配的话,则第一个消费者会分配比较多的分区,一个消费者监听不同的主题也不影响,这一种策略有什么弊端呢,只是针对一个topic来说的话,c-1多消费一个分区的话影响并不大,如果有多个topic,那么针对每一个topic的话,消费者C-1都将多消费1个分区,topic越多的话那么久消费的分区也越多,性能会有所下降。
【面试题】Consumer消费者重新分配策略与offset维护机制
什么是Rebalance操作
Kafka怎么均匀的分配某一个topic下所有的partition到各个消费者的呢,从而使得消息的消费速度达到了最快,这就是平衡。而rebalance(重平衡)其实就是重新进行partition的分配,从而使得partition的分配重新达到了平衡的状态。如下图,有两个Consumer,A和B,当第三个成员C加入时,Kafka就会触发Rebalance,重新分配策略为A、B、C重新分区,Rebalance之后的分配依旧还是公平的,每个Consumer实例都获取了两个分区的消费权。
当消费者在消费过程突然宕机了,重新恢复后是从哪里消费,会有什么问题?
消费者会记录offset,故障恢复后会从这里继续消费,那么这个offset记录在哪里呢?记录在zookeeper和本地,新版的默认将offset保证在kafka的内置topic中,名称为_consumer_offsets。在这个topic默认会有50个Partition,每一个Partition都有3个副本,分区数量就是由参数offset.topic.num.partition配置的。通过groupid的哈希值和该参数的取模方式来确定某个消费者组已消费的offset保存到_consumer_offsets主题的哪个分区中。这个由消费者组名+主题+分区,来确定唯一的offset的key,从而获取对应的值。