文章详情

短信预约-IT技能 免费直播动态提醒

请输入下面的图形验证码

提交验证

短信预约提醒成功

Python简单试用MQTT服务器

2023-01-31 06:10

关注

经历过各种问题的磨难终于基本搭建完成了自己的MQTT服务器,接下来我就赶紧写个Python程序测试下.

这里采用paho.mqtt.python编写程序,详情参阅这里
打开powershell,执行pip install paho-mqtt安装模块

# coding=utf-8
import json
import threading

import paho.mqtt.client as mqtt

# 当连接上服务器后回调此函数
import time

from my_lib.code_handle.code_handle import auto_code
from windows_info.read_info import Win_psutil


class MqttClient:
    client = mqtt.Client('tester')

    def __init__(self, host, port):
        self._host = host
        self._port = port
        self.client.on_connect = self._on_connect  # 设置连接上服务器回调函数
        self.client.on_message = self._on_message  # 设置接收到服务器消息回调函数

    def connect(self, username='tester', password='tester'):
        self.client.username_pw_set(username, password)
        self.client.connect(self._host, self._port, 60)  # 连接服务器,端口为1883,维持心跳为60秒

    def publish(self, topic, data):
        self.client.publish(topic, data)

    def loop(self, timeout=None):
        thread = threading.Thread(target=self._loop, args=(timeout,))
        # thread.setDaemon(True)
        thread.start()

    def _loop(self, timeout=None):
        if not timeout:
            self.client.loop_forever()
        else:
            self.client.loop(timeout)

    def _on_connect(self, client, userdata, flags, rc):
        print("\nConnected with result code " + str(rc))
        client.subscribe("test-0")

    def _on_message(self, client, userdata, msg):  # 从服务器接受到消息后回调此函数
        print "\n主题:" + auto_code(str(msg.topic)) + " 消息:" + auto_code(str(msg.payload))

    def _is_json(self, data):
        try:
            json.loads(data)
        except ValueError:
            return False
        return True

    def publish_loop(self):
        pass


if __name__ == '__main__':
	host=None
    client = MqttClient(host, 1883)
    client.connect('tester','tester')
    client.publish('test-0', '我上线啦!')
	client.loop()
    wp = Win_psutil()#自己定义的一个类
    while True:
        data_json=wp.auto_json()#方法返回一个包含CPU和进程信息的JSON字符串
        client.publish('test-0',data_json)
        time.sleep(2)

这里自己封装了类,主要功能是连上服务器订阅默认主题,接收到消息即打印出来.
在主程序中先实例化类,接着使用默认用户名与密码登陆,在主题"test-0上"发布信息,接着定时将打包成JSON信息的数据发布到"test-0"这个主题

阅读原文内容投诉

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

软考中级精品资料免费领

  • 历年真题答案解析
  • 备考技巧名师总结
  • 高频考点精准押题
  • 2024年上半年信息系统项目管理师第二批次真题及答案解析(完整版)

    难度     813人已做
    查看
  • 【考后总结】2024年5月26日信息系统项目管理师第2批次考情分析

    难度     354人已做
    查看
  • 【考后总结】2024年5月25日信息系统项目管理师第1批次考情分析

    难度     318人已做
    查看
  • 2024年上半年软考高项第一、二批次真题考点汇总(完整版)

    难度     435人已做
    查看
  • 2024年上半年系统架构设计师考试综合知识真题

    难度     224人已做
    查看

相关文章

发现更多好内容

猜你喜欢

AI推送时光机
位置:首页-资讯-服务器
咦!没有更多了?去看看其它编程学习网 内容吧
首页课程
资料下载
问答资讯