目录
概述
1 认识paho.mqtt.client
2 实现MQTT Client
2.1 功能介绍
2.2 paho.mqtt.client库函数介绍
2.3 MQTT Client实现
2.3.1 创建项目
2.3.2 编写MQTT Client代码
2.3.3 Log工具源码
2.4 功能测试代码实现
2.4.1 功能介绍
2.4.2 代码实现
3 测试
3.1 EMQX上创建Client
3.2 运行UserMqttClient
3.3 使用MQTT.fx订阅消息
概述
本文主要介绍使用paho.mqtt.client库实现一个MQTT Client,并使其连接到EMQX物联网平台。其中包括在EMQX创建项目的方法,配置参数的步骤。还是用该MQTT Client发布数据至EMQX,并使用MQTT.fx订阅Topic。MQTT Client也能订阅MQTT.fx发布的Topic数据。
1 认识paho.mqtt.client
paho.mqtt.client是一个Python MQTT客户端库,它提供了与MQTT代理进行通信的功能。MQTT是一种轻量级的消息传递协议,通常用于物联网应用中的设备间通信。
使用paho.mqtt.client,可以创建一个MQTT客户端,并使用其提供的方法来连接到MQTT代理,发布和订阅主题,接收和处理消息。
登录网站可以了解paho.mqtt.client的相关内容。登录地址如下:
https://mqtt.org/software/
2 实现MQTT Client
2.1 功能介绍
使用paho.mqtt.client编写一个 MQTT客户端,可以实现,消息订阅,发布功能。主要功能如下:
1)和Broker相关的参数通过配置文件来实现,不能写死在代码里
2)实现publish topic和subscriber topic功能
3)实时打印publish log和subscriber log
4)使用专用的logging库来处理log数据
5)使用class的方式来实现软件功能
2.2 paho.mqtt.client库函数介绍
在使用paho.mqtt.client库函数编写代码之前,必须对它所提供的重要接口函数的用法有一个清晰的认识,这对后面的如何使用这些接口非常重要。paho.mqtt.client提供了需要参数可供使用,这里只介绍一些,编程中必须用到的接口。其他函数的功能和用法可参考官方文档。
1)connect ()
连接MQTT Broker函数,函数原型如下:
参数介绍:
参数名称 | 功能介绍 |
---|---|
host | MQTT Broker地址 |
port | 连接Broker的端口号 |
keeplive | 心跳包时间间隔 |
2)username_pw_set()
设置Client的用户名和对应的密码,函数原型如下:
参数介绍:
参数名称 | 功能介绍 |
---|---|
username | Client的用户名 |
password | Client的用户名对应的密码 |
3)loop_start()
调用该接口,启动MQTT Client工作线程,这时就可以进行publish或者subscrib 消息,函数原型如下:
4)loop_stop()
调用该接口,销毁MQTT Client工作线程,函数原型如下:
5)subscribe()
订阅消息接口,函数原型如下:
参数介绍:
参数名称 | 功能介绍 |
---|---|
topic | 订阅的主题,例如:subscribe("my/topic", 2) |
qos | 消息的服务质量等级 |
options and properties | 这两个参数在MQTT v5.0的版本中使用,当前版本不使用这两个参数 |
6)unsubscribe()
取消订阅消息接口,函数原型如下:
参数介绍:
参数名称 | 功能介绍 |
---|---|
topic | 取消订阅的主题,例如:unsubscribe("my/topic") |
properties | 这个参数在MQTT v5.0的版本中使用,当前版本不使用这个参数 |
7)publish()
发布消息接口,函数原型如下:
参数介绍:
参数名称 | 功能介绍 |
---|---|
topic | 发布消息的主题 |
payload | 消息内容 |
qos | 消息服务等级 |
retain | Broker是否保留消息 |
properties | 这个参数在MQTT v5.0的版本中使用,当前版本不使用这个参数 |
2.3 MQTT Client实现
2.3.1 创建项目
本项目使用PyCharm 作为开发工具,运行代码前必须安装 paho.mqtt.client库。详细安装方法和步骤见官网文档。创建项目之后编写如下代码
2.3.2 编写MQTT Client代码
编写一个MQTTClient的user类,实现MQTT Client的基本功能,函数列表和介绍如下:
函数名 | 描述 |
---|---|
start | 注册回调函数和连接MQTT Client |
stop | 断开连接和销毁MQTT Client |
usr_subscribe | 订阅函数 |
usr_publish | 发布消息函数 |
usr_unsubscribe | 取消订阅函数 |
usr_on_message | 订阅消息回调函数 |
usr_log_callback | log监控函数 |
receive_msg | 接收消息函数 |
详细代码如下:
#!/usr/bin/python3
# -*- coding: utf-8 -*-
# @describe : mqtt handler
# @Time : 2022/03/18 16:21
# @Author : ming fei.tang
import logging
from queue import Queue
import paho.mqtt.client as mqtt
__all__ = ["MQTTClient"]
class MQTTClient:
def __init__(self, host, port, qos, heartbeat, client_id, username, password):
self.host = host
self.port = port
self.qos = qos
self.queue = Queue()
self.mqtt_client_id = client_id
self.heartbeat = heartbeat
self.username = username
self.password = password
self.mqtt_client = None
def usr_on_message(self, user, data, msg):
payload = msg.payload.decode('utf-8')
payload = payload.replace('\n', '').replace('\r', '').replace(' ', '')
logging.debug('subscribe: %s , payload: %s, QoS = %s' % (msg.topic, payload, msg.qos))
self.queue.put(msg)
def usr_subscribe(self, topic):
self.mqtt_client.subscribe(topic, self.qos)
logging.info('subscribe the topic: %s' % topic)
def usr_unsubscribe(self, topic):
self.mqtt_client.unsubscribe(topic)
logging.info('unsubscribe %s' % topic)
def receive_msg(self, timeout=None):
logging.info('waiting for message.')
if timeout is None:
timeout = self.heartbeat
return self.queue.get(timeout=timeout)
def usr_publish(self, topic, payload, qos, retain=False):
self.mqtt_client.publish(topic, payload, qos, retain)
logging.debug('public topic = %s, payload = %s , qos = %s, retain = %s' % (topic, payload, qos, retain))
def usr_log_callback(self, client, userdata, level, msg):
# logging.info('public topic: %s ' % msg)
pass
def start(self):
if self.mqtt_client is None:
self.mqtt_client = mqtt.Client(client_id=self.mqtt_client_id)
self.mqtt_client.on_log = self.usr_log_callback
self.mqtt_client.on_message = self.usr_on_message
self.mqtt_client.username_pw_set(self.username, self.password)
self.mqtt_client.connect(self.host, self.port, self.heartbeat)
self.mqtt_client.loop_start()
logging.info("client('%s') is connected" % self.mqtt_client_id)
else:
logging.error("mqtt_client object is None")
def stop(self):
if self.mqtt_client is not None:
self.mqtt_client.loop_stop()
logging.info("client('%s') is disconnected" % self.mqtt_client_id)
self.mqtt_client.disconnect()
self.mqtt_client = None
2.3.3 Log工具源码
创建logging_tool.py,编写如下代码:
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# @Time : 2020/4/9 13:05
# @Author : ming fei.tang
# @File : log tools
# ---------------------
import os
import logging
import datetime
import sys
import coloredlogs
__all__ = ["LogTool"]
class LogTool:
def __init__(self):
self.log_folder = 'clog' + os.sep + datetime.datetime.now().strftime('%Y%m%d')
def setup_logging(self, level=logging.DEBUG, filename=None):
if os.path.exists(self.log_folder) is False:
os.makedirs(self.log_folder)
log_filename = None
if filename is not None:
log_filename = self.log_folder + os.sep + filename
log_format = '%(asctime)5s - %(levelname)5s - %(lineno)4s - %(filename)18s - %(message)s'
if log_filename is not None:
console = logging.StreamHandler(stream=sys.stdout)
console.setLevel(logging.getLogger().level)
console.setFormatter(logging.Formatter(log_format))
logging.getLogger().addHandler(console)
logging.basicConfig(filename=log_filename, level=level, format=log_format)
coloredlogs.install(level=level, fmt=log_format, milliseconds=True)
def remove_log_folder(self):
if os.path.exists(self.log_folder) is False:
os.remove(self.log_folder)
2.4 功能测试代码实现
2.4.1 功能介绍
测试函数主要完成功能:
1)连接EMQX服务器上的mqtt_user3 MQTT Client
2)发布消息至EMQX Broker
3) 订阅其他Client的消息,并通过log打印出来
2.4.2 代码实现
编写一个UserMqttClient的类,测试MQTTClient类中的方法,函数列表和介绍如下:
详细代码如下:
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# @Time : 2023/4/9 13:05
# @Author : ming fei.tang
# @File : mqtt client test
# ---------------------
import logging
import random
import time
from lib.MQTT_Client import MQTTClient
from lib.logging_tool import LogTool
class UserMqttClient(MQTTClient):
def __init__(self, broker_address, mqtt_port, qos, heartbeat, client_ID, user_name, user_password):
# prepare log file
super().__init__(broker_address, mqtt_port, qos, heartbeat, client_ID, user_name, user_password)
self.debug = LogTool()
self.debug.setup_logging()
self.debug.remove_log_folder()
try:
self.start()
except Exception as e:
logging.exception(e)
assert False, e
def load_para(self):
pass
def user_publish(self, topic, base_payload):
while True:
val = random.randint(1, 15) * 0.1
val = base_payload + val
val = '{:.2f}'.format(val)
publish_payload = str(val)
self.usr_publish(topic, publish_payload, 1, True)
time.sleep(10)
if __name__ == '__main__':
host = "192.168.1.11"
port = 1883
client_id = "paho.mqtt.client"
username = "mqtt_user3"
password = "123456"
user_client = UserMqttClient(host, port, 0, 60, client_id, username, password)
user_client.usr_subscribe("switch")
user_client.usr_subscribe("attributes")
user_client.user_publish(topic="temperature", base_payload=12)
3 测试
3.1 EMQX上创建Client
打开EMQX创建如下MQTT Client
名称 | 参数值 |
---|---|
usermname | mqtt_user3 |
password | 123456 |
在EMQX客户端认证面板上创建该Client
3.2 运行UserMqttClient
运行UserMqttClient之后, mqtt_user3会自动连接至EMQX,可以在客户端面板查看它的状态
查看EMQX上 mqtt_user3状态,其已经正常的连接到了Broker:
mqtt_user3订阅的消息分别为 switch和attributes
3.3 使用MQTT.fx订阅消息
使用MQTT.fx作为MQTT Client登录EMQX, 然后订阅temperature消息,可以看见MQTT.fx这边能正确的收到
查看EMQX上保留的mqtt_user3发布的消息: