文章详情

短信预约-IT技能 免费直播动态提醒

请输入下面的图形验证码

提交验证

短信预约提醒成功

kafka核心消费逻辑是什么

2023-07-06 01:43

关注

这篇文章主要介绍“kafka核心消费逻辑是什么”,在日常操作中,相信很多人在kafka核心消费逻辑是什么问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答”kafka核心消费逻辑是什么”的疑惑有所帮助!接下来,请跟着小编一起来学习吧!

kafka消费者线程

突击检查八股文,实现线程的方法有哪些?嗯?没复习是吧,行没关系,那感谢参加本次面试哈。

常用的几种方式分别是:

这里我们直接创捷出一个任务类实现Runable方法,重写run方法,一个线程当作一个kafka client,所以要在任务类中声明一个KafkaConsumer的成员变量,另外创建任务需要指定当前任务的名称也就是线程名,还有要监听的topic主题。

private KafkaConsumer<String, String> consumer;private String topic;private String threadName;

name和topic通过构造方法传进来,同时在构造方法里完成对client的初始化操作。

   public KafkaConsumerRunnable(String bootServer, String groupId, String topic) {       this.topic = topic;       Properties props = new Properties();       props.put("bootstrap.servers", bootServer);       props.put("group.id", groupId);       props.put("enable.auto.commit", "false");       props.put("auto.offset.reset", "latest");       props.put("max.poll.records", 5);       props.put("session.timeout.ms", "60000");       props.put("max.poll.interval.ms", 300000);       props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");  //键反序列化方式       props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");       this.consumer = new KafkaConsumer&lt;&gt;(props);   }

这里封装kafka client的必要信息,入参bootServer为kafka集群ip,groupId为threadName,我们规定一个线程为一个kafka消费链接,消费一个topic。

上一篇线程池保证了任务不会轻易挂掉,就算挂掉了也会重新提交,所以为了节省资源不做所谓的同groupId的负载操作。session.timeout.ms和max.poll.interval.ms可以根据当前的kafka资源灵活配置,不然可能会引发一些reblance。

enable.auto.commit设置为false,手动提交offset,auto.offset.reset这块由于业务特殊,本来就是流式图表瞬时的展示,如果真的出现了数据丢失那就丢了吧,从最新的数据读取。

接下来只需要处理下消费逻辑,consumer.subscribe(Collections.singletonList(this.topic))开始订阅监听kafka数据,搞一个while true不断的消费数据,try catch只需要对WakeupException做处理,kafka客户端会在关闭的时候抛出WakeupException异常。

finally里提交offset,无论这条offset对应的数据消费成功还是失败都是消费过了,失败了就过去了。

   @Override   public void run() {   consumer.subscribe(Collections.singletonList(this.topic));   String key = "stream_chart:" + this.name;   Thread.currentThread().setName(key);   try {      while (true) {         ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));         // 如果队列中没有消息 等待KAFKA_TIME_OUT后调用poll,如果有消息立即消费         for (ConsumerRecord<String, String> record : records) {            String value = record.value();            log.info("线程 {} 消费kafka数据 -> {} \n", Thread.currentThread().getName(), value);            RedisConfig.getRedisTemplate().opsForZSet().add(key, value, Instant.now().getEpochSecond() * 1000);         }      }   } catch (WakeupException e) {      log.info("ignore for shutdown", e);   } finally {      consumer.commitAsync();   }}

我们消费到数据直接放到redis的zset结构里,当前的时间戳作为score,最后留一个关闭客户端的后门

// 退出后关掉客户端public void shutDown() {   consumer.wakeup();}

任务提交

任务提交这块只需要在业务service中注入线程池,创建对应的KafkaRunable任务封装对应的信息,执行execute即可。

这里有个坑需要注意下,第二次突击检查八股文,线程池提交方法submitexecute的区别说一下。不知道的立刻去熟读并背诵。

public class TestTheadPool {    public static void main(String[] args) {        ExecutorService executorService= Executors.newFixedThreadPool(1);        executorService.submit(new task("submit"));        executorService.execute(new task("execute"));    }}class task implements  Runnable{    private String name;    public task(String name) {        this.name = name;    }    @Override    public void run() {        System.out.println(this.name + " start task");        int i=1/0;    }}

熟悉的同学通过示例代码可以看出来,submit提交的线程不会抛出异常代码,只有获取Future返回值并执行get方法才会捕获到异常。这块涉及到异步的东西不再赘述

try {    Future<?> submit = executorService.submit(new task("submit"));    submit.get();} catch (InterruptedException e) {    e.printStackTrace();} catch (ExecutionException e) {    e.printStackTrace();}

所以我们要使用execute执行,不然kafka消费线程里消费失败了拦截不到就不会被重新提交,导致线程挂掉。

到此,关于“kafka核心消费逻辑是什么”的学习就结束了,希望能够解决大家的疑惑。理论与实践的搭配能更好的帮助大家学习,快去试试吧!若想继续学习更多相关知识,请继续关注编程网网站,小编会继续努力为大家带来更多实用的文章!

阅读原文内容投诉

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

软考中级精品资料免费领

  • 历年真题答案解析
  • 备考技巧名师总结
  • 高频考点精准押题
  • 2024年上半年信息系统项目管理师第二批次真题及答案解析(完整版)

    难度     813人已做
    查看
  • 【考后总结】2024年5月26日信息系统项目管理师第2批次考情分析

    难度     354人已做
    查看
  • 【考后总结】2024年5月25日信息系统项目管理师第1批次考情分析

    难度     318人已做
    查看
  • 2024年上半年软考高项第一、二批次真题考点汇总(完整版)

    难度     435人已做
    查看
  • 2024年上半年系统架构设计师考试综合知识真题

    难度     224人已做
    查看

相关文章

发现更多好内容

猜你喜欢

AI推送时光机
位置:首页-资讯-后端开发
咦!没有更多了?去看看其它编程学习网 内容吧
首页课程
资料下载
问答资讯