文章详情

短信预约-IT技能 免费直播动态提醒

请输入下面的图形验证码

提交验证

短信预约提醒成功

java:Kafka生产者推送数据与消费者接收数据(参数配置以及案例)

2023-09-06 09:08

关注

1.生产者推送数据

常用参数

bootstrap.servers:Kafka集群中的Broker列表,格式为host1:port1,host2:port2,…。生产者会从这些Broker中选择一个可用的Broker作为消息发送的目标Broker。

acks:Broker对消息的确认模式。可选值为0、1、all。0表示生产者不会等待Broker的任何确认消息;1表示生产者会等待Broker的Leader副本确认消息;all表示生产者会等待所有副本都确认消息。确认模式越高,可靠性越高,但延迟也越大。

retries:消息发送失败时的重试次数。默认值为0,表示不进行重试。可以将其设置为大于0的值,例如3,表示最多重试3次。

batch.size:消息批量发送的大小。当生产者累积到一定数量的消息时,会将其打包成一个批次一次性发送给Broker。默认值为16384字节,即16KB。

linger.ms:消息发送的延迟时间。生产者会等待一定的时间,以便将更多的消息打包成一个批次一次性发送给Broker。默认值为0,表示立即发送。设置较大的值可以提高吞吐量,但可能会增加消息的延迟。

buffer.memory:生产者可用于缓存消息的内存大小。默认值为33554432字节,即32MB。如果生产者生产消息的速度快于发送消息的速度,可能会导致缓存溢出。可以调整该参数来适应生产者的生产速度。

key.serializer:Key的序列化器。Kafka消息可以包含Key和Value,Key和Value都需要进行序列化。该参数指定Key的序列化器。

value.serializer:Value的序列化器。该参数指定Value的序列化器。

max.block.ms:生产者在发送消息之前等待Broker元数据信息的最长时间。如果在该时间内无法获取到Broker元数据信息,则会抛出TimeoutException异常。默认值为60000毫秒,即60秒。

compression.type:消息压缩类型。可选值为none、gzip、snappy、lz4。默认值为none,表示不进行压缩。压缩可以减少消息的传输大小,提高网络带宽的利用率,但会增加CPU的消耗。

interceptor.classes:消息拦截器列表。可以指定多个消息拦截器对消息进行加工处理。例如,可以在消息中添加时间戳、添加消息来源等信息。
以上参数只是一部分,Kafka生产者还有更多参数可以进行配置。需要根据实际情况选择合适的参数进行配置。

例子

下面是一个单例模式配置 kafka生产者的例子(避免多次创建实例,减少资源的消耗)

public class SingletonKafkaProducerExample {    private static SingletonKafkaProducerExample instance;    private static Producer<String, String> producer;    private SingletonKafkaProducerExample() {    //参数设置        Properties props = new Properties();        props.put("bootstrap.servers", "ip:端口");        props.put("acks", "all");        props.put("max.block.ms",120000);//默认60s        props.put("retries", 3)//默认0;        props.put("batch.size", 16384);        props.put("linger.ms", 1);        props.put("buffer.memory", 33554432);        props.put("request.timeout.ms",60*1000);        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");        //sasl认证 (根据实际情况看是否配置)props.put("security.protocol", "SASL_PLAINTEXT");props.put("sasl.mechanism", "PLAIN");props.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username='username' password='password';");producer = new KafkaProducer<>(props);logger.info("kafka连接成功");    }    public static SingletonKafkaProducerExample getInstance() {        if (instance == null) {            synchronized (SingletonKafkaProducerExample.class) {                if (instance == null) {                    instance = new SingletonKafkaProducerExample();                }            }        }        return instance;    }    public void sendMessage(String topic, String key, String value) {        try {        //这里也可以不用设置key和partition,例如不设置分区 系统会使用轮询算法自动匹配partition            ProducerRecord<String, String> record = new ProducerRecord<>(topic, key, value);                        Future<RecordMetadata> future = producer.send(record, (metadata, exception) -> {                if (exception != null) {                    System.err.println("发送消息到" + metadata.topic() + "失败:" + exception.getMessage());                } else {                    System.out.println("发送消息到" + metadata.topic() + "成功:partition=" + metadata.partition() + ", offset=" + metadata.offset());                }            });            future.get(); // 等待返回数据        } catch (InterruptedException | ExecutionException e) {            System.err.println("发送消息失败:" + e.getMessage());        }     }    public void closeProducer() {        producer.close();    }}

以上参数配置只是案例,实际参数配置需要根据业务情况自己设置

可能遇见的问题

1.多个topic发送消息的时候总有1.2发送失败 报Failed to update metadata after 60000ms
这种情况出现的原因可能是Kafka集群中Broker的元数据信息还没有被更新到Kafka客户端中,导致Kafka客户端无法连接到指定的Broker。

解决

增加等待时间:可以通过设置max.block.ms属性来增加等待时间
提高重试次数:可以通过设置retries属性来提高重试次数
检查Broker配置
检查网络连接
检查Kafka版本

如果下面3个都没问题,就增加等待时间和重试次数。本人遇到这样的问题解决了

消费者 推送数据

import org.apache.kafka.clients.consumer.ConsumerRecords;import org.apache.kafka.clients.consumer.ConsumerRecord;import org.apache.kafka.clients.consumer.KafkaConsumer;import java.util.Arrays;import java.util.Properties;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;public class KafkaConsumerExample {    public static void main(String[] args) {        // 配置消费者参数Properties props = new Properties();props.put("bootstrap.servers", "ip:port");props.put("group.id", "test");props.put("enable.auto.commit", "true");props.put("session.timeout.ms", "30000");props.put("auto.offset.reset", "earliest");props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("max.poll.records", "10000");props.put("fetch.min.bytes", "1024");props.put("fetch.max.bytes", "1048576");props.put("fetch.max.wait.ms", "500");props.put("max.partition.fetch.bytes", "1024");props.put("connections.max.idle.ms", "540000");props.put("request.timeout.ms", "40000");props.put("retry.backoff.ms", "500");props.put("security.protocol", "SSL");props.put("ssl.keystore.location", "/path/to/keystore");props.put("ssl.keystore.password", "password");props.put("ssl.truststore.location", "/path/to/truststore");props.put("ssl.truststore.password", "password");        // 创建Kafka消费者实例        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);        // 订阅主题        consumer.subscribe(Arrays.asList("my-topic"));        // 创建线程池        ExecutorService executor = Executors.newFixedThreadPool(6);        // 消费消息        while (true) {            ConsumerRecords<String, String> records = consumer.poll(100);            for (ConsumerRecord<String, String> record : records) {                // 获取消息所在分区的编号                int partition = record.partition();                // 将消息提交给对应的线程进行处理                executor.submit(new MessageHandler(record.value(), partition));            }        }    }    // 消息处理器    static class MessageHandler implements Runnable {        private final String message;        private final int partition;        public MessageHandler(String message, int partition) {            this.message = message;            this.partition = partition;        }        @Override        public void run() {            // 对消息进行处理            System.out.printf("Partition %d: Message received: %s%n", partition, message);        }    }}

以上参数根据自己需求填写
可以根据分区 使用多线程执行

来源地址:https://blog.csdn.net/qq_44819486/article/details/130400332

阅读原文内容投诉

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

软考中级精品资料免费领

  • 历年真题答案解析
  • 备考技巧名师总结
  • 高频考点精准押题
  • 2024年上半年信息系统项目管理师第二批次真题及答案解析(完整版)

    难度     801人已做
    查看
  • 【考后总结】2024年5月26日信息系统项目管理师第2批次考情分析

    难度     348人已做
    查看
  • 【考后总结】2024年5月25日信息系统项目管理师第1批次考情分析

    难度     311人已做
    查看
  • 2024年上半年软考高项第一、二批次真题考点汇总(完整版)

    难度     432人已做
    查看
  • 2024年上半年系统架构设计师考试综合知识真题

    难度     220人已做
    查看

相关文章

发现更多好内容

猜你喜欢

AI推送时光机
位置:首页-资讯-后端开发
咦!没有更多了?去看看其它编程学习网 内容吧
首页课程
资料下载
问答资讯