文章详情

短信预约-IT技能 免费直播动态提醒

请输入下面的图形验证码

提交验证

短信预约提醒成功

spring integration怎么连接MQTT

2023-07-05 11:12

关注

本篇内容主要讲解“spring integration怎么连接MQTT”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习“spring integration怎么连接MQTT”吧!

MQTT一种物联网数据传输协议,构建在TCP之上,采用发布与订阅的模式进行数据交互,发布与订阅是两个独立的连接通道,这里采用spring-integration-mqt来实现发布与订阅MQTT,与直接采用MQTT的SDK相对要简单许多,服务端采用ActiveMQ来支持MQTT的消息服务并实现消息转发。

首先需要引入spring-integration-mqt的包

这里只需要引入这一个包即可。

<dependency>     <groupId>org.springframework.integration</groupId>     <artifactId>spring-integration-mqtt</artifactId>     <version>5.3.1.RELEASE</version></dependency>

MQTT的配置比较简单

和spring-integration集成一样,需要配置相对应的入站、出站就可以了

具体配置如下:

package org.noka.serialservice.config; import org.eclipse.paho.client.mqttv3.MqttConnectOptions;import org.noka.serialservice.service.MsgSendService;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.beans.factory.annotation.Value;import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;import org.springframework.context.ApplicationEvent;import org.springframework.context.ApplicationListener;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import org.springframework.integration.annotation.ServiceActivator;import org.springframework.integration.config.EnableIntegration;import org.springframework.integration.endpoint.MessageProducerSupport;import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;import org.springframework.integration.mqtt.core.MqttPahoClientFactory;import org.springframework.integration.mqtt.event.MqttSubscribedEvent;import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;import org.springframework.integration.mqtt.outbound.AbstractMqttMessageHandler;import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;import org.springframework.integration.support.MessageBuilder; @EnableIntegration@Configuration@ConditionalOnProperty("mqtt.services")public class MQTTConfig implements ApplicationListener<ApplicationEvent> {    private static Logger logger = LoggerFactory.getLogger(MQTTConfig.class);     private final MsgSendService msgSendService;//发布消息到消息中间件接口     @Value("${mqtt.appid:mqtt_id}")    private String appid;//客户端ID     @Value("${mqtt.input.topic:mqtt_input_topic}")    private String[] inputTopic;//订阅主题,可以是多个主题     @Value("${mqtt.out.topic:mqtt_out_topic}")    private String[] outTopic;//发布主题,可以是多个主题     @Value("${mqtt.services:#{null}}")    private String[] mqttServices;//服务器地址以及端口     @Value("${mqtt.user:#{null}}")    private String user;//用户名     @Value("${mqtt.password:#{null}}")    private String password;//密码     @Value("${mqtt.KeepAliveInterval:300}")    private Integer KeepAliveInterval;//心跳时间,默认为5分钟     @Value("${mqtt.CleanSession:false}")    private Boolean CleanSession;//是否不保持session,默认为session保持     @Value("${mqtt.AutomaticReconnect:true}")    private Boolean AutomaticReconnect;//是否自动重联,默认为开启自动重联     @Value("${mqtt.CompletionTimeout:30000}")    private Long CompletionTimeout;//连接超时,默认为30秒     @Value("${mqtt.Qos:1}")    private Integer Qos;//通信质量,详见MQTT协议      public MQTTConfig(MsgSendService msgSendService) {        this.msgSendService = msgSendService;    }         @Bean    public MqttPahoClientFactory mqttClientFactory() {        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();//连接工厂类        MqttConnectOptions options = new MqttConnectOptions();//连接参数        options.setServerURIs(mqttServices);//连接地址        if(null!=user) {            options.setUserName(user);//用户名        }        if(null!=password) {            options.setPassword(password.toCharArray());//密码        }        options.setKeepAliveInterval(KeepAliveInterval);//心跳时间        options.setAutomaticReconnect(AutomaticReconnect);//断开是否自动重联        options.setCleanSession(CleanSession);//保持session        factory.setConnectionOptions(options);        return factory;    }         @Bean    public MessageProducerSupport mqttInput(MqttPahoClientFactory mqttPahoClientFactory){        MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(appid, mqttPahoClientFactory, inputTopic);//建立订阅连接        DefaultPahoMessageConverter converter = new DefaultPahoMessageConverter();        converter.setPayloadAsBytes(true);//bytes类型接收        adapter.setCompletionTimeout(CompletionTimeout);//连接超时的时间        adapter.setConverter(converter);        adapter.setQos(Qos);//消息质量        adapter.setOutputChannelName(ChannelName.INPUT_DATA);//输入管道名称        return adapter;    }        @Bean    @ServiceActivator(inputChannel = ChannelName.OUTPUT_DATA_MQTT)    public AbstractMqttMessageHandler MQTTOutAdapter(MqttPahoClientFactory connectionFactory) {        //创建一个新的出站管道,由于MQTT的发布与订阅是两个独立的连接,因此客户端的ID(即APPID)不能与订阅时所使用的ID一样,否则在服务端会认为是同一个客户端,而造成连接失败        MqttPahoMessageHandler outGate = new MqttPahoMessageHandler(appid + "_put", connectionFactory);        DefaultPahoMessageConverter converter = new DefaultPahoMessageConverter();        converter.setPayloadAsBytes(true);//bytes类型接收        outGate.setAsync(true);        outGate.setCompletionTimeout(CompletionTimeout);//设置连接超时时时        outGate.setDefaultQos(Qos);//设置通信质量        outGate.setConverter(converter);        return outGate;    }         @Override    public void onApplicationEvent(ApplicationEvent event) {        if (event instanceof MqttSubscribedEvent) {            String msg = "OK";                        msgSendService.send(MessageBuilder.withPayload(msg.getBytes()).build());        }    }}

其中ChanneName是一个常量类

来标识入站、出站管道的名称,以便在其它需要的地方使用,实现方法如下:

public class ChannelName {    public final static String INPUT_DATA="input_data";//入站管道    public final static String OUTPUT_DATA_TCP="output_data_TCP";//TCP出站管道    public final static String OUTPUT_DATA_MQTT="output_data_MQTT";//mqtt出站管道名称}

此时所有配置完成,接下来需要做的就是处理接收到的数据和发布数据,以上配置完成以后,接收和发送数据都是通过数据管道来完成,配置的是数据管道名称。

数据发送网关只是一个接口

用于向指定的数据管道里面发送数据,实现如下:

package org.noka.serialservice.service; import org.noka.serialservice.config.ChannelName;import org.springframework.integration.annotation.Gateway;import org.springframework.integration.annotation.MessagingGateway;import org.springframework.integration.mqtt.support.MqttHeaders;import org.springframework.messaging.Message;import org.springframework.messaging.handler.annotation.Header;import org.springframework.stereotype.Component; @MessagingGateway@Componentpublic interface MsgGateway {        @Gateway(requestChannel = ChannelName.OUTPUT_DATA_MQTT)    void send(@Header(MqttHeaders.TOPIC) String a, Message<byte[]> out);}

在需要的地方,可以向下面这样调用这个接口,向MQTT服务器发送消息

//topic为主题名称,out为消息内容msgGateway.send(topic, out);

MQTT服务器有数据下发时

会自动调将数据放入配置的入站数据管道中,在需要接收数据的地方,向下面这样配置即可

        @ServiceActivator(inputChannel = ChannelName.INPUT_DATA)    public void upCase(Message<byte[]> in) {        logger.info("[net service data]========================================");        logger.info("[net dow data]"+new String(in.getPayload()));//字符串方式打印服务器下发的数据        logger.info("[net dow hex]"+ Hex.encodeHexString(in.getPayload(),false));//16进制方式打印服务器下发的数据        serialService.send(in.getPayload());//将服务器下发的数据转发给串口    }

最后是参数配置文件

#--------MQTT---------------------------#设备ID,唯一标识mqtt.appid=mqtt_id#订阅主题,多个主题用逗号分隔mqtt.input.topic=mqtt_input_topic#发布主题mqtt.out.topic=mqtt_out_topic,aac#MQTT服务器地址,可以是多个地址mqtt.services=tcp://47.244.191.41:1883#mqtt用户名,默认无#mqtt.user=guest#mqtt密码,默认无#mqtt.password=guest#心跳间隔时间,默认3000#mqtt.KeepAliveInterval=3000#是否不保持session,默认false#mqtt.CleanSession=false#是否自动连接,默认true#mqtt.AutomaticReconnect=true#连接超时,默认30000#mqtt.CompletionTimeout=30000#传输质量,默认1#mqtt.Qos=1

到此,相信大家对“spring integration怎么连接MQTT”有了更深的了解,不妨来实际操作一番吧!这里是编程网网站,更多相关内容可以进入相关频道进行查询,关注我们,继续学习!

阅读原文内容投诉

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

软考中级精品资料免费领

  • 历年真题答案解析
  • 备考技巧名师总结
  • 高频考点精准押题
  • 2024年上半年信息系统项目管理师第二批次真题及答案解析(完整版)

    难度     813人已做
    查看
  • 【考后总结】2024年5月26日信息系统项目管理师第2批次考情分析

    难度     354人已做
    查看
  • 【考后总结】2024年5月25日信息系统项目管理师第1批次考情分析

    难度     318人已做
    查看
  • 2024年上半年软考高项第一、二批次真题考点汇总(完整版)

    难度     435人已做
    查看
  • 2024年上半年系统架构设计师考试综合知识真题

    难度     224人已做
    查看

相关文章

发现更多好内容

猜你喜欢

AI推送时光机
位置:首页-资讯-后端开发
咦!没有更多了?去看看其它编程学习网 内容吧
首页课程
资料下载
问答资讯