文章详情

短信预约-IT技能 免费直播动态提醒

请输入下面的图形验证码

提交验证

短信预约提醒成功

SpringBoot怎么整合Pulsar

2023-07-02 14:58

关注

这篇文章主要介绍了SpringBoot怎么整合Pulsar的相关知识,内容详细易懂,操作简单快捷,具有一定借鉴价值,相信大家阅读完这篇SpringBoot怎么整合Pulsar文章都会有所收获,下面我们一起来看看吧。

一、添加pom.xml依赖

<parent>    <groupId>org.springframework.boot</groupId>    <artifactId>spring-boot-starter-parent</artifactId>    <version>2.7.0</version></parent><dependencies>    <dependency>        <groupId>org.springframework.boot</groupId>        <artifactId>spring-boot-starter-web</artifactId>    </dependency>    <dependency>        <groupId>org.apache.pulsar</groupId>        <artifactId>pulsar-client</artifactId>        <version>2.10.0</version>    </dependency>    <dependency>        <groupId>org.projectlombok</groupId>        <artifactId>lombok</artifactId>        <version>1.18.24</version>        <scope>provided</scope>    </dependency></dependencies><build>    <plugins>        <plugin>            <groupId>org.apache.maven.plugins</groupId>            <artifactId>maven-compiler-plugin</artifactId>            <configuration>                <source>8</source>                <target>8</target>            </configuration>        </plugin>    </plugins></build>

二、Pulsar 参数类

import lombok.Data;import org.springframework.boot.context.properties.ConfigurationProperties;import org.springframework.stereotype.Component;import java.util.Map;@Component@ConfigurationProperties(prefix = "tdmq.pulsar")@Datapublic class PulsarProperties {        private String serviceurl;        private String tdcNamespace;        private String tdcToken;        private String cluster;        private Map<String, String> topicMap;        private Map<String, String> subMap;        private String onOff;}

三、Pulsar 配置类

import org.apache.pulsar.client.api.AuthenticationFactory;import org.apache.pulsar.client.api.PulsarClient;import org.apache.pulsar.client.api.PulsarClientException;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.boot.context.properties.EnableConfigurationProperties;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;@Configuration@EnableConfigurationProperties(PulsarProperties.class)public class PulsarConfig {    @Autowired    PulsarProperties pulsarProperties;    @Bean    public PulsarClient getPulsarClient() {        try {            return PulsarClient.builder()                    .authentication(AuthenticationFactory.token(pulsarProperties.getTdcToken()))                    .serviceUrl(pulsarProperties.getServiceurl())                    .build();        } catch (PulsarClientException e) {            System.out.println(e);            throw new RuntimeException("初始化Pulsar Client失败");        }    }}

四、不同消费数据类型的监听器

import com.yibo.pulsar.pojo.User;import org.apache.pulsar.client.api.Consumer;import org.apache.pulsar.client.api.Message;import org.apache.pulsar.client.api.MessageListener;import org.springframework.stereotype.Component;@Componentpublic class UserMessageListener implements MessageListener<User> {    @Override    public void received(Consumer<User> consumer, Message<User> msg) {        try {            User user = msg.getValue();            System.out.println(user);            consumer.acknowledge(msg);        } catch (Exception e) {            consumer.negativeAcknowledge(msg);        }    }}import org.apache.pulsar.client.api.Consumer;import org.apache.pulsar.client.api.Message;import org.apache.pulsar.client.api.MessageListener;import org.springframework.stereotype.Component;@Componentpublic class StringMessageListener implements MessageListener<String> {    @Override    public void received(Consumer<String> consumer, Message<String> msg) {        try {            System.out.println(msg.getValue());            consumer.acknowledge(msg);        } catch (Exception e) {            consumer.negativeAcknowledge(msg);        }    }}

五、Pulsar的核心服务类

import com.yibo.pulsar.common.listener.StringMessageListener;import com.yibo.pulsar.common.listener.UserMessageListener;import com.yibo.pulsar.pojo.User;import org.apache.pulsar.client.api.*;import org.apache.pulsar.client.impl.schema.AvroSchema;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.context.annotation.Bean;import org.springframework.stereotype.Component;import java.util.concurrent.TimeUnit;@Componentpublic class PulsarCommon {    @Autowired    private PulsarProperties pulsarProperties;    @Autowired    private PulsarClient client;    @Autowired    private UserMessageListener userMessageListener;    @Autowired    private StringMessageListener stringMessageListener;        public <T> Producer<T> createProducer(String topic, Schema<T> schema) {        try {            return client.newProducer(schema)                    .topic(pulsarProperties.getCluster() + "/" + pulsarProperties.getTdcNamespace() + "/" + topic)                    .batchingMaxPublishDelay(10, TimeUnit.MILLISECONDS)                    .sendTimeout(10, TimeUnit.SECONDS)                    .blockIfQueueFull(true)                    .create();        } catch (PulsarClientException e) {            throw new RuntimeException("初始化Pulsar Producer失败");        }    }        public <T> Consumer<T> createConsumer(String topic, String subscription,                                   MessageListener<T> messageListener, Schema<T> schema) {        try {            return client.newConsumer(schema)                    .topic(pulsarProperties.getCluster() + "/" + pulsarProperties.getTdcNamespace() + "/" + topic)                    .subscriptionName(subscription)                    .ackTimeout(10, TimeUnit.SECONDS)                    .subscriptionType(SubscriptionType.Shared)                    .messageListener(messageListener)                    .subscribe();        } catch (PulsarClientException e) {            throw new RuntimeException("初始化Pulsar Consumer失败");        }    }            public <T> void sendAsyncMessage(T message, Producer<T> producer) {        producer.sendAsync(message).thenAccept(msgId -> {        });    }                public <T> void sendSyncMessage(T message, Producer<T> producer) throws PulsarClientException {        MessageId send = producer.send(message);        System.out.println();        System.out.println();        System.out.println();        System.out.println();        System.out.println(send);    }        //-----------consumer-----------    @Bean(name = "comment-publish-topic-consumer")    public Consumer<String> getCommentPublishTopicConsumer() {        return this.createConsumer(pulsarProperties.getTopicMap().get("comment-publish-topic"),                pulsarProperties.getSubMap().get("comment-publish-topic-test"),                stringMessageListener, Schema.STRING);    }    @Bean(name = "reply-publish-topic-consumer")    public Consumer<User> getReplyPublishTopicConsumer() {        return this.createConsumer(pulsarProperties.getTopicMap().get("reply-publish-topic"),                pulsarProperties.getSubMap().get("reply-publish-topic-test"),                userMessageListener, AvroSchema.of(User.class));    }    //-----------producer-----------    @Bean(name = "comment-publish-topic-producer")    public Producer<String> getCommentPublishTopicProducer() {        return this.createProducer(pulsarProperties.getTopicMap().get("comment-publish-topic"),Schema.STRING);    }    @Bean(name = "reply-publish-topic-producer")    public Producer<User> getReplyPublishTopicProducer() {        return this.createProducer(pulsarProperties.getTopicMap().get("reply-publish-topic"), AvroSchema.of(User.class));    }}

六、Pulsar整合Spring Cloud

后来发现如上代码会导致BUG-> 在更新Nacos配置之后 Consumer会挂掉
经排查发现结果是由于@RefreshScope注解导致,此注解将摧毁Bean,PulsarConsumer和Producer都将被摧毁,只是说Producer将在下⼀次调⽤中完成重启,Consumer则不能重启,因为没有调⽤,那么怎么解决呢?

就是发布系列事件以刷新容器

import lombok.extern.slf4j.Slf4j;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.context.ApplicationContext;import org.springframework.context.ApplicationEvent;import org.springframework.context.ApplicationListener;import org.springframework.stereotype.Component;@Component@Slf4jpublic class RefreshPulsarListener implements ApplicationListener {    @Autowired    ApplicationContext applicationContext;    @Override    public void onApplicationEvent(ApplicationEvent event) {        if (event.getSource().equals("__refreshAll__")) {            log.info("Nacos配置中心配置修改 重启Pulsar====================================");            log.info("重启PulsarClient,{}", applicationContext.getBean("getPulsarClient"));            log.info("重启PulsarConsumer,{}", applicationContext.getBean("comment-publish-topic-consumer"));            log.info("重启PulsarConsumer,{}", applicationContext.getBean("reply-publish-topic-consumer"));        }    }}

关于“SpringBoot怎么整合Pulsar”这篇文章的内容就介绍到这里,感谢各位的阅读!相信大家对“SpringBoot怎么整合Pulsar”知识都有一定的了解,大家如果还想学习更多知识,欢迎关注编程网行业资讯频道。

阅读原文内容投诉

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

软考中级精品资料免费领

  • 历年真题答案解析
  • 备考技巧名师总结
  • 高频考点精准押题
  • 2024年上半年信息系统项目管理师第二批次真题及答案解析(完整版)

    难度     813人已做
    查看
  • 【考后总结】2024年5月26日信息系统项目管理师第2批次考情分析

    难度     354人已做
    查看
  • 【考后总结】2024年5月25日信息系统项目管理师第1批次考情分析

    难度     318人已做
    查看
  • 2024年上半年软考高项第一、二批次真题考点汇总(完整版)

    难度     435人已做
    查看
  • 2024年上半年系统架构设计师考试综合知识真题

    难度     224人已做
    查看

相关文章

发现更多好内容

猜你喜欢

AI推送时光机
位置:首页-资讯-后端开发
咦!没有更多了?去看看其它编程学习网 内容吧
首页课程
资料下载
问答资讯