文章详情

短信预约-IT技能 免费直播动态提醒

请输入下面的图形验证码

提交验证

短信预约提醒成功

SpringBoot集成MQTT示例详解

2024-04-02 19:55

关注

引言

特别提醒: 文中提到的MQTT服务器Apache-Apollo,现在已经不维护。但是客户端的写法是通用的。目前我常用的是RabbitMQ加mqtt插件。

MQTT

MQTT(消息队列遥测传输)是ISO标准(ISO/IEC PRF 20922)下基于发布/订阅范式的消息协议。它工作在 TCP/IP协议族上,是为硬件性能低下的远程设备以及网络状况糟糕的情况下而设计的发布/订阅型消息协议。国内很多企业都广泛使用MQTT作为Android手机客户端与服务器端推送消息的协议。

特点

MQTT协议是为大量计算能力有限,且工作在低带宽、不可靠的网络的远程传感器和控制设备通讯而设计的协议,它具有以下主要的几项特性:

至多一次:消息发布完全依赖底层 TCP/IP 网络。会发生消息丢失或重复。这一级别可用于如下情况,环境传感器数据,丢失一次读记录无所谓,因为不久后还会有第二次发送。

至少一次:确保消息到达,但消息重复可能会发生。

只有一次:确保消息到达一次。这一级别可用于如下情况,在计费系统中,消息重复或丢失会导致不正确的结果。

Apache-Apollo

Apache Apollo是一个代理服务器,其是在ActiveMQ基础上发展而来的,可以支持STOMP, AMQP, MQTT, Openwire, SSL, WebSockets 等多种协议。

原理:服务器端创建一个唯一订阅号,发送者可以向这个订阅号中发东西,然后接受者(即订阅了这个订阅号的人)都会收到这个订阅号发出来的消息。以此来完成消息的推送。服务器其实是一个消息中转站。

下载

下载地址:http://archive.apache.org/dist/activemq/activemq-apollo/

配置与启动

SpringBoot2的开发

添加依赖

<!-- 
  spring-boot版本 2.1.0.RELEASE
-->
<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-integration</artifactId>
</dependency>
<dependency>
  <groupId>org.springframework.integration</groupId>
  <artifactId>spring-integration-stream</artifactId>
</dependency>
<dependency>
  <groupId>org.springframework.integration</groupId>
  <artifactId>spring-integration-mqtt</artifactId>
</dependency>

自定义配置

# src/main/resources/config/mqtt.properties
##################
#  MQTT 配置
##################
# 用户名
mqtt.username=admin
# 密码
mqtt.password=password
# 推送信息的连接地址,如果有多个,用逗号隔开,如:tcp://127.0.0.1:61613,tcp://192.168.1.61:61613
mqtt.url=tcp://127.0.0.1:61613
##################
#  MQTT 生产者
##################
# 连接服务器默认客户端ID
mqtt.producer.clientId=mqttProducer
# 默认的推送主题,实际可在调用接口时指定
mqtt.producer.defaultTopic=topic1
##################
#  MQTT 消费者
##################
# 连接服务器默认客户端ID
mqtt.consumer.clientId=mqttConsumer
# 默认的接收主题,可以订阅多个Topic,逗号分隔
mqtt.consumer.defaultTopic=topic1

配置MQTT发布和订阅

import org.apache.commons.lang3.StringUtils;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessagingException;

@Configuration
public class MqttConfig {
  private static final Logger LOGGER = LoggerFactory.getLogger(MqttConfig.class);
  private static final byte[] WILL_DATA;
  static {
    WILL_DATA = "offline".getBytes();
  }
  
  public static final String CHANNEL_NAME_IN = "mqttInboundChannel";
  
  public static final String CHANNEL_NAME_OUT = "mqttOutboundChannel";
  @Value("${mqtt.username}")
  private String username;
  @Value("${mqtt.password}")
  private String password;
  @Value("${mqtt.url}")
  private String url;
  @Value("${mqtt.producer.clientId}")
  private String producerClientId;
  @Value("${mqtt.producer.defaultTopic}")
  private String producerDefaultTopic;
  @Value("${mqtt.consumer.clientId}")
  private String consumerClientId;
  @Value("${mqtt.consumer.defaultTopic}")
  private String consumerDefaultTopic;
  
  @Bean
  public MqttConnectOptions getMqttConnectOptions() {
    MqttConnectOptions options = new MqttConnectOptions();
    // 设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录,
    // 这里设置为true表示每次连接到服务器都以新的身份连接
    options.setCleanSession(true);
    // 设置连接的用户名
    options.setUserName(username);
    // 设置连接的密码
    options.setPassword(password.toCharArray());
    options.setServerURIs(StringUtils.split(url, ","));
    // 设置超时时间 单位为秒
    options.setConnectionTimeout(10);
    // 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送心跳判断客户端是否在线,但这个方法并没有重连的机制
    options.setKeepAliveInterval(20);
    // 设置“遗嘱”消息的话题,若客户端与服务器之间的连接意外中断,服务器将发布客户端的“遗嘱”消息。
    options.setWill("willTopic", WILL_DATA, 2, false);
    return options;
  }
  
  @Bean
  public MqttPahoClientFactory mqttClientFactory() {
    DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
    factory.setConnectionOptions(getMqttConnectOptions());
    return factory;
  }
  
  @Bean(name = CHANNEL_NAME_OUT)
  public MessageChannel mqttOutboundChannel() {
    return new DirectChannel();
  }
  
  @Bean
  @ServiceActivator(inputChannel = CHANNEL_NAME_OUT)
  public MessageHandler mqttOutbound() {
    MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(
        producerClientId,
        mqttClientFactory());
    messageHandler.setAsync(true);
    messageHandler.setDefaultTopic(producerDefaultTopic);
    return messageHandler;
  }
  
  @Bean
  public MessageProducer inbound() {
    // 可以同时消费(订阅)多个Topic
    MqttPahoMessageDrivenChannelAdapter adapter =
        new MqttPahoMessageDrivenChannelAdapter(
            consumerClientId, mqttClientFactory(),
            StringUtils.split(consumerDefaultTopic, ","));
    adapter.setCompletionTimeout(5000);
    adapter.setConverter(new DefaultPahoMessageConverter());
    adapter.setQos(1);
    // 设置订阅通道
    adapter.setOutputChannel(mqttInboundChannel());
    return adapter;
  }
  
  @Bean(name = CHANNEL_NAME_IN)
  public MessageChannel mqttInboundChannel() {
    return new DirectChannel();
  }
  
  @Bean
  @ServiceActivator(inputChannel = CHANNEL_NAME_IN)
  public MessageHandler handler() {
    return new MessageHandler() {
      @Override
      public void handleMessage(Message<?> message) throws MessagingException {
        LOGGER.error("===================={}============", message.getPayload());
      }
    };
  }
}

消息发布器

import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;

@Component
@MessagingGateway(defaultRequestChannel = MqttConfig.CHANNEL_NAME_OUT)
public interface IMqttSender {
  
  void sendToMqtt(String data);
  
  void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic,
      String payload);
  
  void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic,
      @Header(MqttHeaders.QOS) int qos,
      String payload);
}

发送消息


@Controller
@RequestMapping(value = "/")
public class MqttController {
  
  @Resource
  private IMqttSender iMqttSender;
  
  @ResponseBody
  @GetMapping(value = "/mqtt", produces ="text/html")
  public ResponseEntity<String> sendMqtt(@RequestParam(value = "msg") String message) {
    iMqttSender.sendToMqtt(message);
    return new ResponseEntity<>("OK", HttpStatus.OK);
  }
}

入口类

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration;
import org.springframework.boot.autoconfigure.orm.jpa.HibernateJpaAutoConfiguration;
import org.springframework.context.annotation.PropertySource;

@SpringBootApplication(exclude = {DataSourceAutoConfiguration.class,
    HibernateJpaAutoConfiguration.class})
@PropertySource(encoding = "UTF-8", value = {"classpath:config/mqtt.properties"})
public class Application {
  public static void main(String[] args) {
    SpringApplication.run(Application.class, args);
  }
}

代码

https://gitee.com/bbfbbf/mqtt-test

以上就是SpringBoot集成MQTT示例详解的详细内容,更多关于SpringBoot集成MQTT的资料请关注编程网其它相关文章!

阅读原文内容投诉

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

软考中级精品资料免费领

  • 历年真题答案解析
  • 备考技巧名师总结
  • 高频考点精准押题
  • 2024年上半年信息系统项目管理师第二批次真题及答案解析(完整版)

    难度     813人已做
    查看
  • 【考后总结】2024年5月26日信息系统项目管理师第2批次考情分析

    难度     354人已做
    查看
  • 【考后总结】2024年5月25日信息系统项目管理师第1批次考情分析

    难度     318人已做
    查看
  • 2024年上半年软考高项第一、二批次真题考点汇总(完整版)

    难度     435人已做
    查看
  • 2024年上半年系统架构设计师考试综合知识真题

    难度     224人已做
    查看

相关文章

发现更多好内容

猜你喜欢

AI推送时光机
位置:首页-资讯-后端开发
咦!没有更多了?去看看其它编程学习网 内容吧
首页课程
资料下载
问答资讯