文章详情

短信预约-IT技能 免费直播动态提醒

请输入下面的图形验证码

提交验证

短信预约提醒成功

Spring Boot 集成 Kafkad的实现方法

2023-06-14 11:39

关注

本篇内容介绍了“Spring Boot 集成 Kafkad的实现方法”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!

Spring Boot 作为主流微服务框架,拥有成熟的社区生态。市场应用广泛,为了方便大家,整理了一个基于spring boot的常用中间件快速集成入门系列手册,涉及RPC、缓存、消息队列、分库分表、注册中心、分布式配置等常用开源组件,大概有几十篇文章,陆续会开放出来,感兴趣同学请提前关注&收藏

消息通信有两种基本模型,即发布-订阅(Pub-Sub)模型和点对点(Point to Point)模型,发布-订阅支持生产者消费者之间的一对多关系,而点对点模型中有且仅有一个消费者。

前言

Kafka是由Apache软件基金会开发的一个开源流处理平台,由Scala和Java编写。该项目的目标是为处理实时数据提供一个统一、高吞吐、低延迟的平台。其持久化层本质上是一个“按照分布式事务日志架构的大规模发布/订阅消息队列”。

Kafka高效地处理实时流式数据,可以实现与Storm、HBase和Spark的集成。作为聚类部署到多台服务器上,Kafka处理它所有的发布和订阅消息系统使用了四个API,即生产者API、消费者API、Stream API和Connector API。它能够传递大规模流式消息,自带容错功能,已经取代了一些传统消息系统,如JMS、AMQP等。

为什么使用kafka?

业务场景

基本架构

Kafka 运行在一个由一台或多台服务器组成的集群上,并且分区可以跨集群节点分布

Spring Boot 集成 Kafkad的实现方法

Producer 生产消息,发送到Broker中

Leader状态的Broker接收消息,写入到相应topic中。在一个分区内,这些消息被索引并连同时间戳存储在一起

Leader状态的Broker接收完毕以后,传给Follow状态的Broker作为副本备份

Consumer 消费者的进程可以从分区订阅,并消费消息

常用术语

代码演示

外部依赖:

在 pom.xml 中添加 Kafka 依赖:

<dependency>    <groupId>org.springframework.kafka</groupId>    <artifactId>spring-kafka</artifactId></dependency>

由于spring-boot-starter-parent 指定的版本号是2.1.5.RELEASE,spring boot 会对外部框架的版本号统一管理,spring-kafka 引入的版本是 2.2.6.RELEASE

配置文件:

在配置文件 application.yaml 中配置 Kafka 的相关参数,具体内容如下:

Spring:  kafka:    bootstrap-servers: localhost:9092    producer:      retries: 3  # 生产者发送失败时,重试次数      batch-size: 16384      buffer-memory: 33554432      key-serializer: org.apache.kafka.common.serialization.StringSerializer # 生产者消息key和消息value的序列化处理类      value-serializer: org.apache.kafka.common.serialization.StringSerializer    consumer:      group-id: tomge-consumer-group  # 默认消费者group id      auto-offset-reset: earliest      enable-auto-commit: true      auto-commit-interval: 100      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

对应的配置类 org.springframework.boot.autoconfigure.kafka.KafkaProperties,来初始化kafka相关的bean实例对象,并注册到spring容器中。

发送消息:

Spring Boot 作为一款支持快速开发的集成性框架,同样提供了一批以 -Template 命名的模板工具类用于实现消息通信。对于 Kafka 而言,这个工具类就是KafkaTemplate

KafkaTemplate 提供了一系列 send 方法用来发送消息,典型的 send 方法定义如下代码所示:

public ListenableFuture<SendResult<K, V>> send(String topic, @Nullable V data) { 。。。。 省略}

生产端提供了一个restful接口,模拟发送一条创建新用户消息。

@GetMapping("/add_user")public Object add() {    try {        Long id = Long.valueOf(new Random().nextInt(1000));        User user = User.builder().id(id).userName("TomGE").age(29).address("上海").build();        ListenableFuture<SendResult> listenableFuture = kafkaTemplate.send(addUserTopic, JSON.toJSONString(user));                // 提供回调方法,可以监控消息的成功或失败的后续处理        listenableFuture.addCallback(new ListenableFutureCallback<SendResult>() {            @Override            public void onFailure(Throwable throwable) {                System.out.println("发送消息失败," + throwable.getMessage());            }            @Override            public void onSuccess(SendResult sendResult) {                // 消息发送到的topic                String topic = sendResult.getRecordMetadata().topic();                // 消息发送到的分区                int partition = sendResult.getRecordMetadata().partition();                // 消息在分区内的offset                long offset = sendResult.getRecordMetadata().offset();                System.out.println(String.format("发送消息成功,topc:%s, partition: %s, offset:%s ", topic, partition, offset));            }        });        return "消息发送成功";    } catch (Exception e) {        e.printStackTrace();        return "消息发送失败";    }}

实际上开发使用的Kafka默认允许自动创建Topic,创建Topic时默认的分区数量是1,可以通过server.properties文件中的num.partitions=1修改默认分区数量。在生产环境中通常会关闭自动创建功能,Topic需要由运维人员先创建好。

消费消息:

在 Kafka 中消息通过服务器推送给各个消费者,而 Kafka 的消费者在消费消息时,需要提供一个监听器(Listener)对某个 Topic 实现监听,从而获取消息,这也是 Kafka 消费消息的唯一方式。

定义一个消费类,在处理具体消息业务逻辑的方法上添加 @KafkaListener 注解,并配置要消费的topic,代码如下所示:

@Componentpublic class UserConsumer {    @KafkaListener(topics = "add_user")    public void receiveMesage(String content) {        System.out.println("消费消息:" + content);    }}

是不是很简单,添加kafka依赖、使用KafkaTemplate、@KafkaListener注解就完成消息的生产和消费,其实是SpringBoot在背后默默的做了很多工作,如果感兴趣可以研究下spring-boot-autoconfigure ,里面提供了常用开源框架的客户端实例封装。

“Spring Boot 集成 Kafkad的实现方法”的内容就介绍到这里了,感谢大家的阅读。如果想了解更多行业相关的知识可以关注编程网网站,小编将为大家输出更多高质量的实用文章!

阅读原文内容投诉

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

软考中级精品资料免费领

  • 历年真题答案解析
  • 备考技巧名师总结
  • 高频考点精准押题
  • 2024年上半年信息系统项目管理师第二批次真题及答案解析(完整版)

    难度     813人已做
    查看
  • 【考后总结】2024年5月26日信息系统项目管理师第2批次考情分析

    难度     354人已做
    查看
  • 【考后总结】2024年5月25日信息系统项目管理师第1批次考情分析

    难度     318人已做
    查看
  • 2024年上半年软考高项第一、二批次真题考点汇总(完整版)

    难度     435人已做
    查看
  • 2024年上半年系统架构设计师考试综合知识真题

    难度     224人已做
    查看

相关文章

发现更多好内容

猜你喜欢

AI推送时光机
位置:首页-资讯-后端开发
咦!没有更多了?去看看其它编程学习网 内容吧
首页课程
资料下载
问答资讯