文章详情

短信预约-IT技能 免费直播动态提醒

请输入下面的图形验证码

提交验证

短信预约提醒成功

成为 Kafka 高手的秘籍:生产者深度实践总结

2024-12-11 17:39

关注

张晋尉,腾讯云消息队列专项支持团队成员,kafka,puslar资深开发者,kafka sdk贡献者,在流式数据处理,消息队列方向有多年实践经验。

Kafka 简介

kafka 是一款已经发布了近10年的分布式消息队列系统,是一款非常成熟的产品,在各大公司或者产品中或多或少都有他的身影,特别是大数据流处理,log 流处理之类的场景,kafka 更是充当着几乎必不可少的角色。

这款消息队列在官方给出的定义中被称为“分布式流式处理平台”,其主要目的是在大数据流处理中承担着存储记录流的一个作用,不过到了现在这个年代,越来越多的业务架构更倾向于将 kafka 当作消息队列来使用,用来取代比较厚重且性能有限的 RabbitMQ。

kafka 这样一个系统为了确保其简洁性和高性能,其实将很多逻辑细节和配置放到了 client 端,所以我们将从客户端的视角出发,从使用者的角度通过生产者和消费者两个方面来介绍 kafka 在实践生产中遇到的一些问题和相应的技术细节。本文是系列文章的第一篇,介绍生产者。

标准 producer API 简介

这里我们先介绍下最经常使用的生产者 API,相信看本文各位已经是 kafka 使用的熟手了,不过为了后续介绍可能会使用的一些术语,我们还是先复习下 kafka 基础概念,这里我们只关注于生产这部分,忽略其他的无关细节。

首先我们画出生产者和 kafka 交互的一张图,这张图用于描述生产者消息数据的流向和 kafka server 为了接受消息需要用到什么组件。

图片如上,现在让我们分别介绍下图上所绘内容以及相应的专业术语

现在我们开始讲述下关于生产者 API 的使用和一些在生产的时候需要注意的配置,这里的生产者 API 指的是 kafka 提供的几乎无状态的 API,非常的轻便,同时也可以提供非常不错的性能。不过如果使用这个 API 来进行生产,kafka 只保证最少一次和最多一次语义。

接下来我们通过一个代码片段的实例来看下如何使用生产者 api,同时看一下一些重要的配置,首先让我们创建一个有两个 partition,每个 partition 都有两个 replicas 的 topic

  1. bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 2 --partitions 2 --topic test 

然后让我们写一段 java 生产者片段,代码非常简单,只是配置一些生产者 client 的相关配置,然后调用 producer的 send 方法将需要发送的消息提交到 kafka 的 client 库

  1. Properties props = new Properties(); 
  2.  
  3. props.put("bootstrap.servers""localhost:9092"); 
  4.  
  5. props.put("acks""all"); 
  6.  
  7. props.put("retries",3); 
  8.  
  9. props.put("retry.backoff.ms",2000); 
  10.  
  11. props.put("compression.type","lz4"); 
  12.  
  13. props.put("batch.size"16384); 
  14.  
  15. props.put("linger.ms",200); 
  16.  
  17. props.put("max.request.size",1048576); 
  18.  
  19. props.put("key.serializer""org.apache.kafka.common.serialization.StringSerializer"); 
  20.  
  21. props.put("value.serializer""org.apache.kafka.common.serialization.StringSerializer"); 
  22.  
  23. props.put("request.timeout.ms"10000); 
  24.  
  25. props.put("max.block.ms"30000); 
  26.  
  27. Producer producer = new KafkaProducer(props); for (int i = 0; i < 1000; i++) { 
  28.  
  29. Future future = producer.send(new ProducerRecord<>("test", UUID.randomUUID().toString())); 
  30.  
  31. System.out.println("produce offset:" + future.get().offset()); 
  32.  
  33.  
  34. producer.close(); 

通过以上代码就可以完成消息的生产了,kafka 给我们提供的这个 API 的功能确实非常简单易用,当然这里面实际上包含上比较多的细节,不过被 client 封装了进去,这里我们继续往深处挖掘下,看看隐藏在这段代码里面的可能存在的坑。

首先我们来简单分析下在这段代码里面 client 会做什么,(注:这里我们更倾向于给出一个通用工作流程,所以可能会忽略部分 java客户端独有特性 )。

client 通过代码中给出的 bootstrap.servers 去连接 broker,这里如果第一个broker 连接失败,那么 client 会从左往右重试去连接,直到全部连接失败或者某一个地址成功连接。

当连接成功后,kafka client 会发起 ApiVersions request去kafka server 查询server 端支持各个 api 以及每个 api 最大的支持版本。从而达到 kafka 一个向下兼容的目的。当然由于 ApiVersions 是一个大约在 0.10 版本加入的 api,所以新版client 如果访问 0.9 版本的 kafka server 会引起ArrayIndexOutOfBoundsException 的报错,这个错误 kafka 官方在 0.10 的时候修复了。

接下来 client 会查询将要发送消息的 topic 的元数据信息,向已经连接的 broker 发送 Metadata request,通过这个 api,kafka client 将会拿到集群 broker 的各种信息,包括 ip 和 port,以及 broker 对应的唯一 id,同时 client 也将获取到 topic的相关信息,parition 的 id 和 partition 选择出来的 leader replicas 所在 broker 的id,然后 client 将会建立 leader replicas 所在 broker 的连接,作为实际发送消息的数据链路。

在这里我们有三个细节需要注意

client 开始根据 message 中的 key 来计算 hash,确定这个 message 会被投递到哪个 partition 中去,然后 client 投递消息到本地的一个队列中,实际连接到partition 的投递者类,将从队列中取出消息,然后 client 会做两个检查之后调用Produce request 去投递消息。

在这一步中主要涉及到以下几个配置

以上就是整个 producer api 在使用过程中的一些细节了,明白了这些细节,在生产时遇到kafka的一些奇怪报错就会有一些思路去定位和处理。当然从代码上来看,代码里面还有一些配置在上面的文章中没有覆盖到,这里我在一起介绍一下

幂等生产者(Idempotent Producer)简介

幂等生产者提供了生产者在单一分区上的恰好一次语义,但是他不能覆盖到生产者对于复数 partition 操作的一致性,这种一致性需要通过后续的事务消息来解决。

现在让我们先看下幂等生产者如何使用,以及一些涉及到的细节。

为什么我们需要使用到幂等生产者,其主要的原因是生产者发送消息到服务端后,如果遇到了网络问题导致连接断开,生产者是无法感知到消息到底是写入成功还是失败,对于 kafka 一般的生产者 api 来说我们会设置 retries 参数,始终去进行重试,这也就是我们所谓至少一次语义,因为我们无法感知是否写入成功,如果写入成功,但是我们没有接收到成功的回复,我们进行重试动作,就会导致消息的重复写入,如果消息消费依赖于消息顺序,这种重试甚至会导致顺序的错乱。

现在通过幂等生产者,kafka 可以在我们进行这样的重试的时候丢弃掉这种重复写入的消息。

现在让我们看看如何使用幂等生产者。(这里让我们来看下代码,代码中让我们忽略掉一些不重要的配置)。

  1. Properties props = new Properties(); 
  2.  
  3. props.put("bootstrap.servers""localhost:9092"); 
  4.  
  5. props.put("enable.idempotence"true); 
  6.  
  7. Producer producer = new KafkaProducer(props); for (int i = 0; i < 1000; i++) { 
  8.  
  9. Future future = producer.send(new ProducerRecord<>("test", UUID.randomUUID().toString())); 
  10.  
  11. System.out.println("produce offset:" + future.get().offset()); 
  12.  
  13.  
  14. producer.close(); 

在用户的使用上,要启动幂等生产者只需要添加设置 enable.idempotence 为 true 就好,让我们继续关注下细节,看看启用幂等生产者后 kafka client 会做什么。首先 client会强制设置一些生产者的配置值。

 

  1. acks 会被强制设置为all,如果客户本来使用的是0,1级别的 acks 那么用户需要考虑下被设置为 all 的时候对于自己业务性能的影响,如果用户本来就是设置为all的情况,那么使用幂等生产者是几乎不会有额外代价的。
  2. retries 必须设置为大于1的数字,一般 librdkafka 和 java kafka client 会把 retries设置为一个非常大的数比如 Integer.MAX_VALUE,基本靠近于无限重试。确保消息一定会成功发送。
  3. max.inflight.requests.per.connection 必须小于5,其中 java kafka client 如果版本小于1.0.0,会把 max.inflight.requests.per.connection 设置为 1,确保一条数据链路上一次只有一个请求,这会导致一定情况下 tps 有所下降。
  4. 发送的消息格式必须是 v2 格式。不支持低版本的消息格式。

完成生产者配置之后,client 开始执行生产消息的发送,这里我们省略在上文提到过的生产 api 的逻辑,只关注于启用幂等后多出来的逻辑和步骤

  1. 生产者 client 向 broker 发起 InitProducerId request 请求一个 PID,后续发送的消息,都会带上这一个 PID 用于标明生产者的身份。
  2. 每个消息会带上一个单调递增的 Sequence ID。kafka server会记录下同一个PID最后一次提交消息的 Sequence ID,如果当前发送的消息 Sequence ID 小于等于最后一次提交的 ID,那么 server 会认为当前消息已经过期了,并拒绝接受消息。client 收到这样的拒绝请求后就可以感知到之前的消息一定是投递成功了,并停止重试发送,丢弃掉消息。

通过以上的这些步骤,kafka 确保了每个消费者对于单 partition 操作的一个幂等性,这是一个非常实用的功能,特别是在使用消费者 api 的时候本来就已经设置了acks 为 all 的业务,启用生产者幂等几乎没有额外消耗,这也是一个 kafka 推出了比较久的功能了(从 kafka 0.11 开始支持),但是目前看起使用本功能的用户还是比较少。

事务消息(Transactional Messaging)简介

事务消息是目前 kafka 为了确保恰好一次语义所提供的最强约束,他确保了一个生产者如果生产多个相互关联的消息到不同的 partition 上时要么最后同时成功,要么同时失败。同时启用事物消息的前提必须启用幂等生产者,所以单 partition 上的恰好一次语义就由幂等的特性来保证。

不过一般很少有业务会直接使用 kafka 的事物消息,会涉及使用事物消息的业务其实基本上都是通过 kafka stream 进行流处理,而 kafka stream 依赖于事务消息并且对于业务隐藏掉了事务细节,所以这里我们来看看如何直接使用事务消息并继续尝试分析下client 在这期间做了什么,先让我们放出一份代码片段。

  1. Properties props = new Properties(); 
  2.  
  3. props.put("bootstrap.servers""localhost:9092"); 
  4.  
  5. props.put("enable.idempotence""true"); 
  6.  
  7. props.put("transactional.id""testtrans-1"); 
  8.  
  9. KafkaProducer producer = new KafkaProducer(props); 
  10.  
  11. producer.initTransactions(); try
  12.  
  13. producer.beginTransaction(); 
  14.  
  15. producer.send(record0); 
  16.  
  17. producer.send(record1); 
  18.  
  19. producer.sendOffsetsToTxn(…); 
  20.  
  21. producer.commitTransaction(); 
  22.  
  23. catch( ProducerFencedException e) { 
  24.  
  25. producer.close(); 
  26.  
  27. catch( KafkaException e ) { 
  28.  
  29. producer.abortTransaction(); 
  30.  

首先代码里面执行 initTransactions 作为第一步,在这个逻辑中 client 将会请求 InitProducerId 并传递事务 id,用来建立一个事务 id 和 PID 一对一的关系。如果有多个生产者加入到同一个事务 id 中,前面加入的生产者都会被后面加入的替代。前面生产者的请求都会被拒绝。

值得注意的是如果 client 短线重新连接,它会在请求 InitProducerId 的时候提交之前使用的 PID 以及 epoch,如果成功随后 server 会返回 epoch+1,同时会拒绝所有 epoch小于当前 epoch 的生产者消息,这是为了解决分布式系统中所谓的僵死问题。

然后接下来的代码调用就和很多事务代码一样了,启动一个事务,写入所有需要写入的信息,最后再 commit,如果失败则回滚,如果成功就会一起提交所有写入,然后做接下来的业务逻辑。一般大部分事务的实现都是一个状态机,这里我们就放上一张图不继续分析下去了。

在看完了事务代码后,我们似乎没有提到 sendOffsetsToTxn 这个函数,这个函数实际上是用于当前事务消息是一个从一个 topic 消费,然后写入到事务消息的时候使用的,消费的 offset 可以通过这个函数提交到协调者,后续在事物提交的时候再一并提交消费者消费掉的 offset。防止事务失败的时候用户还需要手动管理消费者 offset。是一个非常有用的帮助函数。

总结

到此为止,我们从客户端视角出发简单的去分析了 kafka 生产者的一些用法和相应需要注意的坑,由于作者的本篇文章是从工作中遇到的一些问题出发的,所以相应的如果某些地方用得多有人咨询的多,那么可能会写的稍微详细一些,有的地方咨询的人少,遇到的问题也相应比较少,可能就会简略一些。

希望本文能够让各位读者有所收获,能够对 kafka 生产者这部分有更好的了解。感谢各位的阅读,让我们下一篇文章 kafka 消费者再见。

 

 

来源:高效运维内容投诉

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

软考中级精品资料免费领

  • 历年真题答案解析
  • 备考技巧名师总结
  • 高频考点精准押题
  • 2024年上半年信息系统项目管理师第二批次真题及答案解析(完整版)

    难度     813人已做
    查看
  • 【考后总结】2024年5月26日信息系统项目管理师第2批次考情分析

    难度     354人已做
    查看
  • 【考后总结】2024年5月25日信息系统项目管理师第1批次考情分析

    难度     318人已做
    查看
  • 2024年上半年软考高项第一、二批次真题考点汇总(完整版)

    难度     435人已做
    查看
  • 2024年上半年系统架构设计师考试综合知识真题

    难度     224人已做
    查看

相关文章

发现更多好内容

猜你喜欢

AI推送时光机
位置:首页-资讯-后端开发
咦!没有更多了?去看看其它编程学习网 内容吧
首页课程
资料下载
问答资讯