PyQt5 基于paho-mqtt库 实现MQTT通信
- paho-mqtt
- 安装paho-mqtt库
- 综合示例
- 错误处理
paho-mqtt
paho-mqtt官网文档
安装paho-mqtt库
pip install paho-mqtt
综合示例
- 封装MQTT类
- 订阅消息
- 发布消息
- 信号方式接收处理MQTT消息
import paho.mqtt.client as mqtt
import sys
import json
from PyQt5.QtGui import *
from PyQt5.QtCore import *
from PyQt5.QtWidgets import *
# 填写实际的MQTT相关参数
mqtt_host = "192.168.0.11"
mqtt_port = 1883
mqtt_client_id="sn_864423065869616"
mqtt_username = "xxxx"
mqtt_password = "xxxx"
mqtt_sub_topic = "topic/sub"
mqtt_pub_topic = "topic/pub"
# 封装一个MQTT客户端
class MqttClient(QObject):
# 创建信号用于UI更新数据
message_signal = pyqtSignal(str, str)
def __init__(self, broker, port, client_id, protocol=mqtt.MQTTv311):
super(MqttClient, self).__init__()
self.broker = broker
self.port = port
self.client_id = client_id
self.client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2, client_id)
def connect(self, username=None, password=None, keepalive=60):
self.client.username_pw_set(username, password)
self.client.connect(self.broker, self.port, keepalive)
def subscribe(self, topic, qos=0):
self.client.subscribe(topic, qos)
def publish(self, topic, paylaod, qos=0, retain=False):
self.client.publish(topic, paylaod, qos, retain)
def on_connect(self, client, userdata, flags, reason_code, properties):
if reason_code == 0:
print("Connected to MQTT Broker!")
else:
print("Failed to connect, return code %\n", reason_code)
def on_disconnect(self, client, userdata, flags, reason_code, properties):
if reason_code == 0:
# success disconnect
print("Disconnect to MQTT Broker!")
if reason_code > 0:
# error processing
print("Failed to disconnect, return code %\n", reason_code)
def on_message(self, client, userdata, message):
print("Received message: ", str(message.payload.decode("utf-8")))
self.message_signal.emit(message.topic, message.payload.decode()) # 发射信号UI线程里处理更新数据
def on_subscribe(self, client, userdata, mid, reason_codes, properties):
for sub_result in reason_codes:
if sub_result == 1:
# process QoS == 1
print("on_subscribe process QoS == 1")
# Any reason code >= 128 is a failure.
if sub_result >= 128:
# error processing
print("on_subscribe error processing。")
def on_unsubscribe(client, userdata, mid, reason_codes, properties):
# In NEW version, reason_codes is always a list. Empty for MQTTv3
for unsub_result in reason_codes:
# Any reason code >= 128 is a failure.
if reason_codes[0] >= 128:
# error processing
print("on_unsubscribe error processing.")
def on_publish(self, client, userdata, mid, reason_codes, properties):
print('Public reason_codes %\n', reason_codes)
def on_log(self, client, userdata, level, buf):
print(buf)
def start(self):
self.client.on_connect = self.on_connect
self.client.on_disconnect = self.on_disconnect
self.client.on_subscribe = self.on_subscribe
self.client.on_unsubscribe = self.on_unsubscribe
self.client.on_publish = self.on_publish
self.client.on_message = self.on_message
self.client.on_log = self.on_log
self.client.loop_start()
def stop(self):
self.client.loop_stop()
self.client.disconnect()
class MainWindow(QMainWindow):
def __init__(self, parent=None):
super(MainWindow, self).__init__(parent)
self.initUI()
self.client = MqttClient(broker=mqtt_host, port=mqtt_port, client_id=mqtt_client_id)
self.client.connect(username=mqtt_username, password=mqtt_password, keepalive=60)
self.client.subscribe(topic=mqtt_sub_topic, qos=0)
self.client.message_signal.connect(self.update_ui)
self.client.start()
def initUI(self):
self.setWindowTitle("MQTT测试工具")
self.resize(800, 480)
self.center() # 窗口居中显示
self.label_show = QLabel(self)
self.label_show.setText("...")
self.label_show.setStyleSheet("color:blue; font-size:20px;")
self.btn_mqttpub = QPushButton(self)
self.btn_mqttpub.setText("发布消息")
self.btn_mqttpub.clicked.connect(lambda: self.publish_message())
root = QVBoxLayout()
root.addWidget(self.label_show)
root.addWidget(self.btn_mqttpub)
mwidget = QWidget()
mwidget.setLayout(root)
self.setCentralWidget(mwidget)
def center(self):
screen = QDesktopWidget().screenGeometry()
size = self.geometry()
self.move((int)((screen.width()-size.width())/2), (int)((screen.height()-size.height())/2))
def update_ui(self, topic, message):
print('接收到的消息更新到UI显示')
print(topic)
print(message)
self.label_show.setText(topic +" "+ message)
def publish_message(self):
print("发布消息")
self.client.publish(topic=mqtt_pub_topic, paylaod="hello world", qos=0, retain=False)
def closeEvent(self, event):
# 重写closeEvent方法
print('窗口关闭前执行的操作')
self.client.stop() # 停止MQTT
# 调用基类的closeEvent方法来执行关闭事件
super().closeEvent(event)
if __name__ == "__main__":
app = QApplication(sys.argv)
win = MainWindow()
win.show()
sys.exit(app.exec_())
错误处理
- 报错信息:
Unsupported callback API version: version 2.0 added a callback_api_version, see docs/migrations.rst for details
不支持的回调 API 版本:2.0 版本添加了一个callback_api_version,详情请参阅docs/migrations.rst
参考官方文档
-
原因:回调参数不一致,2.0 版本更改了传递给用户回调的参数。回调的版本1已弃用,但在版本 2.x 中仍受支持。
-
解决方法:
- 方法1:
使用旧版本的回调。需告诉 paho-mqtt 您选择此版本即可,修改如下代码:
但是这种方法每次运行的时候,会出现以下警告:from paho.mqtt import client as mqtt # OLD code client = mqtt.Client(client_id) # NEW code client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION1, client_id)
DeprecationWarning: Callback API version 1 is deprecated, update to latest version - 方法2:需要修改两处
1)在创建client对象时,新增参数:mqtt_client.CallbackAPIVersion.VERSION2
2)在on_connect()函数中,新增参数:propertiesfrom paho.mqtt import client as mqtt # OLD code client = mqtt.Client(client_id) # NEW code client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2, client_id) def on_connect(client, userdata, flags, rc, properties): if rc == 0: print("Connected to MQTT Broker!") else: print("Failed to connect,return code {}".format(rc)) client.on_connect = on_connect
- 方法3:降低paho-mqtt版本号到1.x版本
$ pip install paho-mqtt==1.6.1
- 方法1: