mqtt协议调试时需要个客户端工具,但网上找的体积包都很大,都不够小巧和便携。于是趁周末时间用python搞出来了个客户端工具,使用python+tinker+paho.mqtt实现。源码量很少但功能不弱,相当的轻量级。分享给有需要的小伙伴,喜欢的可以点击收藏。
前言
用python实现个跨平台的mqtt客户端工具,同时介绍下python的mqtt客户端库paho.mqtt的使用。界面这里选择使用了python自带的tkinter,虽不是很好用,但相当的轻量级,对于造一个工具来说足够啦。且配合ttkbootstrap这个包,界面可以美化,还更换皮肤,这点儿挺不错。但是如果界面特别复杂的话推荐pyqt。
工具下载地址:https://download.csdn.net/download/qq8864/88351834
界面效果:
环境准备
使用python3实现的mqtt客户端工具, 相当的轻量级。源码不大仅一个文件。
需要安装依赖包:
pip install ttkbootstrappip install -i https://pypi.doubanio.com/simple paho-mqtt
tkinter是python自带的标准gui库,对于我们自己日常做一些小程序出来给自己使用是非常不错的。因为tkinter相比较其它强大的gui库(PyQT,WxPython等等)而言要简单、方便、学起来也容易得很多,所以用来造个小工具非常nice,但它做出来的界面不是很好看。
ttkbootstrap
介绍
ttkbootstrap
是一个基于 tkinter
和 ttk
的Python库,它提供了一套现代化的主题和样式,可以用于创建漂亮的图形用户界面(GUI)应用程序。它是基于 Bootstrap
框架的设计风格,为 tkinter
应用程序提供了一致的外观和用户体验。
官方文档:ttkbootstrap - ttkbootstrap
ttkbootstrap
库简单示例,选择主题:
import ttkbootstrap as ttkfrom ttkbootstrap.constants import *root = ttk.Window()root.geometry("500x400+500+150") style = ttk.Style()theme_names = style.theme_names()#以列表的形式返回多个主题名theme_selection = ttk.Frame(root, padding=(10, 10, 10, 0))theme_selection.pack(fill=X, expand=YES)lbl = ttk.Label(theme_selection, text="选择主题:")theme_cbo = ttk.Combobox( master=theme_selection, text=style.theme.name, values=theme_names,)theme_cbo.pack(padx=10, side=RIGHT)theme_cbo.current(theme_names.index(style.theme.name))lbl.pack(side=RIGHT)def change_theme(event): theme_cbo_value = theme_cbo.get() style.theme_use(theme_cbo_value) theme_selected.configure(text=theme_cbo_value) theme_cbo.selection_clear()theme_cbo.bind('<>', change_theme)theme_selected = ttk.Label( master=theme_selection, text="litera", font="-size 24 -weight bold")theme_selected.pack(side=LEFT)root.mainloop()
ttkbootstrap简单使用
import ttkbootstrap as ttk#实例化创建应用程序窗口root = ttk.Window( title="窗口名字", #设置窗口的标题 themename="litera", #设置主题 size=(1066,600), #窗口的大小 position=(100,100), #窗口所在的位置 minsize=(0,0), #窗口的最小宽高 maxsize=(1920,1080), #窗口的最大宽高 resizable=None, #设置窗口是否可以更改大小 alpha=1.0, #设置窗口的透明度(0.0完全透明) )# root.place_window_center() #让显现出的窗口居中# root.resizable(False,False) #让窗口不可更改大小# root.wm_attributes('-topmost', 1)#让窗口位置其它窗口之上root.mainloop()
标签显示:
import ttkbootstrap as ttkfrom ttkbootstrap.constants import *root = ttk.Window()ttk.Label(root,text="标签1",bootstyle=INFO).pack(side=ttk.LEFT, padx=5, pady=10)ttk.Label(root,text="标签2",bootstyle="inverse").pack(side=ttk.LEFT, padx=5, pady=10)ttk.Label(root,text="标签3",bootstyle="inverse-danger").pack(side=ttk.LEFT, padx=5, pady=10)ttk.Label(root, text="标签4", bootstyle=WARNING, font=("微软雅黑", 15), background='#94a2a4').pack(side=LEFT, padx=5, pady=10)root.mainloop()'''# bootstyle colorsPRIMARY = 'primary'SECONDARY = 'secondary'SUCCESS = 'success'DANGER = 'danger'WARNING = 'warning'INFO = 'info'LIGHT = 'light'DARK = 'dark'# bootstyle typesOUTLINE = 'outline'LINK = 'link'TOGGLE = 'toggle'INVERSE = 'inverse'STRIPED = 'striped'TOOLBUTTON = 'toolbutton'ROUND = 'round'SQUARE = 'square''''
paho-mqtt库介绍
paho-mqtt 是目前 Python 中使用较多的 MQTT 客户端库,它在 Python 2.7 或 3.x 上为客户端类提供了对 MQTT v3.1 和 v3.1.1 的支持。它还提供了一些帮助程序功能,使将消息发布到 MQTT 服务器变得非常简单。
paho-mqtt简单使用:
# -*- coding: utf-8 -*-# -*- coding: utf-8 -*- import paho.mqtt.client as mqttimport time def on_connect(client, userdata, flags, rc): print "链接" print("Connected with result code: " + str(rc)) def on_message(client, userdata, msg): print "消息内容" print(msg.topic + " " + str(msg.payload)) # 订阅回调def on_subscribe(client, userdata, mid, granted_qos): print "订阅" print("On Subscribed: qos = %d" % granted_qos) pass # 取消订阅回调def on_unsubscribe(client, userdata, mid, granted_qos): print "取消订阅" print("On unSubscribed: qos = %d" % granted_qos) pass # 发布消息回调def on_publish(client, userdata, mid): print "发布消息" print("On onPublish: qos = %d" % mid) pass # 断开链接回调def on_disconnect(client, userdata, rc): print "断开链接" print("Unexpected disconnection rc = " + str(rc)) pass client = mqtt.Client()client.on_connect = on_connectclient.on_message = on_messageclient.on_publish = on_publishclient.on_disconnect = on_disconnectclient.on_unsubscribe = on_unsubscribeclient.on_subscribe = on_subscribeclient.connect('127.0.0.1', 1883, 600) # 600为keepalive的时间间隔while True: client.publish(topic='mqtt11', payload='amazing', qos=0, retain=False) time.sleep(2)
参数解释:
- keepalive => 心跳间隔,单位是秒,如果 broker 和 client 在这段时间内没有任何通讯,client 会给 broker 发送一个 ping 消息
- retain => 如果设为 Ture ,这条消息会被设为保留消息
- payload => 消息内容,字符串类型,如果设为 None ,会发送一条长度为 0 消息。如果设置了 int 或者 3. float 类型的值,会当做字符串发送,如果你想发送真正的 int 或者 float 值,需要用 struct.pack() 生成消息, mqtt的publish 只支持None, string, int, float 类型的数据, 如果需要发送json类型数据可以通过json.dumps()将数据进行转换后在发送, 接收端在on_message()回调函数中通过json.loads() 将数据解析就可以了
- topic => 这条消息所属的话题
- qos => 消息的安全等级 Qos详细介绍
- qos=0 QoS0,At most once,至多一次;
- QoS0 代表,Sender 发送的一条消息,Receiver 最多能收到一次,也就是说 Sender 尽力向 Receiver 发送消息,如果发送失败,也就算了;
- qos=1 QoS1,At least once,至少一次;
- QoS1 代表,Sender 发送的一条消息,Receiver 至少能收到一次,也就是说 Sender 向 Receiver 发送消息,如果发送失败,会继续重试,直到 Receiver 收到消息为止,但是因为重传的原因,Receiver 有可能会收到重复的消息;
- qos=2 QoS2,Exactly once,确保只有一次
- QoS2 代表,Sender 发送的一条消息,Receiver 确保能收到而且只收到一次,也就是说 Sender 尽力向 Receiver 发送消息,如果发送失败,会继续重试,直到 Receiver 收到消息为止,同时保证 Receiver 不会因为消息重传而收到重复的消息。
源码实现
源码实现比较简单,主要是里面有一些注意事项。比如图片的显示,网上关于tkinter不能显示图片的问题有很多。这里给出绝对可行的做法。
label的图片显示
from PIL import Image, ImageTk #..... img = Image.open("me.jpg") # 替换为你的图片路径 img = img.resize((80,80)) #self._img = ImageTk.PhotoImage(file = "me.jpg") self._img = ImageTk.PhotoImage(img) self.about = Label(self.fr1) self.about.image = self._img self.about.configure(image=self._img) self.about.place(x=65,y=0,width=80,height=80)
判断是否为16进制
def ISHEX(data): #判断输入字符串是否为十六进制 if len(data)%2: return False for item in data: if item not in '0123456789ABCDEFabcdef': #循环判断数字和字符 return False return True
保存文件
def savefiles(self): #保存日志TXT文本 try: with open('log.txt','a') as file: #a方式打开 文本追加模式 file.write(self.txt_rx.get(0.0,'end')) messagebox.showinfo('提示', '保存成功') except: messagebox.showinfo('错误', '保存日志文件失败!')
自动滚动到末尾
def appendTxt(self,msg): current_t = datetime.now() current_ = current_t.strftime("%Y-%m-%d %H:%M:%S ") self.txt_rx.insert(END,current_) self.txt_rx.insert(END,msg) self.txt_rx.insert(END,"\n") #滚动到末尾 self.txt_rx.see(END) self.txt_rx.update_idletasks()
mqtt的使用
def connect(self,addr,port,alive=60): self.client.connect(addr, port,alive) self.client.loop_start() def disconnect(self): self.client.loop_stop() self.client.disconnect() print("disconnect!") def on_connect(self, client, userdata, flags, rc): if rc == 0: print("Connected to MQTT Broker ok!\n") self.appendTxt("Connected to MQTT Broker ok!\n") self.var_bt1.set("断开") self.isConnect = True else: print("Failed to connect, return code %d\n", rc) self.appendTxt(f"Failed to connect, return code: {rc}\n") self.isConnect = False def on_message(self, client, userdata, msg): self.tx_rx_cnt(1,0) print("Received message: " + msg.payload.decode()) self.appendTxt(f"Received message:\n[topic]:{msg.topic}\n{msg.payload.decode()}\n") def subscribe(self, topic): #item = Entry(self.subitem).get() if topic in self.subitem.get(0, END): print("item already exists.") else: self.appendTxt(f"[订阅topic]:{topic}\n") self.client.subscribe(topic) self.subitem.insert(END, topic) def publish(self, topic, message): self.client.publish(topic, message) self.appendTxt(f"[发布topic]:{topic}\n{message}\n")
打包为exe程序
python脚本打包为exe有很多方法。常用的为pyinstaller,使用简单,但是有不少缺点,如打包体积大,启动速度慢等。所以这里并不推荐它,推荐使用nuitka。
pyinstaller的打包
#pyinstaller方式打包pyinstaller -F -w -i chengzi.ico py_word.py 打包指定exe图标打包
注:以上仅是简单的打包,如果带图片或文件资源,打包后运行也是失败的。
带图片资源的打包方法:
pyi-makespec -F .\mqttclienttool.py
执行上面这条命令,会生成 mqttclienttool.spec文件,这个文件可以编辑。打开并编辑这个文件,
data选项中增加:datas=[('me.jpg', '.')], 小括号内的写法,第一个为文件名,第二个为路径。还有个需要注意的地方是,代码里引用路径需要采用绝对路径的方式如:
img_path = os.path.join(os.path.dirname(__file__), 'me.jpg')img = Image.open(img_path)
# -*- mode: python ; coding: utf-8 -*-block_cipher = Nonea = Analysis( ['mqttclienttool.py'], pathex=[], binaries=[], datas=[('me.jpg', '.')], hiddenimports=[], hookspath=[], hooksconfig={}, runtime_hooks=[], excludes=[], win_no_prefer_redirects=False, win_private_assemblies=False, cipher=block_cipher, noarchive=False,)pyz = PYZ(a.pure, a.zipped_data, cipher=block_cipher)
最后执行以下命令即可。
pyinstaller .\mqttclienttool.spec
nuitka的安装
-
直接利用pip即可安装:
pip install Nuitka
-
需具备(MSVS)或者MinGW64等C++的编译器。
nuitka使用过程
对于第三方依赖包较多的项目(比如需要import torch,tensorflow,cv2,numpy,pandas,geopy等等)而言,这里最好打包的方式是只将属于自己的代码转成C++,不管这些大型的第三方包!
使用举例:
nuitka --standalone --show-memory --show-progress --nofollow-imports --plugin-enable=qt-plugins --follow-import-to=utils,src --output-dir=out --windows-icon-from-ico=./logo.ico ./demo.py
简单介绍下上面的nuitka的命令:
-
--standalone
:方便移植到其他机器,不用再安装python -
--show-memory --show-progress
:展示整个安装的进度过程 -
--nofollow-imports
:不编译代码中所有的import,比如keras,numpy之类的。 -
--plugin-enable=qt-plugins
:如果用到pyqt5来做界面,这里nuitka有其对应的插件。 -
--plugin-enable=qt-plugins
-
--follow-import-to=utils,src
:需要编译成C++代码的指定的2个包含源码的文件夹,这里用,
来进行分隔。 -
--output-dir=out
:指定输出的结果路径为out。 -
--windows-icon-from-ico=./logo.ico
:指定生成的exe的图标为logo.ico这个图标,这里推荐一个将图片转成ico格式文件的网站(比特虫)。 -
--windows-disable-console
:运行exe取消弹框。这里没有放上去是因为我们还需要调试,让可以弹出DOS窗口便于查看print的日志。
完整实现
# -*- coding: utf-8 -*-# @Time : 2023/09/17 12:49# @Author : yangyongzhen# @Email : 534117529@qq.com# @File : mqttclienttool.py# @Project : studyimport timeimport osfrom tkinter.ttk import *from tkinter import *from datetime import datetimeimport timeimport threadingfrom tkinter import messageboxfrom ttkbootstrap import Styleimport paho.mqtt.client as mqttfrom PIL import Image, ImageTkglobal gui #全局型式保存GUI句柄tx_cnt=0 #发送条数统计rx_cnt=0 #接收条数统计def ISHEX(data): #判断输入字符串是否为十六进制 if len(data)%2: return False for item in data: if item not in '0123456789ABCDEFabcdef': #循环判断数字和字符 return False return True'''GUI'''''''''''''''''''''''''''''''''''''''''''''''''''''''''class GUI: def __init__(self): self.root = Tk() self.root.title('MQTT调试助手-author:blog.csdn.net/qq8864') #窗口名称 self.root.geometry("820x560+500+150") #尺寸位置 self.root.resizable(False, False) self.interface() Style(theme='pulse') #主题修改 可选['cyborg', 'journal', 'darkly', 'flatly' 'solar', 'minty', 'litera', 'united', 'pulse', 'cosmo', 'lumen', 'yeti', 'superhero','sandstone'] #self.client.on_log = self.log_callback self.isConnect = False self._img = None def interface(self): """"界面编写位置""" #--------------------------------操作区域-----------------------------# self.fr1=Frame(self.root) self.fr1.place(x=0,y=0,width=220,height=600) #区域1位置尺寸 img_path = os.path.join(os.path.dirname(__file__), 'me.jpg') img = Image.open(img_path) # 替换为你的图片路径 img = img.resize((80,80)) #self._img = ImageTk.PhotoImage(file = "me.jpg") self._img = ImageTk.PhotoImage(img) self.about = Label(self.fr1) self.about.image = self._img self.about.configure(image=self._img) self.about.place(x=65,y=0,width=80,height=80) pos = 80 self.lb_server =Label(self.fr1, text='地址:',anchor="e",fg='red') #点击可刷新 self.lb_server.place(x=0,y=pos,width=50,height=35) self.txt_server = Text(self.fr1) self.txt_server.place(x=65,y=pos,width=155,height=26) self.txt_server.insert("1.0", "127.0.0.1") self.lb1 =Label(self.fr1, text='端口:',anchor="e",fg='red') #点击可刷新 self.lb1.place(x=0,y=pos+40,width=50,height=35) self.txt_port = Text(self.fr1) self.txt_port.place(x=65,y=pos+40,width=155,height=26) self.txt_port.insert("1.0", 1883) self.lb1 =Label(self.fr1, text='clientID:',anchor="e",fg='red') #点击可刷新 self.lb1.place(x=0,y=pos+80,width=50,height=35) self.txt_id = Text(self.fr1) self.txt_id.place(x=65,y=pos+80,width=155,height=26) self.txt_id.insert("1.0", "mqtt-client") self.lb1 =Label(self.fr1, text='用户名:',anchor="e",fg='red') #点击可刷新 self.lb1.place(x=0,y=pos+120,width=50,height=35) self.txt_name = Text(self.fr1) self.txt_name.place(x=65,y=pos+120,width=155,height=26) self.lb1 =Label(self.fr1, text='密码 :',anchor="e",fg='red') #点击可刷新 self.lb1.place(x=0,y=pos+160,width=50,height=35) self.txt_pwd = Text(self.fr1) self.txt_pwd.place(x=65,y=pos+160,width=155,height=26) self.lb1 =Label(self.fr1, text='心跳 :',anchor="e",fg='red') #点击可刷新 self.lb1.place(x=0,y=pos+200,width=50,height=35) self.txt_heart = Text(self.fr1) self.txt_heart.place(x=65,y=pos+200,width=155,height=26) self.txt_heart.insert("1.0", 60) self.var_bt1 = StringVar() self.var_bt1.set("连接") self.btn1 = Button(self.fr1,textvariable=self.var_bt1,command=self.btn_connect) #绑定 btn_connect 方法 self.btn1.place(x=170,y=pos+240,width=50,height=30) self.lb_s =Label(self.fr1, text='订阅主题',bg="yellow",anchor='w') #字节统计 self.lb_s.place(x=5,y=340,width=90,height=28) self.txt_sub = Text(self.fr1) self.txt_sub.place(x=5,y=368,width=155,height=28) self.btn5 = Button(self.fr1, text='订阅',command=self.btn_sub) #测试用 self.btn5.place(x=170,y=368,width=50,height=28) self.subitem = Listbox(self.fr1) self.subitem.place(x=5,y=402,width=215,height=85) #self.subitem.insert(END, "This is a read-only Text widget.") self.subitem.bind("", self.on_right_click) #-------------------------------文本区域-----------------------------# self.fr2=Frame(self.root) #区域1 容器 relief groove=凹 ridge=凸 self.fr2.place(x=220,y=0,width=620,height=560) #区域1位置尺寸 self.txt_rx = Text(self.fr2) self.txt_rx.place(relheight=0.6,relwidth=0.9,relx=0.05,rely=0.01) #比例计算控件尺寸和位置 self.scrollbar = Scrollbar(self.txt_rx) self.scrollbar.pack(side=RIGHT, fill=Y) self.txt_rx.config(yscrollcommand=self.scrollbar.set) self.scrollbar.config(command=self.txt_rx.yview) self.txt_rx.bind("", self.check_scrollbar) self.lb_t =Label(self.fr2, text='发布主题',bg="yellow",anchor='w') #字节统计 self.lb_t.place(relheight=0.04,relwidth=0.2,relx=0.05,rely=0.62) self.lb_qos =Label(self.fr2, text='QoS:',bg="yellow",anchor='w') #字节统计 self.lb_qos.place(relheight=0.04,relwidth=0.15,relx=0.15,rely=0.62) self.var_cb1 = IntVar() self.comb1 = Combobox(self.fr2,textvariable=self.var_cb1) self.comb1['values'] = [0,1,2] #列出可用等级 self.comb1.current(0) # 设置默认选项 0开始 self.comb1.place(relheight=0.04,relwidth=0.08,relx=0.22,rely=0.615) self.txt_topic = Text(self.fr2) self.txt_topic.place(relheight=0.05,relwidth=0.9,relx=0.05,rely=0.66) #比例计算控件尺寸位置 self.txt_tx = Text(self.fr2) self.txt_tx.place(relheight=0.15,relwidth=0.9,relx=0.05,rely=0.72) #比例计算控件尺寸位置 self.btn6 = Button(self.fr2, text='发送',command=self.btn_send) #绑定发送方法 self.btn6.place(relheight=0.06,relwidth=0.11,relx=0.84,rely=0.88) self.btn3 = Button(self.fr2, text='清空',command = self.txt_clr) #绑定清空方法 self.btn4 = Button(self.fr2, text='保存',command=self.savefiles) #绑定保存方法 self.btn3.place(relheight=0.06,relwidth=0.11,relx=0.05,rely=0.88) self.btn4.place(relheight=0.06,relwidth=0.11,relx=0.18,rely=0.88) self.lb3 =Label(self.fr2, text='接收:0 发送:0',bg="yellow",anchor='w') #字节统计 self.lb3.place(relheight=0.05,relwidth=0.3,relx=0.045,rely=0.945) self.lb4 = Label(self.fr2, text=' ', anchor='w',relief=GROOVE) #时钟 self.lb4.place(relheight=0.05, relwidth=0.11, relx=0.84, rely=0.945)#------------------------------------------方法----------------------------------------------- def check_scrollbar(self,*args): if self.txt_rx.yview() == (0.0, 1.0): self.scrollbar.pack_forget() else: self.scrollbar.place(RIGHT, fill=Y) def on_right_click(self,w): idx = self.subitem.curselection() print("Right-Clicked idx:", idx) if idx == (): return selected_item = self.subitem.get(idx) print("Right-Clicked on:", selected_item,idx) ret = messagebox.askyesno('取消订阅', "取消订阅:\n"+selected_item) if ret: self.subitem.delete(idx) self.client.unsubscribe(selected_item) self.appendTxt("取消订阅:"+selected_item) def gettim(self):#获取时间 未用 timestr = time.strftime("%H:%M:%S") # 获取当前的时间并转化为字符串 self.lb4.configure(text=timestr) # 重新设置标签文本 # tim_str = str(datetime.datetime.now()) + '\n' # self.lb4['text'] = tim_str #self.lb3['text'] = '接收:'+str(rx_cnt),'发送:'+str(tx_cnt) self.txt_rx.after(1000, self.gettim) # 每隔1s调用函数 gettime 自身获取时间 GUI自带的定时函数 def txt_clr(self):#清空显示 self.txt_rx.delete(0.0, 'end') # 清空文本框 self.txt_tx.delete(0.0, 'end') # 清空文本框 def ascii_hex_get(self):#获取单选框状态 if(self.var_cs.get()): return False else: return True def tx_rx_cnt(self,rx=0,tx=0): #发送接收统计 global tx_cnt global rx_cnt rx_cnt += rx tx_cnt += tx self.lb3['text'] = '接收:'+str(rx_cnt),'发送:'+str(tx_cnt) def savefiles(self): #保存日志TXT文本 try: with open('log.txt','a') as file: #a方式打开 文本追加模式 file.write(self.txt_rx.get(0.0,'end')) messagebox.showinfo('提示', '保存成功') except: messagebox.showinfo('错误', '保存日志文件失败!') def log_callback(self,client, userdata, level, buf): print(buf) def appendTxt(self,msg,flag = None): current_t = datetime.now() current_ = current_t.strftime("%Y-%m-%d %H:%M:%S ") self.txt_rx.insert(END,current_) self.txt_rx.insert(END,msg) self.txt_rx.insert(END,"\n") #滚动到末尾 self.txt_rx.see(END) self.txt_rx.update_idletasks() def connect(self,addr,port,alive=60): self.client.connect(addr, port,alive) self.client.loop_start() def disconnect(self): self.client.loop_stop() self.client.disconnect() print("disconnect!") def on_connect(self, client, userdata, flags, rc): if rc == 0: print("Connected to MQTT Broker ok!\n") self.appendTxt("Connected to MQTT Broker ok!\n") self.var_bt1.set("断开") self.isConnect = True else: print("Failed to connect, return code %d\n", rc) self.appendTxt(f"Failed to connect, return code: {rc}\n") self.isConnect = False def on_message(self, client, userdata, msg): self.tx_rx_cnt(1,0) print("Received message: " + msg.payload.decode()) self.appendTxt(f"Received message:\n[topic]:{msg.topic}\n{msg.payload.decode()}\n","RECV") def subscribe(self, topic): #item = Entry(self.subitem).get() if topic in self.subitem.get(0, END): print("item already exists.") else: self.appendTxt(f"[订阅topic]:{topic}\n") self.client.subscribe(topic) self.subitem.insert(END, topic) def publish(self, topic, message,qos=0): self.client.publish(topic, message,qos) self.appendTxt(f"[发布topic]:{topic}\n{message}\n") def btn_connect(self):#连接 global isConnect if self.var_bt1.get() == '连接': server = self.txt_server.get("1.0",END).strip() port = self.txt_port.get("1.0",END).strip() alive = self.txt_heart.get("1.0",END).strip() user = self.txt_name.get("1.0",END).strip() psd = self.txt_pwd.get("1.0",END).strip() cid = self.txt_id.get("1.0",END).strip() #用户名密码设置 if len(user) !=0 : self.client.username_pw_set(user, psd)self.client = mqtt.Client(cid) #MQTT self.client.on_connect = self.on_connect self.client.on_message = self.on_message print("btn connect click: "+server+","+port+",QoS:"+self.comb1.get()) self.appendTxt(f"连接 {server},port:{port}\n") self.connect(server,int(port),int(alive)) else: self.disconnect() self.var_bt1.set("连接") self.isConnect = False self.appendTxt(f"断开连接!\n") def btn_sub(self):#订阅 if self.isConnect: sub = self.txt_sub.get("1.0",END).strip() print("btn sub click,topic: "+sub) self.subscribe(sub) else: messagebox.showinfo('提示', '服务器未连接!') def btn_send(self):#发布 if self.isConnect: pub_topic = self.txt_topic.get("1.0",END).strip() payload = self.txt_tx.get("1.0",END).strip() print("btn pub click,topic: "+pub_topic) self.publish(pub_topic,payload,int(self.comb1.get())) self.tx_rx_cnt(0,1) else: messagebox.showinfo('提示', '请连接服务器!')if __name__ == '__main__': print('Start...') gui = GUI() gui.gettim() #开启时钟 gui.root.mainloop() print('End...')
最后,再附带个python实现的串口调试助手源码:
import timefrom tkinter.ttk import *from tkinter import *import datetimeimport serial # 导入模块import serial.tools.list_portsimport threadingfrom tkinter import messageboxfrom ttkbootstrap import Styleglobal UART #全局型式保存串口句柄global RX_THREAD #全局型式保存串口接收函数global gui #全局型式保存GUI句柄tx_cnt=0 #发送字符数统计rx_cnt=0 #接收字符数统计def ISHEX(data): #判断输入字符串是否为十六进制 if len(data)%2: return False for item in data: if item not in '0123456789ABCDEFabcdef': #循环判断数字和字符 return False return Truedef uart_open_close(fun,com,bund): #串口打开关闭控制 global UART global RX_THREAD if fun==1:#打开串口 try: UART = serial.Serial(com, bund, timeout=0.2) # 提取串口号和波特率并打开串口 if UART.isOpen(): #判断是否打开成功 lock = threading.Lock() RX_THREAD = UART_RX_TREAD('URX1',lock) #开启数据接收进程 RX_THREAD.setDaemon(True) #开启守护进程 主进程结束后接收进程也关闭 会报警告 不知道咋回事 RX_THREAD.start() RX_THREAD.resume() return True except: return False return False else: #关闭串口 print("关闭串口") RX_THREAD.pause() UART.close()def uart_tx(data,isHex=False): #串口发送数据 global UART try: if UART.isOpen(): #发送前判断串口状态 避免错误 print("uart_send=" + data) gui.tx_rx_cnt(tx=len(data)) #发送计数 if isHex: #十六进制发送 data_bytes = bytes.fromhex(data) return UART.write(bytes(data_bytes)) else: #字符发送 return UART.write(data.encode('gb2312')) except:#错误返回 messagebox.showinfo('错误', '发送失败')class UART_RX_TREAD(threading.Thread): #数据接收进程 部分重构 global gui def __init__(self, name, lock): threading.Thread.__init__(self) self.mName = name self.mLock = lock self.mEvent = threading.Event() def run(self): #主函数 print('开启数据接收\r') while True: self.mEvent.wait() self.mLock.acquire() if UART.isOpen(): rx_buf = UART.read() if len(rx_buf) >0: rx_buf += UART.readall() #有延迟但不易出错 gui.tx_rx_cnt(rx=len(rx_buf)) if gui.ascii_hex_get() == False: print('收到hex数据', rx_buf.hex().upper()) gui.txt_rx.insert(END, rx_buf.hex().upper()) else: str_data = str(rx_buf, encoding='gb2312') print("串口收到消息:", len(rx_buf), str_data) gui.txt_rx.insert(END,str_data) # self.txt_rx.insert(END,str_data) self.mLock.release() #time.sleep(3) def pause(self): #暂停 self.mEvent.clear() def resume(self):#恢复 self.mEvent.set()'''GUI'''''''''''''''''''''''''''''''''''''''''''''''''''''''''class GUI: def __init__(self): self.root = Tk() self.root.title('梵德觅串口调试助手') #窗口名称 self.root.geometry("800x360+500+150") #尺寸位置 self.interface() Style(theme='pulse') #主题修改 可选['cyborg', 'journal', 'darkly', 'flatly' 'solar', 'minty', 'litera', 'united', 'pulse', 'cosmo', 'lumen', 'yeti', 'superhero','sandstone'] def interface(self): """"界面编写位置""" #--------------------------------操作区域-----------------------------# self.fr1=Frame(self.root) self.fr1.place(x=0,y=0,width=180,height=360) #区域1位置尺寸 self.lb1 =Label(self.fr1, text='端口号:',font="微软雅黑",fg='red') #点击可刷新 self.lb1.place(x=0,y=5,width=100,height=35) self.var_cb1 = StringVar() self.comb1 = Combobox(self.fr1,textvariable=self.var_cb1) self.comb1['values'] = list(serial.tools.list_ports.comports()) #列出可用串口 self.comb1.current(0) # 设置默认选项 0开始 self.comb1.place(x=10,y=40,width=150,height=30) com=list(serial.tools.list_ports.comports()) print('**********可用串口***********') for i in range(0, len(com)): print(com[i]) print('***************************') self.lb2 = Label(self.fr1, text='波特率:') self.comb2 = Combobox(self.fr1,values=['2400','9600','57600','115200']) self.comb2.current(3) #设置默认选项 115200 self.lb2.place(x=5,y=75,width=60,height=20) self.comb2.place(x=10,y=100,width=100,height=25) self.var_bt1 = StringVar() self.var_bt1.set("打开串口") self.btn1 = Button(self.fr1,textvariable=self.var_bt1,command=self.uart_opn_close) #绑定 uart_opn_close 方法 self.btn1.place(x=10,y=140,width=60,height=30) self.var_cs = IntVar() #定义返回类型 self.rd1 = Radiobutton(self.fr1,text="Ascii",variable=self.var_cs,value=0,command = self.txt_clr) #选择后清除显示内容 self.rd2 = Radiobutton(self.fr1,text="Hex",variable=self.var_cs,value=1,command = self.txt_clr) self.rd1.place(x=5,y=180,width=60,height=30) self.rd2.place(x=5,y=210,width=60,height=30) self.btn3 = Button(self.fr1, text='清空',command = self.txt_clr) #绑定清空方法 self.btn4 = Button(self.fr1, text='保存',command=self.savefiles) #绑定保存方法 self.btn3.place(x=10,y=260,width=60,height=30) self.btn4.place(x=100,y=260,width=60,height=30) self.btn5 = Button(self.fr1, text='功能',command=self.ascii_hex_get) #测试用 self.btn6 = Button(self.fr1, text='发送',command=self.uart_send) #绑定发送方法 self.btn5.place(x=10,y=315,width=60,height=30) self.btn6.place(x=100,y=315,width=60,height=30) #-------------------------------文本区域-----------------------------# self.fr2=Frame(self.root) #区域1 容器 relief groove=凹 ridge=凸 self.fr2.place(x=180,y=0,width=620,height=360) #区域1位置尺寸 self.txt_rx = Text(self.fr2) self.txt_rx.place(relheight=0.6,relwidth=0.9,relx=0.05,rely=0.01) #比例计算控件尺寸和位置 self.txt_tx = Text(self.fr2) self.txt_tx.place(relheight=0.25,relwidth=0.9,relx=0.05,rely=0.66) #比例计算控件尺寸位置 self.lb3 =Label(self.fr2, text='接收:0 发送:0',bg="yellow",anchor='w') #字节统计 self.lb3.place(relheight=0.05,relwidth=0.3,relx=0.045,rely=0.925) self.lb4 = Label(self.fr2, text=' ', anchor='w',relief=GROOVE) #时钟 self.lb4.place(relheight=0.05, relwidth=0.1, relx=0.85, rely=0.935)#------------------------------------------方法----------------------------------------------- def gettim(self):#获取时间 未用 timestr = time.strftime("%H:%M:%S") # 获取当前的时间并转化为字符串 self.lb4.configure(text=timestr) # 重新设置标签文本 # tim_str = str(datetime.datetime.now()) + '\n' # self.lb4['text'] = tim_str self.txt_rx.after(1000, self.gettim) # 每隔1s调用函数 gettime 自身获取时间 GUI自带的定时函数 def txt_clr(self):#清空显示 self.txt_rx.delete(0.0, 'end') # 清空文本框 self.txt_tx.delete(0.0, 'end') # 清空文本框 def ascii_hex_get(self):#获取单选框状态 if(self.var_cs.get()): return False else: return True def uart_opn_close(self):#打开关闭串口 if(self.var_bt1.get() == '打开串口'): if(uart_open_close(1,str(self.comb1.get())[0:5],self.comb2.get())==True): #传递下拉框选择的参数 COM号+波特率 【0:5】表示只提取COM号字符 self.var_bt1.set('关闭串口') #改变按键内容 self.txt_rx.insert(0.0, self.comb1.get() + ' 打开成功\r\n') # 开头插入 else: print("串口打开失败") messagebox.showinfo('错误','串口打开失败') else: uart_open_close(0, 'COM1', 115200) #关闭时参数无效 self.var_bt1.set('打开串口') def uart_send(self): #发送数据 send_data = self.txt_tx.get(0.0, 'end').strip() if self.ascii_hex_get(): #字符发送 uart_tx(send_data) else: send_data = send_data.replace(" ", "").replace("\n", "0A").replace("\r", "0D") #替换空格和回车换行 if(ISHEX(send_data)==False): messagebox.showinfo('错误', '请输入十六进制数') return uart_tx(send_data,True) def tx_rx_cnt(self,rx=0,tx=0): #发送接收统计 global tx_cnt global rx_cnt rx_cnt += rx tx_cnt += tx self.lb3['text'] = '接收:'+str(rx_cnt),'发送:'+str(tx_cnt) def savefiles(self): #保存日志TXT文本 try: with open('log.txt','a') as file: #a方式打开 文本追加模式 file.write(self.txt_rx.get(0.0,'end')) messagebox.showinfo('提示', '保存成功') except: messagebox.showinfo('错误', '保存日志文件失败!')if __name__ == '__main__': print('Star...') gui = GUI() gui.gettim() #开启时钟 gui.root.mainloop() UART.close() #结束关闭 避免下次打开错误 print('End...')
其他资源
[Python]tkinter的美颜-ttkbootstrap:让tkinter更加美观 - 知乎
在 Python 中使用 MQTT的方法_python mqtt_liming89的博客-CSDN博客
Python GUI之tkinter的皮肤(ttkbootstrap)打造出你的窗口之美_tkinter漂亮gui界面模板_清&轻的博客-CSDN博客
使用python编写mqtt客户端向EMQX服务器发送数据_python mqtt客户端_TMS320VC5257H的博客-CSDN博客
MQTT在Python中的使用mqtt-paho(简单实例, 回调函数,回调参数,qos安全等级)详解及回调函数的正确用法_mqtt python-CSDN博客
MQTT在Python中的使用mqtt-paho(简单实例, 回调函数,回调参数,qos安全等级)详解及回调函数的正确用法_mqtt python-CSDN博客
使用nuitka打包python代码为exe可执行程序_ha_lee的博客-CSDN博客
Python 打包工具 Nuitka 入门指南_半勺蜂蜜~的博客-CSDN博客
Nuitka常见问题解决集锦-独孤九剑之破Bug式 - 知乎
WinLibs - GCC+MinGW-w64 compiler for Windows
利用Nuitka打包py文件_https://github.com/ccache/ccache/releases/download-CSDN博客