PyQt5 基于paho-mqtt库 实现MQTT通信

news2025/1/22 17:59:29

PyQt5 基于paho-mqtt库 实现MQTT通信

  • paho-mqtt
  • 安装paho-mqtt库
  • 综合示例
  • 错误处理

paho-mqtt

paho-mqtt官网文档

安装paho-mqtt库

pip install paho-mqtt

综合示例

  • 封装MQTT类
  • 订阅消息
  • 发布消息
  • 信号方式接收处理MQTT消息
import paho.mqtt.client as mqtt
import sys
import json

from PyQt5.QtGui import *
from PyQt5.QtCore import *
from PyQt5.QtWidgets import *

# 填写实际的MQTT相关参数
mqtt_host = "192.168.0.11"
mqtt_port = 1883
mqtt_client_id="sn_864423065869616"
mqtt_username = "xxxx"
mqtt_password = "xxxx"
mqtt_sub_topic = "topic/sub"
mqtt_pub_topic = "topic/pub"


# 封装一个MQTT客户端
class MqttClient(QObject):

    # 创建信号用于UI更新数据
    message_signal = pyqtSignal(str, str)

    def __init__(self, broker, port, client_id, protocol=mqtt.MQTTv311):
        super(MqttClient, self).__init__()

        self.broker = broker
        self.port = port
        self.client_id = client_id
        self.client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2, client_id)
    
    def connect(self, username=None, password=None, keepalive=60):
        self.client.username_pw_set(username, password)
        self.client.connect(self.broker, self.port, keepalive)

    def subscribe(self, topic, qos=0):
        self.client.subscribe(topic, qos)

    def publish(self, topic, paylaod, qos=0, retain=False):
        self.client.publish(topic, paylaod, qos, retain)

    def on_connect(self, client, userdata, flags, reason_code, properties):
        if reason_code == 0:
            print("Connected to MQTT Broker!")
        else:
            print("Failed to connect, return code %\n", reason_code)
        
    def on_disconnect(self, client, userdata, flags, reason_code, properties):
        if reason_code == 0:
            # success disconnect
            print("Disconnect to MQTT Broker!")
        if reason_code > 0:
            # error processing
            print("Failed to disconnect, return code %\n", reason_code)

    def on_message(self, client, userdata, message):
        print("Received message: ", str(message.payload.decode("utf-8")))
        self.message_signal.emit(message.topic, message.payload.decode())   # 发射信号UI线程里处理更新数据

    def on_subscribe(self, client, userdata, mid, reason_codes, properties):
        for sub_result in reason_codes:
            if sub_result == 1:
                # process QoS == 1
                print("on_subscribe process QoS == 1")
            # Any reason code >= 128 is a failure.
            if sub_result >= 128:
                # error processing
                print("on_subscribe error processing。")

    def on_unsubscribe(client, userdata, mid, reason_codes, properties):
        # In NEW version, reason_codes is always a list. Empty for MQTTv3
        for unsub_result in reason_codes:
            # Any reason code >= 128 is a failure.
            if reason_codes[0] >= 128:
                # error processing
                print("on_unsubscribe error processing.")

    def on_publish(self, client, userdata, mid, reason_codes, properties):
        print('Public reason_codes %\n', reason_codes)

    def on_log(self, client, userdata, level, buf):
        print(buf)
    
    def start(self):
        self.client.on_connect = self.on_connect
        self.client.on_disconnect = self.on_disconnect
        self.client.on_subscribe = self.on_subscribe
        self.client.on_unsubscribe = self.on_unsubscribe
        self.client.on_publish = self.on_publish
        self.client.on_message = self.on_message
        self.client.on_log = self.on_log
        self.client.loop_start()
        
    def stop(self):
        self.client.loop_stop()
        self.client.disconnect()


class MainWindow(QMainWindow):
    def __init__(self, parent=None):
        super(MainWindow, self).__init__(parent)
        self.initUI()

        self.client = MqttClient(broker=mqtt_host,  port=mqtt_port, client_id=mqtt_client_id)
        self.client.connect(username=mqtt_username, password=mqtt_password, keepalive=60)
        self.client.subscribe(topic=mqtt_sub_topic, qos=0)
        self.client.message_signal.connect(self.update_ui)
        self.client.start()

    def initUI(self):
        self.setWindowTitle("MQTT测试工具")
        self.resize(800, 480)
        self.center()   # 窗口居中显示

        self.label_show = QLabel(self)
        self.label_show.setText("...")
        self.label_show.setStyleSheet("color:blue; font-size:20px;")

        self.btn_mqttpub = QPushButton(self)
        self.btn_mqttpub.setText("发布消息")
        self.btn_mqttpub.clicked.connect(lambda: self.publish_message())

        root = QVBoxLayout()        
        root.addWidget(self.label_show)
        root.addWidget(self.btn_mqttpub)
        
        mwidget = QWidget()
        mwidget.setLayout(root)
        self.setCentralWidget(mwidget)

    def center(self):
        screen = QDesktopWidget().screenGeometry()
        size = self.geometry()
        self.move((int)((screen.width()-size.width())/2), (int)((screen.height()-size.height())/2))

    def update_ui(self, topic, message):
        print('接收到的消息更新到UI显示')
        print(topic)
        print(message)
        self.label_show.setText(topic +" "+ message)
    
    def publish_message(self):
        print("发布消息")
        self.client.publish(topic=mqtt_pub_topic, paylaod="hello world", qos=0, retain=False)

        
    def closeEvent(self, event):
        # 重写closeEvent方法
        print('窗口关闭前执行的操作')
        self.client.stop() # 停止MQTT

        # 调用基类的closeEvent方法来执行关闭事件
        super().closeEvent(event)

if __name__ == "__main__":
    app = QApplication(sys.argv)    
    win = MainWindow()
    win.show()
    sys.exit(app.exec_())

在这里插入图片描述

错误处理

  • 报错信息:

Unsupported callback API version: version 2.0 added a callback_api_version, see docs/migrations.rst for details

不支持的回调 API 版本:2.0 版本添加了一个callback_api_version,详情请参阅docs/migrations.rst
参考官方文档

  • 原因:回调参数不一致,2.0 版本更改了传递给用户回调的参数。回调的版本1已弃用,但在版本 2.x 中仍受支持。

  • 解决方法:

    • 方法1:
      使用旧版本的回调。需告诉 paho-mqtt 您选择此版本即可,修改如下代码:
      from paho.mqtt import client as mqtt
      # OLD code
      client = mqtt.Client(client_id)
      # NEW code
      client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION1, client_id)
      
      但是这种方法每次运行的时候,会出现以下警告:
      DeprecationWarning: Callback API version 1 is deprecated, update to latest version
    • 方法2:需要修改两处
      1)在创建client对象时,新增参数:mqtt_client.CallbackAPIVersion.VERSION2
      2)在on_connect()函数中,新增参数:properties
      from paho.mqtt import client as mqtt
      # OLD code
      client = mqtt.Client(client_id)
      # NEW code
      client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2, client_id)
      
      def on_connect(client, userdata, flags, rc, properties):
              if rc == 0:
                  print("Connected to MQTT Broker!")
              else:
                  print("Failed to connect,return code {}".format(rc))
      client.on_connect = on_connect
      
    • 方法3:降低paho-mqtt版本号到1.x版本

      $ pip install paho-mqtt==1.6.1

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2203014.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

在线绘图工具drawio,visio的平替

Draw.io:灵活高效的在线绘图工具推荐 在工作和项目管理中,流程图、架构图和思维导图等可视化图表是非常重要的沟通工具。Draw.io(现更名为diagrams.net)是一个强大且免费的在线绘图工具,适用于创建各种类型的图表。它功…

YOLOv11训练自己的数据集,YOLOv11网络解析

1 训练自己的数据集 在github搜索ultralytics并下载。 GitHub - ultralytics/ultralytics: Ultralytics YOLO11 🚀 环境配置不再赘述,本地配置自行搜索教程,若使用云服务器配置更为简单。 数据标注 pip install labelimg启动标注工具 la…

【PGCCC】在 Postgres 上构建图像搜索引擎

我最近看到的最有趣的电子商务功能之一是能够搜索与我手机上的图片相似的产品。例如,我可以拍一双鞋或其他产品的照片,然后搜索产品目录以查找类似商品。使用这样的功能可以是一个相当简单的项目,只要有合适的工具。如果我们可以将问题定义为…

Pycharm设置相同变量的背景颜色

在 PyCharm 中设置鼠标点击某个变量时,代码其他地方相同变量的背景颜色,可以通过调整颜色方案中的相关设置来实现。以下是详细步骤: 打开 PyCharm,选择 File -> Settings(在 Windows/Linux 上)或 PyChar…

基于 CSS Grid 的简易拖拉拽 Vue3 组件,从代码到NPM发布(1)- 拖拉拽交互

基于特定的应用场景,需要在页面中以网格的方式,实现目标组件在网格中可以进行拖拉拽、修改大小等交互。本章开始分享如何一步步从代码设计,最后到如何在 NPM 上发布。 请大家动动小手,给我一个免费的 Star 吧~ 大家如果发现了 Bug…

全网最详细k8s搭建部署

目录 Kubernetes的功能: Kubernetes的特点: 1. 安装要求 2. 部署内容 1、系统环境准备 2、所有禁用swap和本地解析 3、仓库配置,所有安装docker 4、所有节点设定docker的资源管理模式为systemd 5、所有阶段复制harbor仓库中的证书并…

一款电子产品图册转换器

​随着科技的不断发展,电子产品已经成为我们生活中不可或缺的一部分。无论是手机、平板电脑还是智能家居,它们都离不开电子图册的支撑。一款优秀的电子产品图册转换器,可以帮助我们轻松实现电子图册的转换,为我们的生活和工作带来…

AlphaFold加冕诺奖,DeepMind CEO获奖感言:最优秀的科学家与AI配合,将完成令人难以置信的工作

继「AI 教父」摘冠 2024 年诺贝尔物理学奖后,AI 再下一城,获得了今年的诺贝尔化学奖。 北京时间 10 月 9 日,瑞典皇家科学院宣布了 2024 年诺贝尔化学奖的归属,一半授予 David Baker, 以表彰其在计算蛋白设计方面的贡…

python实现音频文件mp3/m4a转.wav + windows安装ffmpeg

近期在尝试使用大模型进行音频降噪、人声分离。抱脸上的模型几乎统一输入需求都是.wav,直接贴代码吧 ps:使用这段代码需要提前安装好ffmpeg(安装教程请往下拉) from pydub import AudioSegment# 加载.m4a文件 audio AudioSegment.from_file(r"你文件的路径&…

Pycharm使用CV2

1、windows下已经安装好python3以及opencv2 2、安装并打开pycharm 环境中装好的包就能显示出来了,就可以去调用cv2的接口了

TCP/IP相关

1、关于三次握手、四次挥手和TCP的11种状态: 记住这张图就行了: 2、关于慢启动、拥塞避免、超时重传、快速重传、快速恢复 记住这张图就行了: 一些名词解释: MSS:Maximum Segment Size,最大报文长度 RT…

力扣 1206. 设计跳表

Problem: 1206. 设计跳表 👩‍🏫 参考题解 class Skiplist {// 定义跳表的最大层数int level 10;// 定义跳表节点类class Node {int val; // 节点值Node[] ne new Node[level]; // 节点的下一跳节点数组,支持多级索引// 构造函数&#xf…

深度学习之卷积CONV2D

文章目录 1.学习目的2.填充与步幅2.1填充2.2 步幅 3.总结 1.学习目的 卷积听起来简单,事实上不简单,需要多加练习 2.填充与步幅 在前面的例子 图6.2.1中,输入的高度和宽度都为3,卷积核的高度和宽度都为2,生成的输出…

用Python将HTML转换为Excel文件

在数据处理和分析的过程中,经常需要从网页上抓取信息,并将其转换为更易于操作的格式。HTML表格作为一种常见的数据展示方式,在线报告、统计资料等场景中广泛存在,但其结构化程度较低,不利于进一步的数据清洗和分析。将…

微信小程序-APP-软件开发

微信小程序开发,作为当下移动互联网领域的一股强劲势力,正以其便捷性、轻量化及高用户粘性的特点,深刻改变着我们的生活与工作方式。它不仅为企业和个人开发者提供了一个全新的服务入口,更极大地拓宽了商业应用的边界。 在微信小…

2015年国赛高教杯数学建模D题众筹筑屋规划方案设计解题全过程文档及程序

2015年国赛高教杯数学建模 D题 众筹筑屋规划方案设计 众筹筑屋是互联网时代一种新型的房地产形式。现有占地面积为102077.6平方米的众筹筑屋项目(详情见附件1)。项目推出后,有上万户购房者登记参筹。项目规定参筹者每户只能认购一套住房。  …

游戏录屏必备!五款超实用软件让你轻松记录精彩游戏瞬间

在游戏的世界里,每一个精彩的操作、每一场激烈的对战都值得被记录下来。无论是想要分享给朋友,还是留作自己的游戏回忆,一款优秀的游戏录屏软件都是必不可少的。下面就为大家介绍五款备受好评的游戏录屏软件,让你轻松成为游戏录屏…

【读书笔记·VLSI电路设计方法解密】问题9:什么是SOC发展趋势的推动力

推动SoC趋势的主要力量之一是成本。将更多功能集成到单一芯片中可以减少系统中的芯片数量,从而缩小封装和电路板的成本。这可能会降低整个系统的成本,使产品更具竞争力。在当今的消费电子市场和其他市场中,较低的价格总能带来获得市场份额的优…

激光避障的运行算法!

一、激光传感器的工作原理 激光避障技术利用激光束的直线传播和反射特性,通过发送激光束并接收反射回来的信号,来检测和计算周围障碍物的距离和位置。激光传感器能够生成高精度的距离数据和三维环境信息,为机器人或无人机提供详细的障碍物分…

留学期间如何提高职业竞争力?

留学期间是提高职业竞争力的关键时期,以下是一些具体的建议,帮助留学生在留学期间增强自身的职业竞争力: 一、深化专业知识与技能 1. 专注于课程学习:努力学习专业课程,掌握扎实的专业知识,这是提高职业竞…