Python实现的mqtt客户端工具分享,小巧且超轻量级(python+tkinter+paho.mqtt)

news2025/1/12 10:10:32

mqtt协议调试时需要个客户端工具,但网上找的体积包都很大,都不够小巧和便携。于是趁周末时间用python搞出来了个客户端工具,使用python+tinker+paho.mqtt实现。源码量很少但功能不弱,相当的轻量级。分享给有需要的小伙伴,喜欢的可以点击收藏。

前言

用python实现个跨平台的mqtt客户端工具,同时介绍下python的mqtt客户端库paho.mqtt的使用。界面这里选择使用了python自带的tkinter,虽不是很好用,但相当的轻量级,对于造一个工具来说足够啦。且配合ttkbootstrap这个包,界面可以美化,还更换皮肤,这点儿挺不错。但是如果界面特别复杂的话推荐pyqt。

界面效果:

环境准备

使用python3实现的mqtt客户端工具, 相当的轻量级。源码不大仅一个文件。

需要安装依赖包:

pip install ttkbootstrap
pip install -i https://pypi.doubanio.com/simple paho-mqtt

tkinter是python自带的标准gui库,对于我们自己日常做一些小程序出来给自己使用是非常不错的。因为tkinter相比较其它强大的gui库(PyQT,WxPython等等)而言要简单、方便、学起来也容易得很多,所以用来造个小工具非常nice,但它做出来的界面不是很好看。

ttkbootstrap 介绍

ttkbootstrap 是一个基于 tkinterttk 的Python库,它提供了一套现代化的主题和样式,可以用于创建漂亮的图形用户界面(GUI)应用程序。它是基于 Bootstrap 框架的设计风格,为 tkinter 应用程序提供了一致的外观和用户体验。

官方文档:ttkbootstrap - ttkbootstrap

ttkbootstrap 库简单示例,选择主题: 

import ttkbootstrap as ttk
from ttkbootstrap.constants import *
root = ttk.Window()
root.geometry("500x400+500+150")  
style = ttk.Style()
theme_names = style.theme_names()#以列表的形式返回多个主题名
theme_selection = ttk.Frame(root, padding=(10, 10, 10, 0))
theme_selection.pack(fill=X, expand=YES)
lbl = ttk.Label(theme_selection, text="选择主题:")
theme_cbo = ttk.Combobox(
        master=theme_selection,
        text=style.theme.name,
        values=theme_names,
)
theme_cbo.pack(padx=10, side=RIGHT)
theme_cbo.current(theme_names.index(style.theme.name))
lbl.pack(side=RIGHT)
def change_theme(event):
    theme_cbo_value = theme_cbo.get()
    style.theme_use(theme_cbo_value)
    theme_selected.configure(text=theme_cbo_value)
    theme_cbo.selection_clear()
theme_cbo.bind('<<ComboboxSelected>>', change_theme)
theme_selected = ttk.Label(
        master=theme_selection,
        text="litera",
        font="-size 24 -weight bold"
)
theme_selected.pack(side=LEFT)
root.mainloop()

ttkbootstrap简单使用

import ttkbootstrap as ttk
#实例化创建应用程序窗口
root = ttk.Window(
        title="窗口名字",        #设置窗口的标题
        themename="litera",     #设置主题
        size=(1066,600),        #窗口的大小
        position=(100,100),     #窗口所在的位置
        minsize=(0,0),          #窗口的最小宽高
        maxsize=(1920,1080),    #窗口的最大宽高
        resizable=None,         #设置窗口是否可以更改大小
        alpha=1.0,              #设置窗口的透明度(0.0完全透明)
        )
# root.place_window_center()    #让显现出的窗口居中
# root.resizable(False,False)   #让窗口不可更改大小
# root.wm_attributes('-topmost', 1)#让窗口位置其它窗口之上
root.mainloop()

标签显示:

import ttkbootstrap as ttk
from ttkbootstrap.constants import *
root = ttk.Window()
ttk.Label(root,text="标签1",bootstyle=INFO).pack(side=ttk.LEFT, padx=5, pady=10)
ttk.Label(root,text="标签2",bootstyle="inverse").pack(side=ttk.LEFT, padx=5, pady=10)
ttk.Label(root,text="标签3",bootstyle="inverse-danger").pack(side=ttk.LEFT, padx=5, pady=10)
ttk.Label(root, text="标签4", bootstyle=WARNING, font=("微软雅黑", 15), background='#94a2a4').pack(side=LEFT, padx=5, pady=10)
root.mainloop()
'''
# bootstyle colors
PRIMARY = 'primary'
SECONDARY = 'secondary'
SUCCESS = 'success'
DANGER = 'danger'
WARNING = 'warning'
INFO = 'info'
LIGHT = 'light'
DARK = 'dark'

# bootstyle types
OUTLINE = 'outline'
LINK = 'link'
TOGGLE = 'toggle'
INVERSE = 'inverse'
STRIPED = 'striped'
TOOLBUTTON = 'toolbutton'
ROUND = 'round'
SQUARE = 'square'
'''

 paho-mqtt库介绍

paho-mqtt 是目前 Python 中使用较多的 MQTT 客户端库,它在 Python 2.7 或 3.x 上为客户端类提供了对 MQTT v3.1 和 v3.1.1 的支持。它还提供了一些帮助程序功能,使将消息发布到 MQTT 服务器变得非常简单。 

paho-mqtt简单使用:

# -*- coding: utf-8 -*-# -*- coding: utf-8 -*-
 
 
import paho.mqtt.client as mqtt
import time
 
 
def on_connect(client, userdata, flags, rc):
    print "链接"
    print("Connected with result code: " + str(rc))
 
 
def on_message(client, userdata, msg):
    print "消息内容"
    print(msg.topic + " " + str(msg.payload))
 
 
#   订阅回调
def on_subscribe(client, userdata, mid, granted_qos):
    print "订阅"
    print("On Subscribed: qos = %d" % granted_qos)
    pass
 
 
#   取消订阅回调
def on_unsubscribe(client, userdata, mid, granted_qos):
    print "取消订阅"
    print("On unSubscribed: qos = %d" % granted_qos)
    pass
 
 
#   发布消息回调
def on_publish(client, userdata, mid):
    print "发布消息"
    print("On onPublish: qos = %d" % mid)
    pass
 
 
#   断开链接回调
def on_disconnect(client, userdata, rc):
    print "断开链接"
    print("Unexpected disconnection rc = " + str(rc))
    pass
 
 
client = mqtt.Client()
client.on_connect = on_connect
client.on_message = on_message
client.on_publish = on_publish
client.on_disconnect = on_disconnect
client.on_unsubscribe = on_unsubscribe
client.on_subscribe = on_subscribe
client.connect('127.0.0.1', 1883, 600) # 600为keepalive的时间间隔
while True:
    client.publish(topic='mqtt11', payload='amazing', qos=0, retain=False)
    time.sleep(2)

参数解释:

  • keepalive =>   心跳间隔,单位是秒,如果 broker 和 client 在这段时间内没有任何通讯,client 会给 broker 发送一个 ping 消息
  • retain  =>  如果设为 Ture ,这条消息会被设为保留消息 
  • payload  => 消息内容,字符串类型,如果设为 None ,会发送一条长度为 0 消息。如果设置了 int 或者 3. float 类型的值,会当做字符串发送,如果你想发送真正的 int 或者 float 值,需要用 struct.pack() 生成消息, mqtt的publish 只支持None, string, int, float 类型的数据,  如果需要发送json类型数据可以通过json.dumps()将数据进行转换后在发送, 接收端在on_message()回调函数中通过json.loads() 将数据解析就可以了
  • topic =>  这条消息所属的话题
  • qos  => 消息的安全等级 Qos详细介绍
  • qos=0    QoS0,At most once,至多一次;
  • QoS0 代表,Sender 发送的一条消息,Receiver 最多能收到一次,也就是说 Sender 尽力向 Receiver 发送消息,如果发送失败,也就算了;
  • qos=1    QoS1,At least once,至少一次;
  • QoS1 代表,Sender 发送的一条消息,Receiver 至少能收到一次,也就是说 Sender 向 Receiver 发送消息,如果发送失败,会继续重试,直到 Receiver 收到消息为止,但是因为重传的原因,Receiver 有可能会收到重复的消息;
  • qos=2    QoS2,Exactly once,确保只有一次
  • QoS2 代表,Sender 发送的一条消息,Receiver 确保能收到而且只收到一次,也就是说 Sender 尽力向 Receiver 发送消息,如果发送失败,会继续重试,直到 Receiver 收到消息为止,同时保证 Receiver 不会因为消息重传而收到重复的消息。

源码实现

 源码实现比较简单,主要是里面有一些注意事项。比如图片的显示,网上关于tkinter不能显示图片的问题有很多。这里给出绝对可行的做法。

label的图片显示

from PIL import Image, ImageTk       
#.....
        img = Image.open("me.jpg")  # 替换为你的图片路径
        img = img.resize((80,80))
        #self._img = ImageTk.PhotoImage(file = "me.jpg")    
        self._img = ImageTk.PhotoImage(img)     
        self.about = Label(self.fr1) 
        self.about.image = self._img
        self.about.configure(image=self._img)
        self.about.place(x=65,y=0,width=80,height=80)

判断是否为16进制

def ISHEX(data):        #判断输入字符串是否为十六进制
    if len(data)%2:
        return False
    for item in data:
        if item not in '0123456789ABCDEFabcdef': #循环判断数字和字符
            return False
    return True

保存文件

def savefiles(self):   #保存日志TXT文本
        try:
            with open('log.txt','a') as file:       #a方式打开 文本追加模式
                file.write(self.txt_rx.get(0.0,'end'))
                messagebox.showinfo('提示', '保存成功')
        except:
            messagebox.showinfo('错误', '保存日志文件失败!')

 自动滚动到末尾

def appendTxt(self,msg):
        current_t = datetime.now()
        current_ = current_t.strftime("%Y-%m-%d %H:%M:%S ")
        self.txt_rx.insert(END,current_)
        self.txt_rx.insert(END,msg)
        self.txt_rx.insert(END,"\n")  
        #滚动到末尾
        self.txt_rx.see(END)
        self.txt_rx.update_idletasks()      

mqtt的使用

def connect(self,addr,port,alive=60):
        self.client.connect(addr, port,alive)
        self.client.loop_start()

    def disconnect(self):
        self.client.loop_stop()
        self.client.disconnect()
        print("disconnect!")

    def on_connect(self, client, userdata, flags, rc):
        if rc == 0:
            print("Connected to MQTT Broker ok!\n")
            self.appendTxt("Connected to MQTT Broker ok!\n")
            self.var_bt1.set("断开")
            self.isConnect = True
        else:
            print("Failed to connect, return code %d\n", rc)
            self.appendTxt(f"Failed to connect, return code: {rc}\n")
            self.isConnect = False

    def on_message(self, client, userdata, msg):
        self.tx_rx_cnt(1,0)
        print("Received message: " + msg.payload.decode())
        self.appendTxt(f"Received message:\n[topic]:{msg.topic}\n{msg.payload.decode()}\n")

    def subscribe(self, topic):
        #item = Entry(self.subitem).get()
        if topic in self.subitem.get(0, END):
            print("item already exists.")
        else:
            self.appendTxt(f"[订阅topic]:{topic}\n")
            self.client.subscribe(topic)
            self.subitem.insert(END, topic)

    def publish(self, topic, message):
        self.client.publish(topic, message)
        self.appendTxt(f"[发布topic]:{topic}\n{message}\n")

打包为exe程序

python脚本打包为exe有很多方法。常用的为pyinstaller,使用简单,但是有不少缺点,如打包体积大,启动速度慢等。所以这里并不推荐它,推荐使用nuitka。

#pyinstaller方式打包
pyinstaller -F -w -i chengzi.ico py_word.py 打包指定exe图标打包

nuitka的安装

  • 直接利用pip即可安装:pip install Nuitka

  • 需具备(MSVS)或者MinGW64等C++的编译器。

nuitka使用过程

对于第三方依赖包较多的项目(比如需要import torch,tensorflow,cv2,numpy,pandas,geopy等等)而言,这里最好打包的方式是只将属于自己的代码转成C++,不管这些大型的第三方包!

使用举例:

nuitka --standalone --show-memory --show-progress --nofollow-imports --plugin-enable=qt-plugins --follow-import-to=utils,src --output-dir=out --windows-icon-from-ico=./logo.ico ./demo.py

简单介绍下上面的nuitka的命令:

  • --standalone:方便移植到其他机器,不用再安装python

  • --show-memory --show-progress:展示整个安装的进度过程

  • --nofollow-imports:不编译代码中所有的import,比如keras,numpy之类的。

  • --plugin-enable=qt-plugins:如果用到pyqt5来做界面,这里nuitka有其对应的插件。

  • --plugin-enable=qt-plugins

  • --follow-import-to=utils,src:需要编译成C++代码的指定的2个包含源码的文件夹,这里用,来进行分隔。

  • --output-dir=out:指定输出的结果路径为out。

  • --windows-icon-from-ico=./logo.ico:指定生成的exe的图标为logo.ico这个图标,这里推荐一个将图片转成ico格式文件的网站(比特虫)。

  • --windows-disable-console:运行exe取消弹框。这里没有放上去是因为我们还需要调试,让可以弹出DOS窗口便于查看print的日志。

完整实现

# -*- coding: utf-8 -*-
# @Time : 2023/09/17 12:49
# @Author : yangyongzhen
# @Email : 534117529@qq.com
# @File : mqttclienttool.py
# @Project : study
import time

from tkinter.ttk import *
from tkinter import *
from datetime import datetime
import time
import threading
from tkinter import messagebox
from ttkbootstrap import Style
import paho.mqtt.client as mqtt
from PIL import Image, ImageTk

global gui           #全局型式保存GUI句柄

tx_cnt=0 #发送条数统计
rx_cnt=0 #接收条数统计

def ISHEX(data):        #判断输入字符串是否为十六进制
    if len(data)%2:
        return False
    for item in data:
        if item not in '0123456789ABCDEFabcdef': #循环判断数字和字符
            return False
    return True

'''GUI'''''''''''''''''''''''''''''''''''''''''''''''''''''''''
class GUI:
    def __init__(self):
        self.root = Tk()
        self.root.title('MQTT调试助手-author:blog.csdn.net/qq8864')             #窗口名称
        self.root.geometry("820x560+500+150")         #尺寸位置
        self.interface()
        Style(theme='pulse') #主题修改 可选['cyborg', 'journal', 'darkly', 'flatly' 'solar', 'minty', 'litera', 'united', 'pulse', 'cosmo', 'lumen', 'yeti', 'superhero','sandstone']
        self.client = mqtt.Client() #MQTT
        self.client.on_connect = self.on_connect
        self.client.on_message = self.on_message
        self.isConnect = False
        self._img = None
    def interface(self):
        """"界面编写位置"""
        #--------------------------------操作区域-----------------------------#
        self.fr1=Frame(self.root)
        self.fr1.place(x=0,y=0,width=220,height=600)     #区域1位置尺寸
        
        img = Image.open("me.jpg")  # 替换为你的图片路径
        img = img.resize((80,80))
        #self._img = ImageTk.PhotoImage(file = "me.jpg")    
        self._img = ImageTk.PhotoImage(img)     
        self.about = Label(self.fr1) 
        self.about.image = self._img
        self.about.configure(image=self._img)
        self.about.place(x=65,y=0,width=80,height=80)
        pos = 80
        self.lb_server =Label(self.fr1, text='地址:',anchor="e",fg='red')  #点击可刷新
        self.lb_server.place(x=0,y=pos,width=50,height=35)
        self.txt_server = Text(self.fr1)
        self.txt_server.place(x=65,y=pos,width=155,height=26)
        self.txt_server.insert("1.0", "127.0.0.1")
        
        self.lb1 =Label(self.fr1, text='端口:',anchor="e",fg='red')  #点击可刷新
        self.lb1.place(x=0,y=pos+40,width=50,height=35)
        self.txt_port = Text(self.fr1)
        self.txt_port.place(x=65,y=pos+40,width=155,height=26)
        self.txt_port.insert("1.0", 1883)
        
        self.lb1 =Label(self.fr1, text='clientID:',anchor="e",fg='red')  #点击可刷新
        self.lb1.place(x=0,y=pos+80,width=50,height=35)
        self.txt_id = Text(self.fr1)
        self.txt_id.place(x=65,y=pos+80,width=155,height=26)
        self.txt_id.insert("1.0", "mqtt-client")
        
        self.lb1 =Label(self.fr1, text='用户名:',anchor="e",fg='red')  #点击可刷新
        self.lb1.place(x=0,y=pos+120,width=50,height=35)
        self.txt_name = Text(self.fr1)
        self.txt_name.place(x=65,y=pos+120,width=155,height=26)


        self.lb1 =Label(self.fr1, text='密码 :',anchor="e",fg='red')  #点击可刷新
        self.lb1.place(x=0,y=pos+160,width=50,height=35)
        self.txt_pwd = Text(self.fr1)
        self.txt_pwd.place(x=65,y=pos+160,width=155,height=26)
        
        self.lb1 =Label(self.fr1, text='心跳 :',anchor="e",fg='red')  #点击可刷新
        self.lb1.place(x=0,y=pos+200,width=50,height=35)
        self.txt_heart = Text(self.fr1)
        self.txt_heart.place(x=65,y=pos+200,width=155,height=26)
        self.txt_heart.insert("1.0", 60)

        self.var_bt1 = StringVar()
        self.var_bt1.set("连接")
        self.btn1 = Button(self.fr1,textvariable=self.var_bt1,command=self.btn_connect) #绑定 btn_connect 方法
        self.btn1.place(x=170,y=pos+240,width=50,height=30)


        self.lb_s =Label(self.fr1, text='订阅主题:',bg="yellow",anchor='w') #字节统计
        self.lb_s.place(x=5,y=340,width=90,height=28)
        
        self.txt_sub = Text(self.fr1)
        self.txt_sub.place(x=5,y=368,width=155,height=28)
        self.btn5 = Button(self.fr1, text='订阅',command=self.btn_sub) #测试用
        self.btn5.place(x=170,y=368,width=50,height=28)
    

        self.subitem = Listbox(self.fr1)
        self.subitem.place(x=5,y=402,width=215,height=85)
        #self.subitem.insert(END, "This is a read-only Text widget.")
        self.subitem.bind("<Button-3>", self.on_right_click)
        

        #-------------------------------文本区域-----------------------------#
        self.fr2=Frame(self.root)          #区域1 容器  relief   groove=凹  ridge=凸
        self.fr2.place(x=220,y=0,width=620,height=560)     #区域1位置尺寸

        self.txt_rx = Text(self.fr2)
        self.txt_rx.place(relheight=0.6,relwidth=0.9,relx=0.05,rely=0.01) #比例计算控件尺寸和位置
        
        self.scrollbar = Scrollbar(self.txt_rx)
        self.scrollbar.pack(side=RIGHT, fill=Y)
        self.txt_rx.config(yscrollcommand=self.scrollbar.set)
        self.scrollbar.config(command=self.txt_rx.yview)
        self.txt_rx.bind("<Configure>", self.check_scrollbar)

        self.lb_t =Label(self.fr2, text='发布主题:',bg="yellow",anchor='w') #字节统计
        self.lb_t.place(relheight=0.04,relwidth=0.9,relx=0.05,rely=0.62)
        
        self.txt_topic = Text(self.fr2)
        self.txt_topic.place(relheight=0.05,relwidth=0.9,relx=0.05,rely=0.66) #比例计算控件尺寸位置
        
        self.txt_tx = Text(self.fr2)
        self.txt_tx.place(relheight=0.15,relwidth=0.9,relx=0.05,rely=0.72) #比例计算控件尺寸位置

        self.btn6 = Button(self.fr2, text='发送',command=self.btn_send)  #绑定发送方法
        self.btn6.place(relheight=0.06,relwidth=0.11,relx=0.84,rely=0.88)
        
        self.btn3 = Button(self.fr2, text='清空',command = self.txt_clr) #绑定清空方法
        self.btn4 = Button(self.fr2, text='保存',command=self.savefiles) #绑定保存方法
        self.btn3.place(relheight=0.06,relwidth=0.11,relx=0.05,rely=0.88)
        self.btn4.place(relheight=0.06,relwidth=0.11,relx=0.18,rely=0.88)
        
        self.lb3 =Label(self.fr2, text='接收:0    发送:0',bg="yellow",anchor='w') #字节统计
        self.lb3.place(relheight=0.05,relwidth=0.3,relx=0.045,rely=0.945)

        self.lb4 = Label(self.fr2, text=' ', anchor='w',relief=GROOVE)  #时钟
        self.lb4.place(relheight=0.05, relwidth=0.11, relx=0.84, rely=0.945)
#------------------------------------------方法-----------------------------------------------
    def check_scrollbar(self,*args):
        if self.txt_rx.yview() == (0.0, 1.0):
            self.scrollbar.pack_forget()
        else:
            self.scrollbar.place(RIGHT, fill=Y)
            
    def on_right_click(self,w):
        idx = self.subitem.curselection()
        print("Right-Clicked idx:", idx)
        if idx == ():
            return
        selected_item = self.subitem.get(idx)
        print("Right-Clicked on:", selected_item,idx)
        ret = messagebox.askyesno('取消订阅', "取消订阅:\n"+selected_item)
        if ret:
            self.subitem.delete(idx)
            self.client.unsubscribe(selected_item)
            self.appendTxt("取消订阅:"+selected_item)
        
    def gettim(self):#获取时间 未用
            timestr = time.strftime("%H:%M:%S")  # 获取当前的时间并转化为字符串
            self.lb4.configure(text=timestr)  # 重新设置标签文本
            # tim_str = str(datetime.datetime.now()) + '\n'
            # self.lb4['text'] = tim_str
            #self.lb3['text'] = '接收:'+str(rx_cnt),'发送:'+str(tx_cnt)
            self.txt_rx.after(1000, self.gettim)     # 每隔1s调用函数 gettime 自身获取时间 GUI自带的定时函数

    def txt_clr(self):#清空显示
        self.txt_rx.delete(0.0, 'end')  # 清空文本框
        self.txt_tx.delete(0.0, 'end')  # 清空文本框

    def ascii_hex_get(self):#获取单选框状态
        if(self.var_cs.get()):
            return False
        else:
            return True

    def tx_rx_cnt(self,rx=0,tx=0):  #发送接收统计
        global tx_cnt
        global rx_cnt

        rx_cnt += rx
        tx_cnt += tx
        self.lb3['text'] = '接收:'+str(rx_cnt),'发送:'+str(tx_cnt)

    def savefiles(self):   #保存日志TXT文本
        try:
            with open('log.txt','a') as file:       #a方式打开 文本追加模式
                file.write(self.txt_rx.get(0.0,'end'))
                messagebox.showinfo('提示', '保存成功')
        except:
            messagebox.showinfo('错误', '保存日志文件失败!')
    
    def appendTxt(self,msg):
        current_t = datetime.now()
        current_ = current_t.strftime("%Y-%m-%d %H:%M:%S ")
        self.txt_rx.insert(END,current_)
        self.txt_rx.insert(END,msg)
        self.txt_rx.insert(END,"\n")  
        #滚动到末尾
        self.txt_rx.see(END)
        self.txt_rx.update_idletasks()      
                
    def connect(self,addr,port,alive=60):
        self.client.connect(addr, port,alive)
        self.client.loop_start()

    def disconnect(self):
        self.client.loop_stop()
        self.client.disconnect()
        print("disconnect!")

    def on_connect(self, client, userdata, flags, rc):
        if rc == 0:
            print("Connected to MQTT Broker ok!\n")
            self.appendTxt("Connected to MQTT Broker ok!\n")
            self.var_bt1.set("断开")
            self.isConnect = True
        else:
            print("Failed to connect, return code %d\n", rc)
            self.appendTxt(f"Failed to connect, return code: {rc}\n")
            self.isConnect = False

    def on_message(self, client, userdata, msg):
        self.tx_rx_cnt(1,0)
        print("Received message: " + msg.payload.decode())
        self.appendTxt(f"Received message:\n[topic]:{msg.topic}\n{msg.payload.decode()}\n")

    def subscribe(self, topic):
        #item = Entry(self.subitem).get()
        if topic in self.subitem.get(0, END):
            print("item already exists.")
        else:
            self.appendTxt(f"[订阅topic]:{topic}\n")
            self.client.subscribe(topic)
            self.subitem.insert(END, topic)

    def publish(self, topic, message):
        self.client.publish(topic, message)
        self.appendTxt(f"[发布topic]:{topic}\n{message}\n")
        
    def btn_connect(self):#连接
        global isConnect
        if self.var_bt1.get() == '连接':
            server = self.txt_server.get("1.0",END).strip()
            port = self.txt_port.get("1.0",END).strip()
            alive = self.txt_heart.get("1.0",END).strip()
            user = self.txt_name.get("1.0",END).strip()
            psd = self.txt_pwd.get("1.0",END).strip()
            #用户名密码设置
            if len(user) !=0 :
                self.client.username_pw_set(user, psd)
                
            print("btn connect click: "+server+","+port)
            self.appendTxt(f"连接 {server},port:{port}\n")
            self.connect(server,int(port),int(alive))
        else:
            self.disconnect()
            self.var_bt1.set("连接")
            self.isConnect = False
            self.appendTxt(f"断开连接!\n")
        
    def btn_sub(self):#订阅
        if self.isConnect:
            sub = self.txt_sub.get("1.0",END).strip()
            print("btn sub click,topic: "+sub)
            self.subscribe(sub)
        else:
            messagebox.showinfo('提示', '服务器未连接!')
        
    def btn_send(self):#发布
        if self.isConnect:
            pub_topic = self.txt_topic.get("1.0",END).strip()
            payload = self.txt_tx.get("1.0",END).strip()
            print("btn pub click,topic: "+pub_topic)
            self.publish(pub_topic,payload)
            self.tx_rx_cnt(0,1)
        else:
            messagebox.showinfo('提示', '请连接服务器!')

if __name__ == '__main__':
    print('Start...')
    gui = GUI()
    gui.gettim()  #开启时钟
    gui.root.mainloop()
    gui.disconnect()
    print('End...')

最后,再附带个python实现的串口调试助手源码:

import time

from tkinter.ttk import *
from tkinter import *
import datetime
import serial  # 导入模块
import serial.tools.list_ports
import threading
from tkinter import messagebox
from ttkbootstrap import Style



global UART          #全局型式保存串口句柄
global RX_THREAD     #全局型式保存串口接收函数
global gui           #全局型式保存GUI句柄

tx_cnt=0 #发送字符数统计
rx_cnt=0 #接收字符数统计


def ISHEX(data):        #判断输入字符串是否为十六进制
    if len(data)%2:
        return False
    for item in data:
        if item not in '0123456789ABCDEFabcdef': #循环判断数字和字符
            return False
    return True


def uart_open_close(fun,com,bund):  #串口打开关闭控制
    global UART
    global RX_THREAD

    if fun==1:#打开串口
        try:
           UART = serial.Serial(com, bund, timeout=0.2)  # 提取串口号和波特率并打开串口
           if UART.isOpen(): #判断是否打开成功
               lock = threading.Lock()
               RX_THREAD = UART_RX_TREAD('URX1',lock)  #开启数据接收进程
               RX_THREAD.setDaemon(True)               #开启守护进程 主进程结束后接收进程也关闭 会报警告 不知道咋回事
               RX_THREAD.start()
               RX_THREAD.resume()
               return True
        except:
            return False
        return False
    else:                   #关闭串口
        print("关闭串口")
        RX_THREAD.pause()
        UART.close()

def uart_tx(data,isHex=False):          #串口发送数据
    global UART

    try:
        if  UART.isOpen():  #发送前判断串口状态 避免错误
            print("uart_send=" + data)
            gui.tx_rx_cnt(tx=len(data)) #发送计数
            if isHex:   #十六进制发送
                data_bytes = bytes.fromhex(data)
                return UART.write(bytes(data_bytes))
            else:      #字符发送
                return UART.write(data.encode('gb2312'))
    except:#错误返回
        messagebox.showinfo('错误', '发送失败')


class UART_RX_TREAD(threading.Thread):          #数据接收进程 部分重构
    global gui

    def __init__(self, name, lock):
        threading.Thread.__init__(self)
        self.mName = name
        self.mLock = lock
        self.mEvent = threading.Event()

    def run(self): #主函数
        print('开启数据接收\r')
        while True:
            self.mEvent.wait()
            self.mLock.acquire()
            if UART.isOpen():
                rx_buf =  UART.read()
                if len(rx_buf) >0:
                    rx_buf += UART.readall()  #有延迟但不易出错
                    gui.tx_rx_cnt(rx=len(rx_buf))
                    if gui.ascii_hex_get() == False:
                        print('收到hex数据', rx_buf.hex().upper())
                        gui.txt_rx.insert(END,  rx_buf.hex().upper())
                    else:
                        str_data = str(rx_buf, encoding='gb2312')
                        print("串口收到消息:", len(rx_buf), str_data)
                        gui.txt_rx.insert(END,str_data)
                        # self.txt_rx.insert(END,str_data)
            self.mLock.release()
           #time.sleep(3)
    def pause(self): #暂停
        self.mEvent.clear()

    def resume(self):#恢复
        self.mEvent.set()


'''GUI'''''''''''''''''''''''''''''''''''''''''''''''''''''''''
class GUI:
    def __init__(self):
        self.root = Tk()
        self.root.title('梵德觅串口调试助手')             #窗口名称
        self.root.geometry("800x360+500+150")         #尺寸位置
        self.interface()
        Style(theme='pulse') #主题修改 可选['cyborg', 'journal', 'darkly', 'flatly' 'solar', 'minty', 'litera', 'united', 'pulse', 'cosmo', 'lumen', 'yeti', 'superhero','sandstone']


    def interface(self):
        """"界面编写位置"""
        #--------------------------------操作区域-----------------------------#
        self.fr1=Frame(self.root)
        self.fr1.place(x=0,y=0,width=180,height=360)     #区域1位置尺寸

        self.lb1 =Label(self.fr1, text='端口号:',font="微软雅黑",fg='red')  #点击可刷新
        self.lb1.place(x=0,y=5,width=100,height=35)

        self.var_cb1 = StringVar()
        self.comb1 = Combobox(self.fr1,textvariable=self.var_cb1)
        self.comb1['values'] = list(serial.tools.list_ports.comports()) #列出可用串口
        self.comb1.current(0)  # 设置默认选项 0开始
        self.comb1.place(x=10,y=40,width=150,height=30)
        com=list(serial.tools.list_ports.comports())

        print('**********可用串口***********')
        for i in range(0, len(com)):
            print(com[i])
        print('***************************')

        self.lb2 = Label(self.fr1, text='波特率:')
        self.comb2 = Combobox(self.fr1,values=['2400','9600','57600','115200'])
        self.comb2.current(3)                               #设置默认选项 115200
        self.lb2.place(x=5,y=75,width=60,height=20)
        self.comb2.place(x=10,y=100,width=100,height=25)

        self.var_bt1 = StringVar()
        self.var_bt1.set("打开串口")
        self.btn1 = Button(self.fr1,textvariable=self.var_bt1,command=self.uart_opn_close) #绑定 uart_opn_close 方法
        self.btn1.place(x=10,y=140,width=60,height=30)



        self.var_cs = IntVar()  #定义返回类型
        self.rd1 = Radiobutton(self.fr1,text="Ascii",variable=self.var_cs,value=0,command = self.txt_clr) #选择后清除显示内容
        self.rd2 = Radiobutton(self.fr1,text="Hex",variable=self.var_cs,value=1,command = self.txt_clr)
        self.rd1.place(x=5,y=180,width=60,height=30)
        self.rd2.place(x=5,y=210,width=60,height=30)


        self.btn3 = Button(self.fr1, text='清空',command = self.txt_clr) #绑定清空方法
        self.btn4 = Button(self.fr1, text='保存',command=self.savefiles) #绑定保存方法
        self.btn3.place(x=10,y=260,width=60,height=30)
        self.btn4.place(x=100,y=260,width=60,height=30)

        self.btn5 = Button(self.fr1, text='功能',command=self.ascii_hex_get) #测试用
        self.btn6 = Button(self.fr1, text='发送',command=self.uart_send)  #绑定发送方法
        self.btn5.place(x=10,y=315,width=60,height=30)
        self.btn6.place(x=100,y=315,width=60,height=30)

        #-------------------------------文本区域-----------------------------#
        self.fr2=Frame(self.root)          #区域1 容器  relief   groove=凹  ridge=凸
        self.fr2.place(x=180,y=0,width=620,height=360)     #区域1位置尺寸

        self.txt_rx = Text(self.fr2)
        self.txt_rx.place(relheight=0.6,relwidth=0.9,relx=0.05,rely=0.01) #比例计算控件尺寸和位置

        self.txt_tx = Text(self.fr2)
        self.txt_tx.place(relheight=0.25,relwidth=0.9,relx=0.05,rely=0.66) #比例计算控件尺寸位置

        self.lb3 =Label(self.fr2, text='接收:0    发送:0',bg="yellow",anchor='w') #字节统计
        self.lb3.place(relheight=0.05,relwidth=0.3,relx=0.045,rely=0.925)

        self.lb4 = Label(self.fr2, text=' ', anchor='w',relief=GROOVE)  #时钟
        self.lb4.place(relheight=0.05, relwidth=0.1, relx=0.85, rely=0.935)
#------------------------------------------方法-----------------------------------------------
    def gettim(self):#获取时间 未用
            timestr = time.strftime("%H:%M:%S")  # 获取当前的时间并转化为字符串
            self.lb4.configure(text=timestr)  # 重新设置标签文本
            # tim_str = str(datetime.datetime.now()) + '\n'
            # self.lb4['text'] = tim_str
            self.txt_rx.after(1000, self.gettim)     # 每隔1s调用函数 gettime 自身获取时间 GUI自带的定时函数

    def txt_clr(self):#清空显示
        self.txt_rx.delete(0.0, 'end')  # 清空文本框
        self.txt_tx.delete(0.0, 'end')  # 清空文本框

    def ascii_hex_get(self):#获取单选框状态
        if(self.var_cs.get()):
            return False
        else:
            return True

    def uart_opn_close(self):#打开关闭串口
        if(self.var_bt1.get() == '打开串口'):
          if(uart_open_close(1,str(self.comb1.get())[0:5],self.comb2.get())==True): #传递下拉框选择的参数 COM号+波特率  【0:5】表示只提取COM号字符
             self.var_bt1.set('关闭串口')                             #改变按键内容
             self.txt_rx.insert(0.0, self.comb1.get() + ' 打开成功\r\n')  # 开头插入
          else:
             print("串口打开失败")
             messagebox.showinfo('错误','串口打开失败')
        else:
            uart_open_close(0, 'COM1', 115200) #关闭时参数无效
            self.var_bt1.set('打开串口')

    def uart_send(self): #发送数据
        send_data = self.txt_tx.get(0.0, 'end').strip()
        if self.ascii_hex_get():    #字符发送
            uart_tx(send_data)
        else:
            send_data = send_data.replace(" ", "").replace("\n", "0A").replace("\r", "0D") #替换空格和回车换行
            if(ISHEX(send_data)==False):
                messagebox.showinfo('错误', '请输入十六进制数')
                return
            uart_tx(send_data,True)

    def tx_rx_cnt(self,rx=0,tx=0):  #发送接收统计
        global tx_cnt
        global rx_cnt

        rx_cnt += rx
        tx_cnt += tx
        self.lb3['text'] = '接收:'+str(rx_cnt),'发送:'+str(tx_cnt)

    def savefiles(self):   #保存日志TXT文本
        try:
            with open('log.txt','a') as file:       #a方式打开 文本追加模式
                file.write(self.txt_rx.get(0.0,'end'))
                messagebox.showinfo('提示', '保存成功')
        except:
            messagebox.showinfo('错误', '保存日志文件失败!')


if __name__ == '__main__':
    print('Star...')
    gui = GUI()
    gui.gettim()  #开启时钟
    gui.root.mainloop()
    UART.close()   #结束关闭 避免下次打开错误
    print('End...')

其他资源

[Python]tkinter的美颜-ttkbootstrap:让tkinter更加美观 - 知乎

在 Python 中使用 MQTT的方法_python mqtt_liming89的博客-CSDN博客

Python GUI之tkinter的皮肤(ttkbootstrap)打造出你的窗口之美_tkinter漂亮gui界面模板_清&轻的博客-CSDN博客

使用python编写mqtt客户端向EMQX服务器发送数据_python mqtt客户端_TMS320VC5257H的博客-CSDN博客

MQTT在Python中的使用mqtt-paho(简单实例, 回调函数,回调参数,qos安全等级)详解及回调函数的正确用法_mqtt python-CSDN博客

MQTT在Python中的使用mqtt-paho(简单实例, 回调函数,回调参数,qos安全等级)详解及回调函数的正确用法_mqtt python-CSDN博客

Nuitka打包教程_苍穹之跃的博客-CSDN博客

使用nuitka打包python代码为exe可执行程序_ha_lee的博客-CSDN博客

Python 打包工具 Nuitka 入门指南_半勺蜂蜜~的博客-CSDN博客

Nuitka常见问题解决集锦-独孤九剑之破Bug式 - 知乎

Python打包exe的王炸-Nuitka - 知乎

WinLibs - GCC+MinGW-w64 compiler for Windows

Nuitka打包一、安装依赖-阿里云开发者社区

利用Nuitka打包py文件_https://github.com/ccache/ccache/releases/download-CSDN博客 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1017030.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

论文写作指导手册

TIPS&#xff1a;最近我们上线了“AI写作大师”&#xff0c;后续会继续围绕论文、文章、文案写相关的文章&#xff0c;来验证写作大师能力如何&#xff0c;敬请持续关注&#xff08; openrabbit.net&#xff09;&#xff5e; 一、论文选题 选题是论文研究的第一步&a…

Prometheus+Grafana可视化监控【Nginx状态】

文章目录 一、安装Docker二、安装Nginx(Docker容器方式)三、安装Prometheus四、安装Grafana五、Pronetheus和Grafana相关联六、安装nginx_exporter七、Grafana添加Nginx监控模板 一、安装Docker 注意&#xff1a;我这里使用之前写好脚本进行安装Docker&#xff0c;如果已经有D…

软件测试中常见的难题

1、需求定义&#xff1a; 需求可能不完整或者不准确&#xff0c;这会导致测试人员无法测试应用程序的所有功能。 例如&#xff1a;在一个电子商务网站上&#xff0c;可能需要测试的某些操作并未在需求中列出&#xff0c;导致测试人员无法测试到这些操作。 对策&#xff1a;测…

MySQL实现单个字段根据特定字符拆分

1.字段内容 2.想得到的效果 步骤1中&#xff0c;每一条记录的FJ字段&#xff0c;根据分号&#xff0c;拆分成多条&#xff0c;如下图所示&#xff1a; 3.具体实现 说明&#xff1a; SELECT DISTINCTsubstring_index(substring_index(a.要拆分的字段, 分隔字符, b.help_top…

自动化项目实战:用requests库自动保存王者荣耀英雄皮肤到本地,文末附源码下载!

前言 王者荣耀是一款备受欢迎的手机游戏&#xff0c;拥有众多精美的英雄皮肤。如果你想获取这些皮肤的图片或者其他相关信息&#xff0c;可以利用Python编写一个简单的爬虫来实现。 安装第三方库 首先&#xff0c;我们需要安装Python的requests和BeautifulSoup库。可以使用…

系列六、Nginx配置实例之反向代理2

一、目标 浏览器网页中访问http://${Linux服务器的IP}:9001/basketball/index.html&#xff0c;浏览器中打印"篮球8080!!!"&#xff1b; 浏览器网页中访问http://${Linux服务器的IP}:9001/football/index.html&#xff0c;浏览器中打印"足球8081!!!"&#…

[NLP] LLM---<训练中文LLama2(五)>对SFT后的LLama2进行DPO训练

当前关于LLM的共识 大型语言模型&#xff08;LLM&#xff09;使 NLP 中微调模型的过程变得更加复杂。最初&#xff0c;当 ChatGPT 等模型首次出现时&#xff0c;最主要的方法是先训练奖励模型&#xff0c;然后优化 LLM 策略。从人类反馈中强化学习&#xff08;RLHF&#xff09…

[字符串和内存函数]错误信息报告函数strerror详解

strerror介绍 strerror是一个C库函数&#xff0c;用于将错误代码转换为对的错误信息字符串。它接受一个整数参数errno&#xff0c;返回一个指向错误信息字符串的指针。 errno是一个全局变量&#xff0c;可以直接使用。它在C语言中用于表示发生错误时的错误码。它是一个整数&…

笔记1.4 计算机网络性能

1. 速率 速率即数据率&#xff08;data rate&#xff09;或称数据传输速率或比特率 单位时间&#xff08;秒&#xff09;传输信息&#xff08;比特&#xff09;量 计算机网络中最重要的一个性能指标 单位&#xff1a;bps、kbps、Mbps k 10^3、M 10^6、G 10^9 速率往往…

autosar 诊断入门

AUTOSAR (汽车开放系统架构) 是一个国际汽车行业的开放和标准化的软件架构。它的主要目标是为了创建一种独立于硬件的软件架构&#xff0c;以提高汽车电子系统的模块化和可重用性。 AUTOSAR架构主要分为两个部分&#xff1a;AUTOSAR Runtime Environment (RTE) 和 AUTOSAR Soft…

HTTP各版本差异

HTTP1.0 无法复用连接 HTTP1.0为每个请求单独新开一个TCP连接 #mermaid-svg-9N3exXRS4VvT4bWF {font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;fill:#333;}#mermaid-svg-9N3exXRS4VvT4bWF .error-icon{fill:#552222;}#mermaid-svg-9N3exXRS…

集成电路运算放大器[23-9-16]

目录 1、结构组成&#xff1a;差分放大电路、电压放大电路、功率放大电路。 2、同相放大器&#xff1a; 3、反相放大器&#xff1a;一个正电压放大并变为负电压。 4、差分放大电路&#xff1a;输入两个不同的电压&#xff0c;两者的差值乘以放大系数得到输 出电压。 1、结构组…

2.策略模式

UML图 代码 main.cpp #include "Strategy.h" #include "Context.h"void test() {Context* pContext nullptr;/* StrategyA */pContext new Context(new StrategyA());pContext->contextInterface();/* StrategyB */pContext new Context(new Strat…

MybatisPlus(5)

前言&#x1f36d; ❤️❤️❤️SSM专栏更新中&#xff0c;各位大佬觉得写得不错&#xff0c;支持一下&#xff0c;感谢了&#xff01;❤️❤️❤️ Spring Spring MVC MyBatis_冷兮雪的博客-CSDN博客 上篇讲了增删的操作&#xff0c;这篇讲修改操作中的一个问题以及它对应的…

002-第一代硬件系统架构确立及产品选型

第一代硬件系统架构确立及产品选型 文章目录 第一代硬件系统架构确立及产品选型项目介绍摘要硬件架构硬件结构选型及设计单片机选型上位机选型扯点别的 关键字&#xff1a; Qt、 Qml、 信号采集机、 数据处理、 上位机 项目介绍 欢迎来到我们的 QML & C 项目&#xff…

Centos8下载安装JDK8

安装JDK8 一、下载 官网&#xff1a;https://www.oracle.com/java/technologies/downloads/#java8-linux 二、存放到opt目录下 三、解压 tar -zxvf jdk-8u361-linux-x64.tar.gz -C /usr/local如果下载的是tar格式&#xff0c;则安装如下命令解压 tar -zvf jdk-8u361-linu…

Flash的学习

Flash的学习 1 概述 2 特性 STM32 的内部FLASH 包含主存储器、系统存储器以及选项字节区域。 2.1 主存储器 主存储器分为256 页&#xff0c;每页大小为2KB&#xff0c;共512KB。这个分页的概念&#xff0c;实质就是FLASH 存储器 的扇区&#xff0c;与其它FLASH 一样&…

【C++】动态内存管理(79分钟写的文章哪里看不懂了,快来学)

动态内存管理目录&#xff1a; 一、C/C内存分布 在学习了C/C内存区域的划分后&#xff0c;我们来做几道题巩固一下&#xff1a; 1. 选择题&#xff1a;选项 : A.栈 B.堆 C.数据段(静态区) D.代码段(常量区)globalVar在哪里&#xff1f;____ staticGlobalVar在哪里&#x…

【2023知乎评论爬虫】我用Python爬虫爬了2386条知乎评论!

文章目录 一、爬取目标二、展示爬取结果三、爬虫代码讲解3.1 分析知乎页面3.2 爬虫代码 四、同步视频五、完整源码 您好&#xff0c;我是 马哥python说&#xff0c;一枚10年程序猿。 一、爬取目标 前些天我分享过一篇微博的爬虫&#xff1a;https://blog.csdn.net/solo_msk/a…

Spring 的注入

目录 一、注入&#xff08;Injection&#xff09; 1、什么是注入 &#xff08;1&#xff09;为什么需要注入 &#xff08;2&#xff09;如何进行注入 2、Spring 注入原理分析&#xff08;简易版&#xff09; 二、Set 注入详解 1、JDK 内置类型 &#xff08;1&#xff09…