文章详情

短信预约-IT技能 免费直播动态提醒

请输入下面的图形验证码

提交验证

短信预约提醒成功

Kafka消费者那些事儿

2024-11-30 13:21

关注

消费者消费规则

kafka是以消费者组进行消费,一个消费者组,由多个consumer组成,他们和topic的消费规则如下:

通过这种分组、分区的消费方式,可以提高消费者的吞吐量,同时也能够实现消息的发布/订阅模式和点对点两种模式。

消费者整体工作流程

消费者消费总体分为两个步骤,第一步是制定消费的方案,就是这个组下哪个消费者消费哪个分区,第二个是建立网络连接,获取消息数据。

一、制定消费方案

  1. 消费者consumerA,consumerB, consumerC向kafka集群中的协调器coordinator发送JoinGroup的请求。coordinator主要是用来辅助实现消费者组的初始化和分区的分配。
  1. 选出一个 consumer作为消费中的leader,比如上图中的consumerB。
  2. 消费者leader制定出消费方案,比如谁来消费哪个分区等,有Range分区策略、RoundRobin分区策略等。
  3. 把消费方案告诉给coordinator
  4. 最后coordinator就把消费方案下发给各个consumer, 图中只画了一条线,实际上是会下发到各个consumer。

二、消费者消费细节

现在已经初始化消费者组信息,知道哪个消费者消费哪个分区,接着我们来看看消费者细节。

  1. 消费者创建一个网络连接客户端ConsumerNetworkClient, 发送消费请求,可以进行如下配置:
  1. 发送请求到kafka集群
  2. 获取数据成功,会将数据保存到completedFetches队列中
  3. 消费者从队列中抓取数据,根据配置max.poll.records一次拉取数据返回消息的最大条数,默认500条。
  4. 获取到数据后,经过反序列化器、拦截器后,得到最终的消息。
  5. 最后一步是提交保存消费的位移offset,也就是这个消费者消费到什么位置了,这样下次重启也可以继续从这个位置开始消费,关于offset的管理后面详细介绍。

消费者分区策略

前面简单提到了消费者组初始化的时候会对分区进行分配,那么具体的分配策略是什么呢,也就是哪个消费者消费哪个分区数据?

kafka有四种主流的分区分配策略: Range、RoundRobin、Sticky、CooperativeSticky。可以通过配置参数partition.assignment.strategy,修改分区的分配策略。默认策略是Range + CooperativeSticky。Kafka可以同时使用多个分区分配策略。

Range 分区策略

如上图所示:有 7 个分区,3 个消费者,排序后的分区将会是0,1,2,3,4,5,6;消费者排序完之后将会是C0,C1,C2。7/3 = 2 余 1 ,除不尽,那么 消费者 C0 便会多消费 1 个分区。 8/3=2余2,除不尽,那么C0和C1分别多消费一个。

这种方式容易造成数据倾斜!如果有 N 多个 topic,那么针对每个 topic,消费者 C0都将多消费 1 个分区,topic越多,C0消费的分区会比其他消费者明显多消费 N 个分区。

RoundRobin 分区策略

RoundRobin 针对集群中所有topic而言,RoundRobin 轮询分区策略,是把所有的 partition 和所有的consumer 都列出来,然后按照 hashcode 进行排序,最后通过轮询算法来分配 partition 给到各个消费者。

Sticky 和Cooperative Sticky分区策略

Sticky是粘性的意思,它是从 0.11.x 版本开始引入这种分配策略,首先会尽量均衡的放置分区到消费者上面,在出现同一消费者组内消费者出现问题的时候,在rebalance会尽量保持原有分配的分区不变化,这样可以节省开销。

Cooperative Sticky和Sticky类似,但是它会将原来的一次大规模rebalance操作,拆分成了多次小规模的rebalance,直至最终平衡完成,所以体验上会更好。

关于什么是rebalance继续往下看你就知道了。

消费者再均衡

上面也提到了rebalance,也就是再均衡。当kafka发生下面的情况会进行在均衡,也就是重新给消费者分配分区:

消费者位移offset管理

消费者需要保存当前消费到分区的什么位置了,这样哪怕消费者故障,重启后也能继续消费,这就是消费者的维护offset管理。

一、消费者位移offset存储位置

消费者位移offset存储在哪呢?

如何查看__consumer_offsets主题内容?

bin/kafka-console-consumer.sh --topic 
__consumer_offsets --bootstrap-server hadoop102:9092 --
consumer.config config/consumer.properties --formatter 
"kafka.coordinator.group.GroupMetadataManager$OffsetsMessageForm
atter" --from-beginning
## topic1 1号分区
[offset,topic1,1]::OffsetAndMetadata(offset=7, 
leaderEpoch=Optional[0], metadata=, commitTimestamp=1622442520203, 
expireTimestamp=None)
## topic1 0号分区
[offset,topic1,0]::OffsetAndMetadata(offset=8, 
leaderEpoch=Optional[0], metadata=, commitTimestamp=1622442520203, 
expireTimestamp=None)

二、消费者位移offset提交保存模式

消费者是如何提交保存位移offset呢?

自动提交

为了使我们能够专注于自己的业务逻辑,kafka默认提供了自动提交offset的功能。这个由消费者客户端参数 enable.auto.commit 配置, 默认值为 true 。当然这个默认的自动提交不是每消费一条消息就提交一次,而是定期提交,这个定期的周期时间由客户端参数 auto.commit.interval.ms 配置,默认值为 5 秒。

自动提交会带来什么问题?

自动提交消费位移的方式非常简便,但会带来是重复消费的问题。

假设刚刚提交完一次消费位移,然后拉取一批消息进行消费,在下一次自动提交消费位移之前,消费者崩溃了,那么又得从上一次位移提交的地方重新开始消费,这样便发生了重复消费的现象。

我们可以通过减小位移提交的时间间隔来减小重复消息的窗口大小,但这样 并不能避免重复消费的发送,而且也会使位移提交更加频繁。

手动提交

很多时候并不是说拉取到消息就算消费完成,而是需要将消息写入数据库、写入本地缓存,或者是更 加复杂的业务处理。在这些场景下,所有的业务处理完成才能认为消息被成功消费。手动的提交方式可以让开发人员根据程序的逻辑在合适的地方进行位移提交。

// 是否自动提交 offset
 properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);

手动提交可以细分为同步提交和异步提交,对应于 KafkaConsumer 中的 commitSync()和 commitAsync()两种类型的方法。

同步提交会阻塞当前线程,一直到提交成功,并且会自动失败重试(由不可控因素导致,也会出现提交失败),它必须等待offset提交完毕,再去消费下一批数据。

// 同步提交 offset
consumer.commitSync();

异步提交则没有失败重试机制,故有可能提交失败。它发送完提交offset请求后,就开始消费下一批数据了。

// 异步提交 offset
consumer.commitAsync();

那么手动提交会带来什么问题呢?可能会出现"漏消息"的情况。

设置offset为手动提交,当offset被提交时,数据还在内存中未落盘,此时刚好消费者线程被kill掉,那么offset已经提交,但是数据未处理,导致这部分内存中的数据丢失。

我们可以通过消费者事物来解决这样的问题。

其实无论是手动提交还是自动提交,都有可能出现消息重复和是漏消息,与我们的编程模型有关,需要我们开发的时候根据消息的重要程度来选择合适的消费方案。

消费者API

一个正常的消费逻辑需要具备以下几个步骤:

(1)配置消费者客户端参数及创建相应的消费者实例;

(2)订阅主题;

(3)拉取消息并消费;

(4)提交消费位移 offset;

(5)关闭消费者实例。

public class MyConsumer { 
    public static void main(String[] args) { 
        Properties props = new Properties(); 
        // 定义 kakfa 服务的地址,不需要将所有 broker 指定上 
        props.put("bootstrap.servers", "doitedu01:9092"); 
        // 制定 consumer group 
        props.put("group.id", "g1"); 
        // 是否自动提交 offset 
        props.put("enable.auto.commit", "true"); 
        // 自动提交 offset 的时间间隔 
        props.put("auto.commit.interval.ms", "1000");
        // key 的反序列化类 
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); 
        // value 的反序列化类
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); 
        // 如果没有消费偏移量记录,则自动重设为起始 offset:latest, earliest, none 
        props.put("auto.offset.reset","earliest");
    
    	// 定义 consumer 
        KafkaConsumer consumer = new KafkaConsumer<>(props); 
        // 消费者订阅的 topic, 可同时订阅多个 
        consumer.subscribe(Arrays.asList("first", "test","test1"));
        while (true) { 
            // 读取数据,读取超时时间为 100ms 
            ConsumerRecords records = consumer.poll(100); 
            for (ConsumerRecord record : records) 
            	System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value()); 
        } 
	} 
}

订阅主题

consumer.subscribe(Arrays.asList(topicl )); 
consumer subscribe(Arrays.asList(topic2))

如果消费者采用的是正则表达式的方式(subscribe(Pattern))订阅, 在之后的过程中,如果 有人又创建了新的主题,并且主题名字与正表达式相匹配,那么这个消费者就可以消费到 新添加的主题中的消息。

consumer.subscribe(Pattern.compile ("topic.*" ));

消费者不仅可以通过 KafkaConsumer.subscribe()方法订阅主题,还可直接订阅某些主题的指定分区。

consumer.assign(Arrays.asList(new TopicPartition ("tpc_1" , 0),new TopicPartition(“tpc_2”,1))) ;

取消订阅

通过unsubscribe()方法采取消主题的订阅。

consumer.unsubscribe();

poll()拉取消息

kafka 中的消息消费是一个不断轮询的过程,消费者所要做的就是重复地调用 poll() 方法, poll()方法返回的是所订阅的主题(分区)上的一组消息。

对于 poll () 方法而言,如果某些分区中没有可供消费的消息,那么此分区对应的消息拉取的结果就为空。

public ConsumerRecords poll(final Duration timeout)

超时时间参数 timeout ,用来控制 poll() 方法的阻塞时间,在消费者的缓冲区里没有可用数据时会发生阻塞。

指定位移消费

有些时候,我们需要一种更细粒度的掌控,可以让我们从特定的位移处开始拉取消息,而 KafkaConsumer 中的 seek( 方法正好提供了这个功能,让我们可以追前消费或回溯消费。

public void seek(TopicPartiton partition,long offset)

消费者重要参数

最后我们总结一下消费者中重要的参数配置。

参数名称

描述

bootstrap.servers

向 Kafka 集群建立初始连接用到的 host/port 列表。

key.deserializer 和value.deserializer

指定接收消息的 key 和 value 的反序列化类型。一定要写全类名。

group.id

标记消费者所属的消费者组。

enable.auto.commit

默认值为 true,消费者会自动周期性地向服务器提交偏移量。

auto.commit.interval.ms

如果设置了 enable.auto.commit 的值为 true, 则该值定义了消费者偏移量向 Kafka 提交的频率,默认 5s。

auto.offset.reset

当 Kafka 中没有初始偏移量或当前偏移量在服务器中不存在(如,数据被删除了),该如何处理? earliest:自动重置偏移量到最早的偏移量。 latest:默认,自动重置偏移量为最新的偏移量。 none:如果消费组原来的(previous)偏移量不存在,则向消费者抛异常。 anything:向消费者抛异常。

offsets.topic.num.partitions

__consumer_offsets 的分区数,默认是 50 个分区。

heartbeat.interval.ms

Kafka 消费者和 coordinator 之间的心跳时间,默认 3s。该条目的值必须小于 session.timeout.ms ,也不应该高于session.timeout.ms 的 1/3。

session.timeout.ms

Kafka 消费者和 coordinator 之间连接超时时间,默认 45s。超过该值,该消费者被移除,消费者组执行再平衡。

max.poll.interval.ms

消费者处理消息的最大时长,默认是 5 分钟。超过该值,该消费者被移除,消费者组执行再平衡。

fetch.min.bytes

默认 1 个字节。消费者获取服务器端一批消息最小的字节数。

fetch.max.wait.ms

默认 500ms。如果没有从服务器端获取到一批数据的最小字节数。该时间到,仍然会返回数据。

fetch.max.bytes

默认 Default: 52428800(50 m)。消费者获取服务器端一批消息最大的字节数。如果服务器端一批次的数据大于该值(50m)仍然可以拉取回来这批数据,因此,这不是一个绝对最大值。一批次的大小受 message.max.bytes (broker config)or max.message.bytes (topic config)影响。

max.poll.records

一次 poll 拉取数据返回消息的最大条数,默认是 500 条。

总结

kafka消费是很重要的一个环节,本文总结kafka消费者的一些重要机制,包括消费者的整个流程,消费的分区策略,消费的再平衡以及消费的位移管理。在明白这些机制以后,简单讲解了如何使用消费者consumer的API以及消费者中重要的参数。

来源:JAVA旭阳内容投诉

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

软考中级精品资料免费领

  • 历年真题答案解析
  • 备考技巧名师总结
  • 高频考点精准押题
  • 2024年上半年信息系统项目管理师第二批次真题及答案解析(完整版)

    难度     813人已做
    查看
  • 【考后总结】2024年5月26日信息系统项目管理师第2批次考情分析

    难度     354人已做
    查看
  • 【考后总结】2024年5月25日信息系统项目管理师第1批次考情分析

    难度     318人已做
    查看
  • 2024年上半年软考高项第一、二批次真题考点汇总(完整版)

    难度     435人已做
    查看
  • 2024年上半年系统架构设计师考试综合知识真题

    难度     224人已做
    查看

相关文章

发现更多好内容

猜你喜欢

AI推送时光机
位置:首页-资讯-后端开发
咦!没有更多了?去看看其它编程学习网 内容吧
首页课程
资料下载
问答资讯