文章详情

短信预约-IT技能 免费直播动态提醒

请输入下面的图形验证码

提交验证

短信预约提醒成功

kafka复习:(24)consume-transform-produce模式

2023-08-30 15:41

关注
package com.cisdi.dsp.modules.metaAnalysis.rest.kafka2023;import org.apache.kafka.clients.consumer.*;import org.apache.kafka.clients.producer.KafkaProducer;import org.apache.kafka.clients.producer.ProducerConfig;import org.apache.kafka.clients.producer.ProducerRecord;import org.apache.kafka.common.TopicPartition;import org.apache.kafka.common.errors.ProducerFencedException;import org.apache.kafka.common.serialization.StringDeserializer;import org.apache.kafka.common.serialization.StringSerializer;import java.time.Duration;import java.util.*;public class KafkaTest24 {    public static final String brokerList = "k8s-master:9092";    public static Properties getConsumerProperties() {        Properties props = new Properties();        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());        //必须配置手动提交        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);        props.put(ConsumerConfig.GROUP_ID_CONFIG, "groupId");        return props;    }    public static Properties getProducerProperties() {        Properties props = new Properties();        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "transactionalId");        return props;    }    //先从source-topic消费,再往sink-topic生产    public static void main(String[] args) {        KafkaConsumer consumer = new KafkaConsumer<>(getConsumerProperties());        consumer.subscribe(Collections.singletonList("source-topic"));        KafkaProducer producer = new KafkaProducer<>(getProducerProperties());        //初始化事务        producer.initTransactions();        while (true) {            ConsumerRecords records = consumer.poll(Duration.ofMillis(1000));            if (!records.isEmpty()) {                Map offsets = new HashMap<>();                //开启事务                producer.beginTransaction();                try {                    for (TopicPartition partition : records.partitions()) {                        List> partitionRecords = records.records(partition);                        for (ConsumerRecord record : partitionRecords) {ProducerRecord producerRecord =        new ProducerRecord<>("sink-topic", record.key(), record.value());producer.send(producerRecord);System.out.println("sent :" + record.value());                        }                        long lastConsumedOffset = partitionRecords.get(partitionRecords.size() - 1).offset();                        offsets.put(partition, new OffsetAndMetadata(lastConsumedOffset + 1));                    }                    // 提交消费位移                    // consume-transform-produce模式,此处的group id 必须要配置成consumer 中配置的group id                    producer.sendOffsetsToTransaction(offsets, "groupId");                    producer.commitTransaction();                } catch (ProducerFencedException e) {                    producer.abortTransaction();                }            }        }    }}

来源地址:https://blog.csdn.net/amadeus_liu2/article/details/132578447

阅读原文内容投诉

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

软考中级精品资料免费领

  • 历年真题答案解析
  • 备考技巧名师总结
  • 高频考点精准押题
  • 2024年上半年信息系统项目管理师第二批次真题及答案解析(完整版)

    难度     813人已做
    查看
  • 【考后总结】2024年5月26日信息系统项目管理师第2批次考情分析

    难度     354人已做
    查看
  • 【考后总结】2024年5月25日信息系统项目管理师第1批次考情分析

    难度     318人已做
    查看
  • 2024年上半年软考高项第一、二批次真题考点汇总(完整版)

    难度     435人已做
    查看
  • 2024年上半年系统架构设计师考试综合知识真题

    难度     224人已做
    查看

相关文章

发现更多好内容

猜你喜欢

AI推送时光机
位置:首页-资讯-人工智能
咦!没有更多了?去看看其它编程学习网 内容吧
首页课程
资料下载
问答资讯