第一步:引入maven依赖
<dependency> <groupId>org.springframework.kafkagroupId> <artifactId>spring-kafkaartifactId>dependency>
第二步:新增配置文件
以下为大致结构,供参考
spring: kafka: # 第一个kafka的配置 first: bootstrap-servers: xxx.xxx.xxx.xxx:xxxx producer: retries: x acks: -1 consumer: enable-auto-commit: false group-id: first-consumer listener: ack-mode: xx # 第二个kafka的配置 second: bootstrap-servers: xxx.xxx.xxx.xxx:xxxx producer: batch-size: xxxx buffer-memory: xxxxxx consumer: auto-offset-reset: earliest group-id: second-consumer listener: concurrency: xx
第三步:新增配置类
第一个kafka的配置类
@Configurationpublic class FirstKafkaConfig { @Primary @ConfigurationProperties(prefix = "spring.kafka.first") @Bean public KafkaProperties firstKafkaProperties() { return new KafkaProperties(); } @Primary @Bean public KafkaTemplate<String, String> firstKafkaTemplate( @Autowired @Qualifier("firstKafkaProperties") KafkaProperties firstKafkaProperties) { return new KafkaTemplate<>(firstProducerFactory(firstKafkaProperties)); } @Bean public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> firstKafkaListenerContainerFactory(@Autowired @Qualifier("firstKafkaProperties") KafkaProperties firstKafkaProperties) { ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(firstConsumerFactory(firstKafkaProperties)); return factory; } private ConsumerFactory<? super Integer, ? super String> firstConsumerFactory(KafkaProperties firstKafkaProperties) { return new DefaultKafkaConsumerFactory<>(firstKafkaProperties.buildConsumerProperties()); } private DefaultKafkaProducerFactory<String, String> firstProducerFactory(KafkaProperties firstKafkaProperties) { return new DefaultKafkaProducerFactory<>(firstKafkaProperties.buildProducerProperties()); }}
第二个kafka的配置类
@Configurationpublic class SecondKafkaConfig { @ConfigurationProperties(prefix = "spring.kafka.second") @Bean public KafkaProperties secondKafkaProperties() { return new KafkaProperties(); } @Bean public KafkaTemplate<String, String> secondKafkaTemplate( @Autowired @Qualifier("secondKafkaProperties") KafkaProperties secondKafkaProperties) { return new KafkaTemplate<>(secondProducerFactory(secondKafkaProperties)); } @Bean public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> secondKafkaListenerContainerFactory(@Autowired @Qualifier("secondKafkaProperties") KafkaProperties secondKafkaProperties) { ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(secondConsumerFactory(secondKafkaProperties)); return factory; } private ConsumerFactory<? super Integer, ? super String> secondConsumerFactory(KafkaProperties secondKafkaProperties) { return new DefaultKafkaConsumerFactory<>(secondKafkaProperties.buildConsumerProperties()); } private DefaultKafkaProducerFactory<String, String> secondProducerFactory(KafkaProperties secondKafkaProperties) { return new DefaultKafkaProducerFactory<>(secondKafkaProperties.buildProducerProperties()); }}
第四步:用
生产者用法
@Resourceprivate KafkaTemplate<String, String> firstKafkaTemplate;@Resource(name = "secondKafkaTemplate")private KafkaTemplate<String, String> secondKafkaTemplate;
消费者用法
@KafkaListener( containerFactory = "secondKafkaListenerContainerFactory", topics = {"xxxx"}, groupId = "second-consumer")public void testConsumer(ConsumerRecord<?, ?> record, Acknowledgment ack) { //do something}
来源地址:https://blog.csdn.net/qq_35893873/article/details/130620679