一、前景摘要
- DES版本:DevEco Studio 3.0 Release
- SDK版本:3.2.2.5 ( API9)
- npm版本:6.14.16
- EMQX:Linux(Ubuntu)
- MQTTX:Version: v1.9.2
二、了解MQTT
1、什么是MQTT?
MQTT**(**消息队列遥测传输)是ISO 标准(ISO/IEC PRF 20922)下基于发布/订阅范式的消息协议。它工作在TCP/IP协议族上,是为硬件性能低下的远程设备以及网络状况糟糕的情况下而设计的发布/订阅型消息协议,为此,它需要一个消息中间件。
2、MQTT特性
MQTT协议是为大量计算能力有限,且工作在低带宽、不可靠的网络的远程传感器和控制设备通讯而设计的协议,它具有以下主要的几项特性:
使用发布/订阅消息模式,提供一对多的消息发布,解除应用程序耦合;
- 对负载内容屏蔽的消息传输。
- 使用 TCP/IP 提供网络连接。
- 有三种消息发布服务质量:QoS(定阅等级),分0、1、2三个等级,简单来说是等级越高越可靠。
- 小型传输,开销很小(固定长度的头部是 2 字节),协议交换最小化,以降低网络流量。
- 使用 Last Will 和 Testament 特性通知有关各方客户端异常中断的机制。
3、应用场景(多客户端,少量消息)
- 车联网
- 工业互联网
- 智能家居
- 视频直播弹幕
- IM实时聊天(一对一聊天,群组聊天)
- 推送服务,比如推送实时新闻
- 金融交易数据订阅推送
4、基本名词
MQTT协议中的三种身份
- MQTT Broker:代理服务器
- Publish:发布者,发布消息
- Subscribe:订阅者,订阅消息
MQTT传输的消息
主题(Topic),负载(payload)和QoS
- Topic:消息的类型,订阅者订阅后就会收到该主题的消息内容(payload).
- Payload:消息的内容,指订阅者具体要使用的内容。
- QoS:服务质量.
“至多一次”(QoS0):消息发布完全依赖底层 TCP/IP 网络。会发生消息丢失或重复。这一级别可用于如下情况,环境传感器数据,丢失一次读记录无所谓,因为不久后还会有第二次发送。即是推送之后就完事了,至于对方有没有收到,收到是什么,数据有没有丢失,都不管。
“至少一次”(QoS1):确保消息到达,但消息重复可能会发生。即是你收到推送后,你还得返回一个puback给对方,告诉对方收到了,不然对方会以为你没收到,隔一段时间后重新给你推送,直到你给对方返回一个Puback为止。
“只有一次”(QoS2):确保消息到达一次。这一级别可用于如下情况,在计费系统中,消息重复或丢失会导致不正确的结果。
异常中断的机制
使用 Last Will 和 Testament 特性通知有关各方客户端。
- Last Will:即遗言机制,用于通知同一主题下的其他设备发送遗言的设备已经断开了连接。
- Testament:遗言机制,功能类似于Last Will。
三、MQTT的简单使用
1、搭建MQTT服务器EMQX
MQTT服务器有很多种,且部署方式也不一样。
Linux (Ubuntu)
安装命令:
curl -s https://assets.emqx.com/scripts/install-emqx-deb.sh | sudo bash
sudo apt-get install emqx :安装
sudo systemctl start emqx :启动
下载链接:https://www.emqx.io/zh/downloads?os=Ubuntu
Docker:https://emqx.io/zh/downloads?os=Docker
Linux(CentOS):https://www.emqx.io/zh/downloads?os=CentOS
Windows:https://www.emqx.io/zh/downloads?os=Windows
测试:确保emqx已正常运行后,可在Linux本地浏览器中输入: http://127.0.0.1:18083。
注意:未登录需修改密码,两次保持一致。
局域网其他电脑访问须知Linux的IP地址。
2、MQTTX
创建连接
创建subscription
收发消息
#代表通配符,代表订阅所有该类型的topic。
3、在OpenHarmony使用MQTT
安装依赖
依赖地址:https://gitee.com/openharmony-tpc/ohos_mqtt。
有所不同的是我用的npm安装的依赖npm install @ohos/mqtt。
创建Mqtt客户端
创建mqtt客户端并建立连接:
const clientOptions: MqttClientOptions = {
url: '192.168.xxx.xxx/:1883',
clientId: 'client_id_' + new Date().getTime(),
persistenceType: 1,
}
const connectOptions: MqttConnectOptions = {
userName: '',
password: '',
connectTimeout: 30,
}
this.mqClient = MqttAsync.createMqtt(this.clientOptions);
this.mqClient.connect(this.connectOptions, (data: MqttResponse) => {
console.log(TAG+" data: "+JSON.stringify(data));
if (data.code == 0) {
this.messageArrived();
this.subscribe('主设备号/#');
}
});
监听
//接收消息,使用此接口后,当订阅的主题有消息发布时,会自动接收到消息。
public messageArrived(): void {
this.mqClient.messageArrived((err, data) => {
console.log(TAG+"messageArrived!!!!!!!!!!!");
console.log(TAG+"messageArrived data:"+JSON.stringify(data));
});
}
订阅消息
// 订阅消息
public subscribe(topic: string, qos: QoS = 1): void {
const subscribeOption: MqttSubscribeOptions = { topic, qos };
this.mqClient.subscribe(subscribeOption, (err, data)=>{
this.handleMessage(data)
});
}
发布消息
// 发布消息
public publish(topic: string, payload: string | Record, qos: QoS = 0): void {
if (typeof payload !== 'string') {
payload = JSON.stringify(payload)
}
const payloadLen = payload.length;
const publishOption: MqttPublishOptions = { topic, payload, qos, payloadLen };
console.log(TAG, 'publishOption data: ' + JSON.stringify(publishOption));
this.mqClient.publish(publishOption, (err, data)=>{
console.log(TAG+"publish!!!!!!!!!!!");
console.log(TAG+"publish data:"+JSON.stringify(data));
}));
}
根据订阅的消息的主题的不同进行不同的处理
handleMessage(data:any){
console.log(TAG+"subscribe!!!!!!!!!!!");
console.log(TAG+"subscribe data:"+JSON.stringify(data));
//根据data的不同进行不同的处理
}
四、扩展
mqtt与MQ中间件的关系。
消息中间件是基于队列与消息传递技术,在网络环境中为应用系统提供同步或异步、可靠的消息传输的支撑性软件系统。
MQTT 与消息队列有一定的区别,队列是一种先进先出的数据结构,消息队列常用于应用服务层面,实现参考如 RabbitMQ Kafka RocketMQ。
MQTT 是传输协议,绝大部分 MQTT Broker 不保证消息顺序(Queue),常用语物联网、消息传输等。