使用paho.mqtt.client实现MQTT Client连接EMQX Broker

news2024/11/24 14:49:10

目录

概述

1 认识paho.mqtt.client

2 实现MQTT Client

2.1 功能介绍

2.2 paho.mqtt.client库函数介绍

2.3 MQTT Client实现

2.3.1 创建项目

2.3.2 编写MQTT Client代码

2.3.3 Log工具源码

2.4 功能测试代码实现

2.4.1 功能介绍

2.4.2 代码实现

3 测试

3.1 EMQX上创建Client

3.2 运行UserMqttClient

3.3 使用MQTT.fx订阅消息


概述

本文主要介绍使用paho.mqtt.client库实现一个MQTT Client,并使其连接到EMQX物联网平台。其中包括在EMQX创建项目的方法,配置参数的步骤。还是用该MQTT Client发布数据至EMQX,并使用MQTT.fx订阅Topic。MQTT Client也能订阅MQTT.fx发布的Topic数据。

1 认识paho.mqtt.client

paho.mqtt.client是一个Python MQTT客户端库,它提供了与MQTT代理进行通信的功能。MQTT是一种轻量级的消息传递协议,通常用于物联网应用中的设备间通信。

使用paho.mqtt.client,可以创建一个MQTT客户端,并使用其提供的方法来连接到MQTT代理,发布和订阅主题,接收和处理消息。

登录网站可以了解paho.mqtt.client的相关内容。登录地址如下:

https://mqtt.org/software/

2 实现MQTT Client

2.1 功能介绍

使用paho.mqtt.client编写一个 MQTT客户端,可以实现,消息订阅,发布功能。主要功能如下:

1)和Broker相关的参数通过配置文件来实现,不能写死在代码里

2)实现publish topic和subscriber topic功能

3)实时打印publish log和subscriber log

4)使用专用的logging库来处理log数据

5)使用class的方式来实现软件功能

2.2 paho.mqtt.client库函数介绍

在使用paho.mqtt.client库函数编写代码之前,必须对它所提供的重要接口函数的用法有一个清晰的认识,这对后面的如何使用这些接口非常重要。paho.mqtt.client提供了需要参数可供使用,这里只介绍一些,编程中必须用到的接口。其他函数的功能和用法可参考官方文档。

1)connect ()

连接MQTT Broker函数,函数原型如下:

参数介绍:

参数名称功能介绍
hostMQTT Broker地址
port连接Broker的端口号
keeplive心跳包时间间隔

2)username_pw_set()

设置Client的用户名和对应的密码,函数原型如下:

参数介绍:

参数名称功能介绍
usernameClient的用户名
passwordClient的用户名对应的密码

3)loop_start()

调用该接口,启动MQTT Client工作线程,这时就可以进行publish或者subscrib 消息,函数原型如下:

4)loop_stop()

调用该接口,销毁MQTT Client工作线程,函数原型如下:

5)subscribe()

订阅消息接口,函数原型如下:

参数介绍:

参数名称功能介绍
topic订阅的主题,例如:subscribe("my/topic", 2)
qos消息的服务质量等级
options and properties这两个参数在MQTT v5.0的版本中使用,当前版本不使用这两个参数

6)unsubscribe()

取消订阅消息接口,函数原型如下:

参数介绍:

参数名称功能介绍
topic取消订阅的主题,例如:unsubscribe("my/topic")
properties这个参数在MQTT v5.0的版本中使用,当前版本不使用这个参数

7)publish()

发布消息接口,函数原型如下:

参数介绍:

参数名称功能介绍
topic发布消息的主题
payload消息内容
qos消息服务等级
retainBroker是否保留消息
properties这个参数在MQTT v5.0的版本中使用,当前版本不使用这个参数

2.3 MQTT Client实现

2.3.1 创建项目

本项目使用PyCharm 作为开发工具,运行代码前必须安装 paho.mqtt.client库。详细安装方法和步骤见官网文档。创建项目之后编写如下代码

2.3.2 编写MQTT Client代码

编写一个MQTTClient的user类,实现MQTT Client的基本功能,函数列表和介绍如下:

函数名描述
start注册回调函数和连接MQTT Client
stop断开连接和销毁MQTT Client
usr_subscribe订阅函数
usr_publish发布消息函数
usr_unsubscribe取消订阅函数
usr_on_message订阅消息回调函数
usr_log_callbacklog监控函数
receive_msg接收消息函数

详细代码如下:

#!/usr/bin/python3
# -*- coding: utf-8 -*-
# @describe : mqtt handler
# @Time    : 2022/03/18 16:21
# @Author  : ming fei.tang
import logging
from queue import Queue
​
import paho.mqtt.client as mqtt
​
__all__ = ["MQTTClient"]
​
​
class MQTTClient:
​
    def __init__(self, host, port, qos, heartbeat, client_id, username, password):
        self.host = host
        self.port = port
        self.qos = qos
        self.queue = Queue()
        self.mqtt_client_id = client_id
        self.heartbeat = heartbeat
        self.username = username
        self.password = password
        self.mqtt_client = None
​
    def usr_on_message(self, user, data, msg):
        payload = msg.payload.decode('utf-8')
        payload = payload.replace('\n', '').replace('\r', '').replace(' ', '')
        logging.debug('subscribe: %s , payload: %s, QoS = %s' % (msg.topic, payload, msg.qos))
​
        self.queue.put(msg)
​
    def usr_subscribe(self, topic):
        self.mqtt_client.subscribe(topic, self.qos)
        logging.info('subscribe the topic: %s' % topic)
​
    def usr_unsubscribe(self, topic):
        self.mqtt_client.unsubscribe(topic)
        logging.info('unsubscribe %s' % topic)
​
    def receive_msg(self, timeout=None):
        logging.info('waiting for message.')
        if timeout is None:
            timeout = self.heartbeat
        return self.queue.get(timeout=timeout)
​
    def usr_publish(self, topic, payload, qos, retain=False):
        self.mqtt_client.publish(topic, payload, qos, retain)
        logging.debug('public topic = %s, payload = %s , qos = %s, retain = %s' % (topic, payload, qos, retain))
​
    def usr_log_callback(self, client, userdata, level, msg):
        # logging.info('public topic: %s ' % msg)
        pass
​
    def start(self):
        if self.mqtt_client is None:
            self.mqtt_client = mqtt.Client(client_id=self.mqtt_client_id)
            self.mqtt_client.on_log = self.usr_log_callback
            self.mqtt_client.on_message = self.usr_on_message
            self.mqtt_client.username_pw_set(self.username, self.password)
            self.mqtt_client.connect(self.host, self.port, self.heartbeat)
            self.mqtt_client.loop_start()
            logging.info("client('%s') is connected" % self.mqtt_client_id)
        else:
            logging.error("mqtt_client object is None")
​
    def stop(self):
        if self.mqtt_client is not None:
            self.mqtt_client.loop_stop()
            logging.info("client('%s')  is disconnected" % self.mqtt_client_id)
            self.mqtt_client.disconnect()
            self.mqtt_client = None

2.3.3 Log工具源码

创建logging_tool.py,编写如下代码:

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# @Time    : 2020/4/9 13:05
# @Author  : ming fei.tang
# @File    : log tools
# ---------------------
import os
import logging
import datetime
import sys
​
import coloredlogs
​
__all__ = ["LogTool"]
​
​
class LogTool:
    def __init__(self):
        self.log_folder = 'clog' + os.sep + datetime.datetime.now().strftime('%Y%m%d')
​
    def setup_logging(self, level=logging.DEBUG, filename=None):
        if os.path.exists(self.log_folder) is False:
            os.makedirs(self.log_folder)
​
        log_filename = None
        if filename is not None:
            log_filename = self.log_folder + os.sep + filename
​
        log_format = '%(asctime)5s - %(levelname)5s - %(lineno)4s - %(filename)18s - %(message)s'
        if log_filename is not None:
            console = logging.StreamHandler(stream=sys.stdout)
            console.setLevel(logging.getLogger().level)
            console.setFormatter(logging.Formatter(log_format))
            logging.getLogger().addHandler(console)
​
        logging.basicConfig(filename=log_filename, level=level, format=log_format)
        coloredlogs.install(level=level, fmt=log_format, milliseconds=True)
​
    def remove_log_folder(self):
        if os.path.exists(self.log_folder) is False:
            os.remove(self.log_folder)
​

2.4 功能测试代码实现

2.4.1 功能介绍

测试函数主要完成功能:

1)连接EMQX服务器上的mqtt_user3 MQTT Client

2)发布消息至EMQX Broker

3) 订阅其他Client的消息,并通过log打印出来

2.4.2 代码实现

编写一个UserMqttClient的类,测试MQTTClient类中的方法,函数列表和介绍如下:

详细代码如下:

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# @Time    : 2023/4/9 13:05
# @Author  : ming fei.tang
# @File    : mqtt client test
# ---------------------
import logging
import random
import time
​
from lib.MQTT_Client import MQTTClient
from lib.logging_tool import LogTool
​
​
class UserMqttClient(MQTTClient):
    def __init__(self, broker_address, mqtt_port, qos, heartbeat, client_ID, user_name, user_password):
        # prepare log file
        super().__init__(broker_address, mqtt_port, qos, heartbeat, client_ID, user_name, user_password)
        self.debug = LogTool()
        self.debug.setup_logging()
        self.debug.remove_log_folder()
        try:
            self.start()
        except Exception as e:
            logging.exception(e)
            assert False, e
​
    def load_para(self):
        pass
​
    def user_publish(self, topic, base_payload):
        while True:
            val = random.randint(1, 15) * 0.1
            val = base_payload + val
            val = '{:.2f}'.format(val)
            publish_payload = str(val)
            self.usr_publish(topic, publish_payload, 1, True)
            time.sleep(10)
​
​
if __name__ == '__main__':
    host = "192.168.1.11"
    port = 1883
    client_id = "paho.mqtt.client"
    username = "mqtt_user3"
    password = "123456"
​
    user_client = UserMqttClient(host, port, 0, 60, client_id, username, password)
​
    user_client.usr_subscribe("switch")
    user_client.usr_subscribe("attributes")
​
    user_client.user_publish(topic="temperature", base_payload=12)
​

3 测试

3.1 EMQX上创建Client

打开EMQX创建如下MQTT Client

名称参数值
usermnamemqtt_user3
password123456

在EMQX客户端认证面板上创建该Client

3.2 运行UserMqttClient

运行UserMqttClient之后, mqtt_user3会自动连接至EMQX,可以在客户端面板查看它的状态

查看EMQX上 mqtt_user3状态,其已经正常的连接到了Broker:

mqtt_user3订阅的消息分别为 switch和attributes

3.3 使用MQTT.fx订阅消息

使用MQTT.fx作为MQTT Client登录EMQX, 然后订阅temperature消息,可以看见MQTT.fx这边能正确的收到

查看EMQX上保留的mqtt_user3发布的消息:

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1532679.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

回归预测 | Matlab基于SAO-LSTM雪消融算法优化长短期记忆神经网络的数据多输入单输出回归预测

回归预测 | Matlab基于SAO-LSTM雪消融算法优化长短期记忆神经网络的数据多输入单输出回归预测 目录 回归预测 | Matlab基于SAO-LSTM雪消融算法优化长短期记忆神经网络的数据多输入单输出回归预测效果一览基本介绍程序设计参考资料 效果一览 基本介绍 1.Matlab基于SAO-LSTM雪消融…

课时70:流程控制_for循环_嵌套循环

2.4.4 嵌套循环 学习目标 这一节,我们从 基础知识、简单实践、小结 三个方面来学习。 基础知识 简介 这里的嵌套实践,与选择语句的嵌套实践基本一致,只不过组合的方式发生了一些变化。常见的组合样式如下:for嵌套for语句for …

[STM32] Keil MDK 新建工程编译不通过(warning: #2803-D和Error: L6218E)解决方法备忘

按照野火的PDF教程的第4章:[野火]《RT-Thread 内核实现与应用开发实战—基于STM32》.pdf 新建 Keil MDK 工程,工程设置完成后点击编译按钮,编译不通过: RTE\Device\ARMCM3\startup_ARMCM3.c(75): warning: #2803-D: unrecognize…

nodejs社区垃圾分类管理平台的设计与实现python-flask-django-php

近些年来,随着科技的飞速发展,互联网的普及逐渐延伸到各行各业中,给人们生活带来了十分的便利,社区垃圾分类管理平台利用计算机网络实现信息化管理,使整个社区垃圾分类管理的发展和服务水平有显著提升。 语言&#xf…

Hash类型

2.3.Hash类型 Hash类型,也叫散列,其value是一个无序字典,类似于Java中的HashMap结构。 String结构是将对象序列化为JSON字符串后存储,当需要修改对象某个字段时很不方便: Hash结构可以将对象中的每个字段独立存储&am…

【Qt】使用Qt实现Web服务器(三):QtWebApp中HttpRequest和HttpResponse

1、HttpRequest 1.1 示例 1)在Demo1的Dump HTTP request示例 在浏览器中输入http://127.0.0.1:8080点击Dump HTTP request 2)切换到页面:http://127.0.0.1:8080/dump 该页面显示请求和响应的内容: Request: Method: GET Path: /dump Version: HTTP/1.1 Headers: accep…

图片编辑器中实现文件上传的三种方式和二进制流及文件头校验文件类型

背景 最近在 vue-design-editor 开源项目中实现 psd 等多种文件格式上传解析成模板过程中, 发现搞定设计文件上传没有使用 input 实现文件上传, 所以我研究了一下相关技术, 总结了以下三种文件上传方法 input 文件选择window.showOpenFilePicker 和 window.showDirectoryPicke…

树莓派夜视摄像头拍摄红外LED灯

NoIR相机是一种特殊类型的红外摄像头,其名称来源于"No Infrared"的缩写。与普通的彩色摄像头不同,NoIR相机具备红外摄影和低光条件下摄影的能力。 一般摄像头能够感知可见光,并用于普通摄影和视频拍摄。而NoIR相机则在设计上去除了…

【Hadoop】Hadoop 编译源码

目录 为什么要源码编译Hadoop 编译源码1前期工作准备2jar 包安装2.1安装 Maven2.2安装 ant2.3安装 glibc-headers 和 g2.4安装 make 和 cmake2.5安装 protobuf2.6安装 openssl 库2.7安装 ncurses-devel 库 3编译源码3.1解压源码到 /opt/ 目录3.2 进入到 hadoop 源码主目录 /opt…

Redis持久化笔记(3)

redis持久化:把内存的数据存放到磁盘,避免因为断电等导致数据丢失。 RDB(Redis Database) rdb就是在一定时间间隔内把当时的数据和状态保存为 .rdb文件放在磁盘中。 自动触发设置:在redis.conf 修改.rdb文件的保存位…

【Vue】三、使用ElementUI实现图片上传

目录 一、前端代码实现 二、后端代码实现 三、调试效果实现 一、前端代码实现 废话不多说直接上代码 <el-form-item prop"image" label"上传图片" v-model"form.image"><el-upload:action"http://localhost:8…

【prometheus-operator】k8s监控集群外redis

1、部署exporter GitHub - oliver006/redis_exporter: Prometheus Exporter for Redis Metrics. Supports Redis 2.x, 3.x, 4.x, 5.x, 6.x, and 7.x redis_exporter-v1.57.0.linux-386.tar.gz # 解压 tar -zxvf redis_exporter-v1.57.0.linux-386.tar.gz # 启动 nohup ./redi…

Go——指针和内存逃逸

区别于C/C中的指针&#xff0c;Go语言中的指针不能进行偏移和运算&#xff0c;是安全指针。 要搞明白Go语言中的指针概念需要先知道3个概念&#xff1a;指针地址&#xff0c;指针类型和指针取值。 一. Go语言的指针 Go语言中的函数传参都是值拷贝&#xff0c;当我们想修改某个…

# Django通过开关控制数据库参数(JS版)

目录 场景初始的视图层HTML部分JS代码视图层接受部分 场景 此时我的表单中有一排开关 数据库有一排状态 需求是要当开关开启时数据库state为1&#xff0c;关闭时为0 初始的视图层 将整个adv数据表返回给前端HTML def adv(request):adv_list Adv.objects.all()return rende…

语言教育App头牌Duolingo如何重新点燃用户增长350%?

Duolingo是全球最大的语言教育APP&#xff0c;拥有数亿用户&#xff0c;然而用户增长正在放缓&#xff0c;本案例以Duolingo增长 通过数据建模洞察关键指标&#xff0c;并围绕指标用增长实验驱动&#xff0c;设计植根于创新的增长模式&#xff0c;包括启动排行榜&#xff0c;重…

docker仓库登录及配置insecure-registries的方法

docker仓库登录及配置insecure-registries的方法 这篇文章主要介绍了docker仓库登录配置insecure-registries的方法,docker客户端如果配置中添加了insecure-registary配置&#xff0c;就不需要在docker 客户端配置上对应证书&#xff0c;如果不配置要在/etc/docker/certs.d/目…

【阅读论文】When Large Language Models Meet Vector Databases: A Survey

摘要 本调查探讨了大型语言模型&#xff08;LLM&#xff09;和向量数据库&#xff08;VecDB&#xff09;之间的协同潜力&#xff0c;这是一个新兴但迅速发展的研究领域。随着LLM的广泛应用&#xff0c;出现了许多挑战&#xff0c;包括产生虚构内容、知识过时、商业应用成本高昂…

流畅的 Python 第二版(GPT 重译)(十三)

第二十四章&#xff1a;类元编程 每个人都知道调试比一开始编写程序要困难两倍。所以如果你在编写时尽可能聪明&#xff0c;那么你将如何调试呢&#xff1f; Brian W. Kernighan 和 P. J. Plauger&#xff0c;《编程风格的要素》 类元编程是在运行时创建或自定义类的艺术。在 P…

ZYNQ EMIO MIO

1 概述 先来了解GPIO的BANK分布&#xff0c;在UG585文档GPIO一章中可以看到GPIO是有4个BANK&#xff0c; 注意与MIO的BANK区分。 BANK0 控制32个信号&#xff0c;BANK1控制22个信号&#xff0c;总共是MIO的54个引脚&#xff0c;也就是诸如 SPI,I2C,USB,SD 等 PS 端外设接口&am…

C语言字符函数与字符串函数:编织文字的舞会之梦(上)

欢迎来到白刘的领域 Miracle_86.-CSDN博客 系列专栏 C语言知识 先赞后看&#xff0c;已成习惯 创作不易&#xff0c;多多支持&#xff01; 在编程的过程中&#xff0c;我们经常要处理字符以及字符串&#xff0c;为了方便操作这些字符和字符串&#xff0c;C语言标准库中提供…