文章详情

短信预约-IT技能 免费直播动态提醒

请输入下面的图形验证码

提交验证

短信预约提醒成功

【Kafka】Kafka Stream简单使用

2023-08-30 16:39

关注

一、实时流式计算

1. 概念

一般流式计算会与批量计算相比较。在流式计算模型中,输入是持续的,可以认为在时间上是无界的,也就意味着,永远拿不到全量数据去做计算。同时,计算结果是持续输出的,也即计算结果在时间上也是无界的。流式计算一般对实时性要求较高,同时一般是先定义目标计算,然后数据到来之后将计算逻辑应用于数据。同时为了提高计算效率,往往尽可能采用增量计算代替全量计算
在这里插入图片描述
流式计算就相当于上图的右侧扶梯,是可以源源不断的产生数据,源源不断的接收数据,没有边界。

2. 应用场景

  • 日志分析: 网站的用户访问日志进行实时的分析,计算访问量,用户画像,留存率等等,实时的进行数据分析,帮助企业进行决策
  • 大屏看板统计: 可以实时的查看网站注册数量,订单数量,购买数量,金额等。
  • 公交实时数据: 可以随时更新公交车方位,计算多久到达站牌等
  • 实时文章分值计算

比如应用较广的 头条类文章的分值计算,通过用户的行为实时文章的分值,分值越高就越被推荐

3. Kafka Stream

近些年来,开源流处理领域涌现出了很多优秀框架。光是在 Apache 基金会孵化的项目,关于流处理的大数据框架就有十几个之多,比如早期的 Apache SamzaApache Storm,以及这些年火爆的 Spark 以及 Flink 等。

3.1 Kafka Streams的特点

在这里插入图片描述

3.2 关键概念

一个最简单的Streaming的结构如下图所示:
在这里插入图片描述

从一个Topic中读取到数据,经过一些处理操作之后,写入到另一个Topic中,这就是一个最简单的Streaming流式计算。其中,Source Topic中的数据会源源不断的产生新数据。
那么,我们再在上面的结构之上扩展一下,假设定义了多个Source TopicDestination Topic,那就构成如下图所示的较为复杂的拓扑结构:
在这里插入图片描述

3.3 KStream

KStream:数据结构类似于map,如下图,key-value键值对

在这里插入图片描述

KStream数据流(data stream),是一段顺序的,可以无限长,不断更新的数据集。
数据流中比较常记录的是事件,这些事件可以是一次鼠标点击(click),一次交易,或是传感器记录的位置数据。

KStream负责抽象的,就是数据流。与Kafka自身topic中的数据一样,类似日志,每一次操作都是向其中插入(insert)新数据。

二、测试kafkaStream

先看下简单的kafkaStreamKStream测试

需求分析:求单词个数(word count)
在这里插入图片描述

1. pom.xml引入依赖:

               <dependency>            <groupId>org.springframework.kafkagroupId>            <artifactId>spring-kafkaartifactId>            <exclusions>                <exclusion>                    <groupId>org.apache.kafkagroupId>                    <artifactId>kafka-clientsartifactId>                exclusion>            exclusions>        dependency>        <dependency>            <groupId>org.apache.kafkagroupId>            <artifactId>kafka-clientsartifactId>        dependency>        <dependency>            <groupId>com.alibabagroupId>            <artifactId>fastjsonartifactId>        dependency>        <dependency>            <groupId>org.apache.kafkagroupId>            <artifactId>kafka-streamsartifactId>            <exclusions>                <exclusion>                    <artifactId>connect-jsonartifactId>                    <groupId>org.apache.kafkagroupId>                exclusion>                <exclusion>                    <groupId>org.apache.kafkagroupId>                    <artifactId>kafka-clientsartifactId>                exclusion>            exclusions>        dependency>

2. 配置文件

server:  port: 9991spring:  application:    name: kafka-demo  kafka:    bootstrap-servers: 192.168.200.130:9092    producer:      retries: 10      key-serializer: org.apache.kafka.common.serialization.StringSerializer      value-serializer: org.apache.kafka.common.serialization.StringSerializer      compression-type: lz4    consumer:      group-id: ${spring.application.name}-test      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

3. 编写生产者

ProducerQuickStart.java

package com.kafka.sample;import lombok.extern.slf4j.Slf4j;import org.apache.kafka.clients.producer.*;import java.util.Properties;@Slf4jpublic class ProducerQuickStart {    public static void main(String[] args) {        //1. kafka的配置信息        Properties prop = new Properties();        //kafka的链接信息        prop.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.200.130:9092");        //配置重试次数        prop.put(ProducerConfig.RETRIES_CONFIG, 5);        //数据压缩        prop.put(ProducerConfig.COMPRESSION_TYPE_CONFIG,"lz4");        //ack配置  消息确认机制   默认ack=1,即只要集群首领节点收到消息,生产者就会收到一个来自服务器的成功响应//        prop.put(ProducerConfig.ACKS_CONFIG,"all");        消息key的序列化器        prop.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");        //消息value的序列化器        prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");        //2. 生产者对象        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(prop);        //封装发送的消息        ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>("itcast-topic-input", "key_001", "hello kafka");        //3. 发送消息        for (int i = 0; i < 5; i++) {            producer.send(producerRecord);        }        //4. 关闭消息通道  必须关闭,否则消息发不出去        producer.close();    }}

4 编写kafkaStream流式处理

KafkaStreamQuickStart.java

package com.kafka.sample;import org.apache.kafka.common.serialization.Serdes;import org.apache.kafka.streams.KafkaStreams;import org.apache.kafka.streams.KeyValue;import org.apache.kafka.streams.StreamsBuilder;import org.apache.kafka.streams.StreamsConfig;import org.apache.kafka.streams.kstream.KStream;import org.apache.kafka.streams.kstream.TimeWindows;import org.apache.kafka.streams.kstream.ValueMapper;import java.time.Duration;import java.util.Arrays;import java.util.Properties;public class KafkaStreamQuickStart {    public static void main(String[] args) {        //kafka的配置信心        Properties prop = new Properties();        prop.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.200.130:9092");        prop.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());        prop.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());        prop.put(StreamsConfig.APPLICATION_ID_CONFIG,"streams-quickstart");        //stream 构建器        StreamsBuilder streamsBuilder = new StreamsBuilder();        //流式计算        streamProcessor(streamsBuilder);        //创建kafkaStream对象        KafkaStreams kafkaStreams = new KafkaStreams(streamsBuilder.build(),prop);        //开启流式计算        kafkaStreams.start();    }        private static void streamProcessor(StreamsBuilder streamsBuilder) {        //创建kstream对象,同时指定从那个topic中接收消息        KStream<String, String> stream = streamsBuilder.stream("itcast-topic-input");                stream.flatMapValues(new ValueMapper<String, Iterable<String>>() {            @Override            public Iterable<String> apply(String value) {                return Arrays.asList(value.split(" "));            }        })                //按照value进行聚合处理                .groupBy((key,value)->value)                //时间窗口                .windowedBy(TimeWindows.of(Duration.ofSeconds(10)))                //统计单词的个数                .count()                //转换为kStream                .toStream()                .map((key,value)->{                    System.out.println("key:"+key+",vlaue:"+value);                    return new KeyValue<>(key.key().toString(),value.toString());                })                //发送消息                .to("itcast-topic-out");    }}

5. 编写消费者

ConsumerQuickStart.java

package com.kafka.sample;import org.apache.kafka.clients.consumer.ConsumerConfig;import org.apache.kafka.clients.consumer.ConsumerRecord;import org.apache.kafka.clients.consumer.ConsumerRecords;import org.apache.kafka.clients.consumer.KafkaConsumer;import java.time.Duration;import java.util.Collections;import java.util.Properties;public class ConsumerQuickStart {    public static void main(String[] args) {        //1. 添加kafka的配置信息        Properties properties = new Properties();        // 配置链接信息        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.200.130:9092");        //配置消费者组        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "group-2");        //配置消息的反序列化器        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");        //2. 消费者对象        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);        //3. 订阅主题        consumer.subscribe(Collections.singletonList("itcast-topic-out"));        //当前线程一直监听消息        while(true){            //4. 消费者拉取消息: 每秒拉取一次            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));            for (ConsumerRecord<String, String> record : records) {                System.out.println(record.key());                System.out.println(record.value());            }        }    }}

启动项目:

  1. 在远端(192.168.200.130:9092)启动docker中的kafka容器
  2. 启动消费者ConsumerQuickStartmain函数
  3. 启动kafkastreammian函数
  4. 启动生产者ProducerQuickStartmain函数

5. 控制台打印结果:

在这里插入图片描述
在这里插入图片描述

整个过程:
生产者向kafka中发送了5条“hello kafka”消息,topic均为itcast-topic-input。kafkastream监听这个topic,每10秒进行一次流式处理,将“hello kakfa”字符串分割,并统计每个单词出现的次数。然后转为kstream,发送消息到kafka中的topic=itcast-topic-out”。消费者监听“itcast-topic-out”的topic,消费消息。

三、Springboot整合kafkaStream

1. 配置文件新增

application.yml

server:  port: 9991spring:  application:    name: kafka-demo  kafka:    bootstrap-servers: 192.168.200.130:9092    producer:      retries: 10      key-serializer: org.apache.kafka.common.serialization.StringSerializer      value-serializer: org.apache.kafka.common.serialization.StringSerializer      compression-type: lz4    consumer:      group-id: ${spring.application.name}-test      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer# kafkaStream新增以下配置kafka:  hosts: 192.168.200.130:9092  group: ${spring.application.name}

2. 在微服务中新增配置类

KafkaStreamConfig.java

package com.kafka.config;import lombok.Getter;import lombok.Setter;import org.apache.kafka.common.serialization.Serdes;import org.apache.kafka.streams.StreamsConfig;import org.springframework.boot.context.properties.ConfigurationProperties;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import org.springframework.kafka.annotation.EnableKafkaStreams;import org.springframework.kafka.annotation.KafkaStreamsDefaultConfiguration;import org.springframework.kafka.config.KafkaStreamsConfiguration;import java.util.HashMap;import java.util.Map;@Setter@Getter@Configuration@EnableKafkaStreams@ConfigurationProperties(prefix="kafka")public class KafkaStreamConfig {    private static final int MAX_MESSAGE_SIZE = 16* 1024 * 1024;    private String hosts;    private String group;    @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)    public KafkaStreamsConfiguration defaultKafkaStreamsConfig() {        Map<String, Object> props = new HashMap<>();        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, hosts);        props.put(StreamsConfig.APPLICATION_ID_CONFIG, this.getGroup()+"_stream_aid");        props.put(StreamsConfig.CLIENT_ID_CONFIG, this.getGroup()+"_stream_cid");        props.put(StreamsConfig.RETRIES_CONFIG, 10);        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());        return new KafkaStreamsConfiguration(props);    }}

3. 使用kafkaStream监听消息

KafkaStreamHelloListener.java

package com.kafka.stream;import lombok.extern.slf4j.Slf4j;import org.apache.kafka.streams.KeyValue;import org.apache.kafka.streams.StreamsBuilder;import org.apache.kafka.streams.kstream.KStream;import org.apache.kafka.streams.kstream.TimeWindows;import org.apache.kafka.streams.kstream.ValueMapper;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import java.time.Duration;import java.util.Arrays;@Configuration@Slf4jpublic class KafkaStreamHelloListener {    @Bean    public KStream<String,String> kStream(StreamsBuilder streamsBuilder){        //创建kstream对象,同时指定从那个topic中接收消息        KStream<String, String> stream = streamsBuilder.stream("itcast-topic-input");        stream.flatMapValues(new ValueMapper<String, Iterable<String>>() {            @Override            public Iterable<String> apply(String value) {                return Arrays.asList(value.split(" "));            }        })                //根据value进行聚合分组                .groupBy((key,value)->value)                //聚合计算时间间隔                .windowedBy(TimeWindows.of(Duration.ofSeconds(10)))                //求单词的个数                .count()                .toStream()                //处理后的结果转换为string字符串                .map((key,value)->{                    System.out.println("key:"+key+",value:"+value);                    return new KeyValue<>(key.key().toString(),value.toString());                })                //发送消息                .to("itcast-topic-out");        return stream;    }}

测试:

启动springboot应用程序,运行之前的ProducerQuickStart来生产消息,约10秒后,看到kafkaStream消息的处理结果
在这里插入图片描述

说明kafkaStream接收到消息并将多条消息进行了统一处理。

参考(推荐阅读):

  1. https://cloud.tencent.com/developer/article/2100664
  2. https://www.cnblogs.com/tree1123/p/11457851.html

来源地址:https://blog.csdn.net/zsx1713366249/article/details/132522600

阅读原文内容投诉

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

软考中级精品资料免费领

  • 历年真题答案解析
  • 备考技巧名师总结
  • 高频考点精准押题
  • 2024年上半年信息系统项目管理师第二批次真题及答案解析(完整版)

    难度     807人已做
    查看
  • 【考后总结】2024年5月26日信息系统项目管理师第2批次考情分析

    难度     351人已做
    查看
  • 【考后总结】2024年5月25日信息系统项目管理师第1批次考情分析

    难度     314人已做
    查看
  • 2024年上半年软考高项第一、二批次真题考点汇总(完整版)

    难度     433人已做
    查看
  • 2024年上半年系统架构设计师考试综合知识真题

    难度     221人已做
    查看

相关文章

发现更多好内容

猜你喜欢

AI推送时光机
位置:首页-资讯-人工智能
咦!没有更多了?去看看其它编程学习网 内容吧
首页课程
资料下载
问答资讯