文章详情

短信预约-IT技能 免费直播动态提醒

请输入下面的图形验证码

提交验证

短信预约提醒成功

微服务同时接入多个Kafka

2023-08-19 09:01

关注

最近在做微服务的迁移改造工作,其中有一个服务需要订阅多个Kafka,如果使用spring kafka自动配置的话只能配置一个Kafka,不符合需求,该文总结了如何配置多个Kafka,希望对您有帮助。

准备工作

# 配置文件选择自己对应的目录zookeeper-server-start.sh ../config/zookeeper.properties

Windows

windows/zookeeper-server-start.bat ../config/zookeeper.properties

打开另外一个终端,启动KafkaServer
Linux

kafka-server-start.sh ../config/server.properties

Windows

windows/kafka-server-start.bat ../config/server.properties

最小化配置Kafka

如下是最小化配置Kafka
pom.xml 引入依赖

<dependency><groupId>org.springframework.kafkagroupId><artifactId>spring-kafkaartifactId>dependency>

application.properties

server.port=8090spring.application.name=single-kafka-server#kafka 服务器地址spring.kafka.bootstrap-servers=localhost:9092#消费者分组,配置后,自动创建spring.kafka.consumer.group-id=default_group

KafkaProducer 生产者

@Slf4j@Component@EnableSchedulingpublic class KafkaProducer {    @Resource    private KafkaTemplate kafkaTemplate;    private void sendTest() {    //topic 会自动创建        kafkaTemplate.send("topic1", "hello kafka");    }    @Scheduled(fixedRate = 1000 * 10)    public void testKafka() {        log.info("send message...");        sendTest();    }}

KafkaConsumer 消费者

@Slf4j@Componentpublic class KafkaConsumer {    @KafkaListener(topics = {"topic1"})    public void processMessage(String spuId) {        log.warn("process spuId ={}", spuId);    }}

运行效果:
在这里插入图片描述

多Kafka配置

配置稍微复杂了一点,灵魂就是手动创建,同样引入依赖
pom.xml

<dependency><groupId>org.springframework.kafkagroupId><artifactId>spring-kafkaartifactId>dependency>

application.properties

server.port=8090spring.application.name=kafka-server#kafka1#服务器地址spring.kafka.one.bootstrap-servers=localhost:9092spring.kafka.one.consumer.group-id=default_group#kafka2spring.kafka.two.bootstrap-servers=localhost:9092spring.kafka.two.consumer.group-id=default_group2

第一个Kafka配置,需要区分各Bean的名称
KafkaOneConfig

@Configurationpublic class KafkaOneConfig {    @Value("${spring.kafka.one.bootstrap-servers}")    private String bootstrapServers;    @Value("${spring.kafka.one.consumer.group-id}")    private String groupId;    @Bean    public KafkaTemplate<String, String> kafkaOneTemplate() {        return new KafkaTemplate<>(producerFactory());    }    @Bean(name = "kafkaOneContainerFactory")    KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> kafkaOneContainerFactory() {        ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();        factory.setConsumerFactory(consumerFactory());        factory.getContainerProperties().setPollTimeout(3000);        return factory;    }    private ProducerFactory<String, String> producerFactory() {        return new DefaultKafkaProducerFactory<>(producerConfigs());    }    private ConsumerFactory<Integer, String> consumerFactory() {        return new DefaultKafkaConsumerFactory<>(consumerConfigs());    }    private Map<String, Object> producerConfigs() {        Map<String, Object> props = new HashMap<>();        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);        return props;    }    private Map<String, Object> consumerConfigs() {        Map<String, Object> props = new HashMap<>();        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);        return props;    }}

kafkaOneTemplate 定义第一个Kafka的高级模板,用来发送消息
kafkaOneContainerFactory 消费监听容器,配置在@KafkaListener中,
producerFactory 生产者工厂
consumerFactory 消费者工厂
producerConfigs 生产者配置
consumerConfigs 消费者配置

同样创建第二个Kafka,配置含义,同第一个Kafka
KafkaTwoConfig

@Configurationpublic class KafkaTwoConfig {    @Value("${spring.kafka.two.bootstrap-servers}")    private String bootstrapServers;    @Value("${spring.kafka.two.consumer.group-id}")    private String groupId;    @Bean    public KafkaTemplate<String, String> kafkaTwoTemplate() {        return new KafkaTemplate<>(producerFactory());    }    @Bean(name = "kafkaTwoContainerFactory")    KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> kafkaTwoContainerFactory() {        ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();        factory.setConsumerFactory(consumerFactory());        factory.getContainerProperties().setPollTimeout(3000);        return factory;    }    private ProducerFactory<String, String> producerFactory() {        return new DefaultKafkaProducerFactory<>(producerConfigs());    }    public ConsumerFactory<Integer, String> consumerFactory() {        return new DefaultKafkaConsumerFactory<>(consumerConfigs());    }    private Map<String, Object> producerConfigs() {        Map<String, Object> props = new HashMap<>();        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);        return props;    }    private Map<String, Object> consumerConfigs() {        Map<String, Object> props = new HashMap<>();        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);        return props;    }}

创建一个测试的消费者,注意配置不同的监听容器containerFactory
KafkaConsumer

@Slf4j@Componentpublic class KafkaConsumer {    @KafkaListener(topics = {"topic1"}, containerFactory = "kafkaOneContainerFactory")    public void oneProcessItemcenterSpuMessage(String spuId) {        log.warn("one process spuId ={}", spuId);    }    @KafkaListener(topics = {"topic2"},containerFactory = "kafkaTwoContainerFactory")    public void twoProcessItemcenterSpuMessage(String spuId) {        log.warn("two process spuId ={}", spuId);    }}

创建一个测试的生产者,定时往两个topic中发送消息
KafkaProducer

@Slf4j@Componentpublic class KafkaProducer {    @Resource    private KafkaTemplate kafkaOneTemplate;    @Resource    private KafkaTemplate kafkaTwoTemplate;    private void sendTest() {        kafkaOneTemplate.send("topic1", "hello kafka one");        kafkaTwoTemplate.send("topic2", "hello kafka two");    }    @Scheduled(fixedRate = 1000 * 10)    public void testKafka() {        log.info("send message...");        sendTest();    }}

最后运行效果:
在这里插入图片描述

其他kafka文章:
【从面试题看源码】-看完Kafka性能优化-让你吊打面试官

来源地址:https://blog.csdn.net/weixin_40972073/article/details/126682094

阅读原文内容投诉

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

软考中级精品资料免费领

  • 历年真题答案解析
  • 备考技巧名师总结
  • 高频考点精准押题
  • 2024年上半年信息系统项目管理师第二批次真题及答案解析(完整版)

    难度     813人已做
    查看
  • 【考后总结】2024年5月26日信息系统项目管理师第2批次考情分析

    难度     354人已做
    查看
  • 【考后总结】2024年5月25日信息系统项目管理师第1批次考情分析

    难度     318人已做
    查看
  • 2024年上半年软考高项第一、二批次真题考点汇总(完整版)

    难度     435人已做
    查看
  • 2024年上半年系统架构设计师考试综合知识真题

    难度     224人已做
    查看

相关文章

发现更多好内容

猜你喜欢

AI推送时光机
位置:首页-资讯-后端开发
咦!没有更多了?去看看其它编程学习网 内容吧
首页课程
资料下载
问答资讯