文章详情

短信预约-IT技能 免费直播动态提醒

请输入下面的图形验证码

提交验证

短信预约提醒成功

kafka—消费者

2023-10-03 21:23

关注

学习目录

一、消费者工作流程

消费者Consumer采用从broker中主动拉取数据,Kafka采用这种方式

  1. 生产者Producer向每一个分区的leader发送数据,follower主动跟leader同步数据保证数据的可靠性
  2. 消费者Consumer消费某一个分区的数据,一个消费者可以消费多个分区的数据
  3. 每个分区的数据只能有一个消费者组中的一个消费者消费,即同一个分区不能有消费者组中的两个消费者同时消费
  4. 每个消费者的offset(分区中数据的偏移量),由消费者保存在主题中。如果某台消费者宕机了(挂了)重启的之后通过offset得到以前消费数据的位置

二、消费者组

Consumer Group(CG):消费者组,由多个consumer组成。形成一个消费者组的条件,是所有消费者的groupid相同

特点:

1.消费者组初始化流程

coordinator:辅助实现消费者组的初始化和分区的分配
coordinator节点选择 = groupid的hashcode值 % __consumer_offsets的分区数量
例如: groupid的hashcode值 = 1,__consumer_offsets为50,1% 50 = 1,那么__consumer_offsets 主题的1号分区,在哪个broker上,就选择这个节点的coordinator作为这个消费者组的老大。消费者组下的所有的消费者提交offset的时候就往这个分区去提交offset

  1. 所有者的消费者都会主动的向消费者发送请求加入消费者组当中
  2. coordinator从多个消费者中选择一个消费者作为leader(老大)
  3. coordinator将之前所有消费者信息发给leader,其中包括主题情况等
  4. 消费者leader会负责制度消费方案
  5. 消费者leader把消费方案发给coordinator
  6. coordinator把消费者方案分别发给各个消费者consumer

2.特殊情况☆☆☆☆☆

每3秒 每个消费者都会和coordinator保持心跳(默认3s),一旦超时超过45s(session.timeout.ms=45s),该消费者会被移除,并触发再平衡;或者消费者处理消息的时间过长(max.poll.interval.ms5分钟)超过了五分钟,也会触发再平衡

再平衡:把挂掉的消费者的任务分配给其他消费者

3.消费者组详细消费流程

  1. 首先创建一个消费者网络连接的客户端去跟kafka集群进行交互,然后调用sendFetches方法用来抓取数据,进行初始化,需要设置以下参数
    (1)参数一:Fetch.min.bytes 每批次最小抓取数据大小 默认1字节,可以设置
    (2)参数二:fetch.max.wait.ms 每批数据未到最小抓取数据的大小的超时时间,默认为500ms
    (3)参数三:Fetch.max.bytes 每批次最
    大抓取大小,默认50M
  2. 再调用send方法发送请求,通过onSuccess回调方法,把对应的数据拉去回来,将一批一批的数据 放入消息队列queue中
  3. 消费者从队列中拉去数据,Max.poll.records一次拉取数据返回消息的最大条数,默认500条
  4. 生产者对数据进行序列号,那么消费者则对数据进行反序列化,通过拦截器,最后进行数据处理

三、快速入门

在 IDEA 中编写生产者和消费者程序,生产者往主题为first3中发送数据,消费者从主题为first3中拉去数据

注意:运行程序之前,需要启动zk和kafka集群

生产者 CustomProducer01 类

package com.kafka.producer;import org.apache.kafka.clients.producer.KafkaProducer;import org.apache.kafka.clients.producer.ProducerConfig;import org.apache.kafka.clients.producer.ProducerRecord;import org.apache.kafka.common.serialization.StringSerializer;import java.util.Properties;public class CustomProducer01 {    public static void main(String[] args) {        //配置        Properties properties = new Properties();        //连接集群        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop100:9092,hadoop102:9092");    //写两个节点是为了防止客户挂掉,另一个能够正常工作        //指定对应的key和value的序列化类型        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());        // 1.创建kafka生成对象        //  表示 k的数据类型,和v的数据类型        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);        // 2.发送数据        for (int i = 0; i<5;i++){            //第一个参数为生产者的主题名,第二个生产者生产的数据value。还有其他配置选项            kafkaProducer.send(new ProducerRecord<String, String>("first3","kafka"));        }        // 3.关闭资源        kafkaProducer.close();    }}

消费者 CustomConsumer_01 类

package com.kafka.consumer;import org.apache.kafka.clients.consumer.ConsumerConfig;import org.apache.kafka.clients.consumer.ConsumerRecord;import org.apache.kafka.clients.consumer.ConsumerRecords;import org.apache.kafka.clients.consumer.KafkaConsumer;import org.apache.kafka.clients.producer.ProducerConfig;import org.apache.kafka.common.serialization.StringDeserializer;import java.time.Duration;import java.util.ArrayList;import java.util.Properties;public class CustomConsumer_01 {    public static void main(String[] args) {        //配置        Properties properties = new Properties();        //连接集群        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop100:9092,hadoop102:9092");    //多写一个,避免其中一台挂掉,保证数据的可靠性        //反序列化        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());        //配置消费者组ID 可以任意起        properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test");        //1.创建一个消费者 "","hello"        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);        //2.订阅主题 first3        ArrayList<String> topics = new ArrayList<String>();        topics.add("first3");        kafkaConsumer.subscribe(topics);        //3.消费数据        while (true){            ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));//每1秒拉取一批数据            //循环打印消费的数据 consumerRecords.for            for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {                System.out.println(consumerRecord);            }        }    }}

来源地址:https://blog.csdn.net/weixin_44604159/article/details/127538779

阅读原文内容投诉

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

软考中级精品资料免费领

  • 历年真题答案解析
  • 备考技巧名师总结
  • 高频考点精准押题
  • 2024年上半年信息系统项目管理师第二批次真题及答案解析(完整版)

    难度     813人已做
    查看
  • 【考后总结】2024年5月26日信息系统项目管理师第2批次考情分析

    难度     354人已做
    查看
  • 【考后总结】2024年5月25日信息系统项目管理师第1批次考情分析

    难度     318人已做
    查看
  • 2024年上半年软考高项第一、二批次真题考点汇总(完整版)

    难度     435人已做
    查看
  • 2024年上半年系统架构设计师考试综合知识真题

    难度     224人已做
    查看

相关文章

发现更多好内容

猜你喜欢

AI推送时光机
位置:首页-资讯-服务器
咦!没有更多了?去看看其它编程学习网 内容吧
首页课程
资料下载
问答资讯