文章详情

短信预约-IT技能 免费直播动态提醒

请输入下面的图形验证码

提交验证

短信预约提醒成功

spring-kafka使消费者动态订阅新增的topic问题

2022-12-27 12:01

关注

一、前言

在Java中使用kafka,方式很多,例如:

这里讨论的话题是,如何在spring-kafka中,使得一个消费者可以动态订阅新增的topic?

本文不讨论利用SpringCloudConfig或Apollo等分布式配置中心,利用@RefreshScope的方式来达到目的,这种方式有点杀鸡用牛刀,也会增加系统复杂度和维护成本。

我的环境:jdk 1.8,Spring 2.1.3.RELEASE,kafka_2.12-2.3.0单节点。

二、需求分析

上面已经提到,spring-kafka通过 @KafkaListener 的方式配置订阅的topic,最常用的属性可能是 topics,而要实现本文的需求,就要使用另一个属性 topicPattern,查看它的属性说明:

The topic pattern for this listener. 
The entries can be 'topic pattern', a'property-placeholder key' or an 'expression'. 
The framework will create acontainer that subscribes to all topics matching the specified pattern to getdynamically assigned partitions. 
The pattern matching will be performedperiodically against topics existing at the time of check. 
An expression mustbe resolved to the topic pattern (String or Pattern result types are supported). 

将其翻译过来:

此侦听器的主题模式。条目可以是“主题模式”,“属性占位符键”或“表达式”。
该框架将创建一个容器,该容器订阅与指定模式匹配的所有主题以获取动态分配的分区。
模式匹配将针对检查时存在的主题【定期执行】。
表达式必须解析为主题模式(支持字符串或模式结果类型)。

注意:从说明信息来看,topicPattern 已经可以做到定期检查topic列表,然后将新加入的topic分配至某个消费者。

下面列出消费端的核心测试代码:

@Component
public class SinkConsumer {
    @KafkaListener(topicPattern = "test_topic2.*")
    public void listen2(ConsumerRecord<?, ?> record) throws Exception {
        System.out.printf("topic2.* = %s, offset = %d, value = %s \n", record.topic(), record.offset(), record.value());
    }
}

代码实现很简洁,就是期待我们新增一个符合 topicPattern 的topic后,spring-kafka能否自动为新建的topic分配到此目标消费者。

三、测试运行

3.1 启动消费者服务

配置文件中,spring该配的配,kafka该配的配,接着启动即可。

3.2 新建topic

新建 test_topic2_3,刚创建完不能立刻分配到目标消费者,从 topicPattern 的注释得知spring-kafka会定期扫描topic列表,我们要给它几分钟等待扫描到新topic,并为它成功分配到目标消费者后,再去发送第一条消息(所以可以先去洗个手,此时19:02)。

3.3 等待topic被分配到消费者

洗手期间的控制台日志提示:已为新建的 test_topic2_3 分配到我们的目标消费者,并将offset设置到起始位置0,日志如下:

2019-11-15 19:05:12.958  INFO 7768 --- [ntainer#1-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-3, groupId=test] Revoking previously assigned partitions [test_topic2_2-0, test_topic2_1-0]
2019-11-15 19:05:12.958  INFO 7768 --- [ntainer#1-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : partitions revoked: [test_topic2_2-0, test_topic2_1-0]
2019-11-15 19:05:12.958  INFO 7768 --- [ntainer#1-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-3, groupId=test] (Re-)joining group
2019-11-15 19:05:15.757  INFO 7768 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-2, groupId=test] Attempt to heartbeat failed since group is rebalancing
2019-11-15 19:05:15.761  INFO 7768 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-2, groupId=test] Revoking previously assigned partitions [test_topic-0]
2019-11-15 19:05:15.762  INFO 7768 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : partitions revoked: [test_topic-0]
2019-11-15 19:05:15.762  INFO 7768 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-2, groupId=test] (Re-)joining group
2019-11-15 19:05:16.025  INFO 7768 --- [ntainer#1-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-3, groupId=test] Successfully joined group with generation 6
2019-11-15 19:05:16.025  INFO 7768 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-2, groupId=test] Successfully joined group with generation 6
2019-11-15 19:05:16.026  INFO 7768 --- [ntainer#1-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-3, groupId=test] Setting newly assigned partitions [test_topic2_2-0, test_topic2_3-0, test_topic2_1-0]
2019-11-15 19:05:16.026  INFO 7768 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-2, groupId=test] Setting newly assigned partitions [test_topic-0]
2019-11-15 19:05:16.028  INFO 7768 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : partitions assigned: [test_topic-0]
2019-11-15 19:05:16.032  INFO 7768 --- [ntainer#1-0-C-1] o.a.k.c.consumer.internals.Fetcher       : [Consumer clientId=consumer-3, groupId=test] Resetting offset for partition test_topic2_3-0 to offset 0.
2019-11-15 19:05:16.032  INFO 7768 --- [ntainer#1-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : partitions assigned: [test_topic2_2-0, test_topic2_3-0, test_topic2_1-0]

3.4 发送第一条消息

洗手完毕,看到3.3小节里的日志,然后确认成功分配到目标消费者,且offset被设为0之后,发送第一条消息【我是第1个test_topic2_3的消息】,控制台日志打印出此消息信息,代表成功消费:

topic2.* = test_topic2_3, offset = 0, value = {"date":"2019-11-15 19:11:13","msg":"我是第1个test_topic2_3的消息"} 

3.5 注意事项

若不等到offset被设为0之后,过早发送消息,则会在消费端丢失过早发送的消息,并且当spring-kafka自动设置offset的时候,日志提示,offset被设置为1,而不是起始位置0:

INFO o.a.k.c.consumer.internals.Fetcher       : [Consumer clientId=consumer-3, groupId=test] Resetting offset for partition test_topic2_1-0 to offset 1.

在上面的3.1至3.4的整个过程中,可能会日志警告,代表暂时不能为新增的topic分配到目标消费者:

WARN o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-2, groupId=test] The following subscribed topics are not assigned to any members: [test_topic2_3] 

所以只需等待日志提示可以成功分配到目标消费者,且offset被设为0之后,即可发送第一条消息。

总结

以上为个人经验,希望能给大家一个参考,也希望大家多多支持编程网。

阅读原文内容投诉

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

软考中级精品资料免费领

  • 历年真题答案解析
  • 备考技巧名师总结
  • 高频考点精准押题
  • 2024年上半年信息系统项目管理师第二批次真题及答案解析(完整版)

    难度     813人已做
    查看
  • 【考后总结】2024年5月26日信息系统项目管理师第2批次考情分析

    难度     354人已做
    查看
  • 【考后总结】2024年5月25日信息系统项目管理师第1批次考情分析

    难度     318人已做
    查看
  • 2024年上半年软考高项第一、二批次真题考点汇总(完整版)

    难度     435人已做
    查看
  • 2024年上半年系统架构设计师考试综合知识真题

    难度     224人已做
    查看

相关文章

发现更多好内容

猜你喜欢

AI推送时光机
位置:首页-资讯-后端开发
咦!没有更多了?去看看其它编程学习网 内容吧
首页课程
资料下载
问答资讯