MQTT服务搭建及python使用示例

news2025/1/10 21:23:22

1、MQTT协议

1.1、MQTT介绍

        MQTT(Message Queuing Telemetry Transport)是一种轻量级的、基于发布/订阅模式的通信协议,通常用于物联网设备之间的通讯。它具有低带宽、低功耗和开放性等特点,适合在网络带宽有限或者网络连接不稳定的环境下使用。MQTT协议使用TCP/IP协议栈进行通讯,支持多种编程语言和平台,并且能够提供可靠的消息传递机制。

        在MQTT中,设备可以发布消息到特定的主题(topic),同时其他设备可以订阅这些主题以接收相应的消息。这种发布/订阅模式使得设备之间的通讯更加灵活和高效。MQTT还支持三种服务质量等级:至多一次(at most once)、至少一次(at least once)和恰好一次(exactly once),以满足不同场景下对消息传递可靠性的需求。

        优点:代码量少,开销低,带宽占用小,即时通讯协议。

1.2、MQTT概念

        订阅(Subscribtion):订阅包含主题筛选器( Topic Filter )和最大服务质量( QoS )。订阅会与一个会话( Session )关联。一个会话可以包含多个订阅。每一个会话中的每个订阅都有一个不同的主题筛选器。

        会话(Session):每个客户端与服务器建立连接后就是一个会话,客户端和服务器之间有状态交互。会话存在于一个网络之间,也可能在客户端和服务器之间跨越多个连续的网络连接。

        主题名(Topic Name):连接到一个应用程序消息的标签,该标签与服务器的订阅相匹配。服务器会将消息发送给订阅所匹配标签的每个客户端。需要注意的是, MQTT 中消息主题按照层级命名,使用 ‘/’ 进行分割。此外,主题中可以使用通配符进行多个主题或多层级的订阅,有两种常见的通配符:

        单层通配符 + :单层通配符只能匹配一层的主题,例如: China/Beijing/+ ,可以匹配的只有 Beijing 这个主 题下面一层的主题,例如 Xicheng, DongCheng, Xuanwu 等等。

        多层通配符 # :顾名思义,多层通配符就是可以匹配多个层级的主题,例如: China/# ,可以匹配到的主题可能有:China/Beijing/Dongcheng, China/Shanghai/PuDong ,等等。

        主题筛选器(Topic Filter):一个对主题名通配符筛选器,在订阅表达式中使用,表示订阅所匹配到的多个主题。

        消息订阅:消息订阅者所具体接收的内容。

1.3、MQTT中的角色

        Publisher 和 Subscriber 为客户端,Broker 为服务器端,消息主题为消息类型,Broker 根据 Topic 过滤消息,并将消息向客户端推送。

MQTT 中用 QoS 表示服务质量, MQTT 协议中有三种服务质量 (QoS) :

        1 ) QoS =0 ,至多一次,可能会出现丢包的情况,使用在对实时性要求不高的情况,例如,将此服务质量与通信环境传感器数据一起使用。 对于是否丢失个别读取或是否稍后立即发布新的读取并不重要。

        2 ) QoS =1, 至少一次,保证包会到达目的地,但是可能出现重包。

        3 ) QoS =2, 刚好一次,保证包会到达目的地,且不会出现重包的现象。

客户端:

        Publisher 和 Subscriber 都属于客户端。

        发布应用消息给其它相关的客户端。

        订阅以请求接受相关的应用消息。

        取消订阅以移除接受应用消息的请求。

        从服务端断开连接

服务器:

        服务器端即所谓的 MQTT Broker 服务器。

        接受来自客户端的网络连接。

        接受客户端发布的应用消息。

        处理客户端的订阅和取消订阅请求。

        转发应用消息给符合条件的已订阅客户端。

MQTT 提供的公共服务器端( Broker )有:

test.mosquitto.org

broker.hivemq.com

iot.eclipse.org

2、基于公共服务示例

        在次,选择使用EMQX提供的免费MQTT公共服务器,但同样可以选择其他任何MQTT broker。The Free Global Public MQTT Broker | Try Now | EMQ (emqx.com)

Broker: broker.emqx.io 
TCP Port: 1883 
Websocket Port: 8083

python库:pip install paho-mqtt=="1.6.1"

消息发布代码,pub.py

import time
import random
from paho.mqtt import client as mqtt_client

broker = 'broker.emqx.io'
port = 1883
topic = "/flask/mqtt"
# generate client ID with pub prefix randomly
client_id = f'python-mqtt-{random.randint(0, 1000)}'


def connect_mqtt():
    def on_connect(client, userdata, flags, rc):
        if rc == 0:
            print("Connected to MQTT Broker!")
        else:
            print("Failed to connect, return code %d\n", rc)

    client = mqtt_client.Client(client_id)
    client.on_connect = on_connect
    client.connect(broker, port)
    return client


def publish(client):
    msg_count = 0
    while True:
        time.sleep(1)
        msg = f"messages: {msg_count}"
        result = client.publish(topic, msg)
        # result: [0, 1]
        status = result[0]
        if status == 0:
            print(f"Send `{msg}` to topic `{topic}`")
        else:
            print(f"Failed to send message to topic {topic}")
        msg_count += 1


def run():
    client = connect_mqtt()
    client.loop_start()
    publish(client)


if __name__ == '__main__':
    run()

运行打印结果:

消息订阅代码,sub.py:

import random
from paho.mqtt import client as mqtt_client

broker = 'broker.emqx.io'
port = 1883
topic = "/flask/mqtt"
# generate client ID with pub prefix randomly
client_id = f'python-mqtt-{random.randint(0, 100)}'


def connect_mqtt() -> mqtt_client:
    def on_connect(client, userdata, flags, rc):
        if rc == 0:
            print("Connected to MQTT Broker!")
        else:
            print("Failed to connect, return code %d\n", rc)

    client = mqtt_client.Client(client_id)
    client.on_connect = on_connect
    client.connect(broker, port)
    return client


def subscribe(client: mqtt_client):
    def on_message(client, userdata, msg):
        print(f"Received `{msg.payload.decode()}` from `{msg.topic}` topic")

    client.subscribe(topic)
    client.on_message = on_message


def run():
    client = connect_mqtt()
    subscribe(client)
    client.loop_forever()


if __name__ == '__main__':
    run()

运行打印结果:

        可以看到,发布和订阅成功。注意:每个客户端的ID唯一,不能重复!

3、基于Apollo服务示例

3.1、安装Apollo服务

        服务器端搭建:

Index of /dist/activemq/activemq-apollo/1.7.1

解压并打开目标如下:

        Apollo中间件其实是免安装的,我们只需要下载apache-apollo-1.7.1-windows-distro.zip,然后解压到某个文件夹就可以了。在这里我解压到D:\dist\apache-apollo-1.7.1。在apache-apollo-1.7.1/bin目录下打开终端执行命令:apollo create myapollo

        若出现如下错误 Loading configuration file ‘D:\phpStudy\apache-apollo-1.7.1\bin\mybroker\etc\apollo.xml’.Startup failed: java.lang.NoClassDefFoundError: javax/xml/bind/ValidationEventHandler,处理方式:换上jdk1.8版本即可。

        然后在生成的myapollo目录的bin目录下执行:pollo-broker.cmd run,结果如下:

        服务搭建成功,进入后台管理,打开网页,输入ip + : 61680 进入后台管理 ,默认用户名admin 密码 password,例如:http://127.0.0.1:61680

3.2、示例代码

本地服务基础信息:

host="127.0.0.1" 
port = 61613 
用户名:admin 
密码:password

python库:pip install paho-mqtt=="1.6.1"

发布主题,publish.py

import sys
import time
import paho.mqtt.client as mqtt


def on_connect(client, userdata, flags, rc):
    print("Connected with result code " + str(rc))


def on_subscribe(client, userdata, mid, granted_qos):
    print("消息发送成功")


client = mqtt.Client(protocol=3)
client.username_pw_set("admin", "password")
client.on_connect = on_connect
client.on_subscribe = on_subscribe
client.connect(host="127.0.0.1", port=61613, keepalive=60)  # 订阅频道
time.sleep(1)

i = 0
while True:
    try:
        # 发布MQTT信息
        sensor_data = "test" + str(i)
        client.publish(topic="public", payload=sensor_data, qos=0)
        time.sleep(5)
        i += 1
    except KeyboardInterrupt:
        print("EXIT")
        client.disconnect()
        sys.exit(0)

订阅主题,subscribe.py

import time
import paho.mqtt.client as mqtt


# The callback for when the client receives a CONNACK response from the server.
def on_connect(client, userdata, flags, rc):
    if rc == 0:
        print("连接成功")
        print("Connected with result code " + str(rc))


def on_message(client, userdata, msg):
    print(msg.topic + " " + str(msg.payload))


client = mqtt.Client(protocol=3)
client.username_pw_set("admin", "password")
client.on_connect = on_connect
client.on_message = on_message
client.connect(host="127.0.0.1", port=61613, keepalive=60)  # 订阅频道
time.sleep(1)

# client.subscribe("public")
client.subscribe([("public", 0), ("test", 2)])  # 订阅
client.loop_forever()

执行脚本后,正确大小消息。浏览器可见连接记录。

4、EMQX客户端工具

下载地址:MQTTX 下载选择适合您的平台,立即开始使用 MQTTX。icon-default.png?t=N7T8https://mqttx.app/zh/downloads

4.1、公共服务测试消息接收

创建连接:

Host:为代码中定义好的 broker.emqx.io 
Port:为代码中定义好的 1883 
用户名、密码根据需要添加

添加订阅:

主题为:/flask/mqtt

在MQTTX中发布消息:

测试成功。

4.2、自建服务测试消息接收

MQTT版本选择3.1.1,以下参数,与服务保持一致:

host="127.0.0.1" 
port = 61613 
用户名:admin 
密码:password

其它测试步骤,一样。(创建主题,测试)

5、EMQX代理服务器

        windows下搭建MQTT代理服务,与Apollo服务功能一样,更方便好用,下载地址:https://www.emqx.io/zh/downloads

        下载后解压,进入目录bin文件下,执行:emqx start 

        打开浏览器输入 http://localhost:18083 进入EMQ的web控制台,输入用户名:admin 密码:public 进行登录。登录进入后,界面如下:

至此,代理服务器已经创建完成!客户端就可以连接代理服务器进行消息的分发和订阅!

附录:

java1.8.1下载,地址:Java Downloads | Oracleicon-default.png?t=N7T8https://www.oracle.com/java/technologies/downloads/#java8-windows

参考:

1、嵌入式QT- QT使用MQTT

嵌入式QT- QT使用MQTT_qt mqtt-CSDN博客

2、Python实现通信协议(mqtt)5星,mqtt flask

【小沐学Python】Python实现通信协议(mqtt)_python mqtt-CSDN博客

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1654858.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

pyinstaller 不是内部或外部命令,也不是可运行的程序或批处理文件的解决办法(详细)

首先我们需要查看是否安装了pyinstaller ,可以在命令窗口输入命令pip list检查是否安装成功, 这里我们可以看见已经安装成功了的,如果没有安装可以执行安装命令 pip install pyinstaller 进行安装即可。 下一步我们排查pyinstaller的安装路…

Python数据可视化------地图

基础地图使用 # 地图基本演示 # 导包 from pyecharts.charts import Map from pyecharts.options import TitleOpts, VisualMapOpts# 准备地图对象 cmap Map() # 准备数据(列表) data [("北京市", 99), ("上海市", 199), ("…

为什么你创业总是失败?2024普通人如何创业?2024创业赛道!2024创业新风口!2024创业方向!2024普通人的机会!

为什么你做项目老是不赚钱,是你不够努力吗?是你运气不好吗? 如果都不是!那一定是你的思维逻辑出了问题! 先想一想你以前做的项目,有没有哪个符合以下条件:对客户有价值、寻找客源成本在可接受…

BFS专题——FloodFill算法:200.岛屿数量

文章目录 题目描述算法原理代码实现CJava 题目描述 题目链接:200.岛屿数量 PS:注意题目中每座岛屿只能由水平方向和/或竖直方向上相邻的陆地连接形成。也就是说斜角是不算了, 例如示例二,是三个岛屿。 算法原理 这道题目是 DFS&#xff0…

商务分析方法与工具(二):Python的趣味快捷-序列结构解决电影推荐

Tips:"分享是快乐的源泉💧,在我的博客里,不仅有知识的海洋🌊,还有满满的正能量加持💪,快来和我一起分享这份快乐吧😊! 喜欢我的博客的话,记得…

【C语言】内存函数的概念,使用及模拟实现

Tiny Spark get dazzling some day. 目录 1. memcpy-- 函数原型-- 函数使用-- 函数的模拟实现 2.memmove-- 函数原型-- 函数使用-- 函数的模拟实现 3. memset-- 函数原型-- 函数使用-- 函数的模拟实现 4. memcmp-- 函数原型-- 函数使用-- 函数的模拟实现 1. memcpy 使用需包含…

3dmax-vray6渲染器参数设置

适用于3dmax2018-2023版本 一、【公用】 小图输出大小:1500*1125,勾选大气、效果; 大图输出大小:3000*2250,勾选大气、效果、置换; 二、【vray】 小图抗锯齿类型:渐进式;最小细分:1,最大细分:100&#…

C++新特性

1 智能指针 1.1 为什么需要智能指针 智能指针主要解决以下问题: 内存泄漏:内存手动释放,使用智能指针可以自动释放共享所有权指针的传播和释放,比如多线程使用同一个对象时析构问题,比如: C里面的四个智…

解决springboot项目的网站静态页面显示不全问题

在通过springboot搭建项目时,为了能够访问静态的前端页面,我们考虑到访问的优先级问题,通常选择将资源放在recourses/static的目录下,如下: 这时可能会出现类似于下面这种图片无法加载、没有按照指定位置显示的情况&am…

R语言:r画韦恩图

> setwd("") > library(openxlsx) > library(ggvenn) > data <- read.xlsx("韦恩图种2.xlsx") data$P <- ifelse(data$P 0, "F", "T") data$N <- ifelse(data$N 0, "F", "T")> data &l…

流星烛台如何交易?Anzo Capital昂首资本3步盈利收场

各位投资者通过之前的文章可以准确的辨认出什么是流星烛台了&#xff0c;但是各位投资者一旦遇到流星图案知道怎么交易吗?其实一点都不困难&#xff0c;只要掌握住流星图案的交易真棒&#xff0c;Anzo Capital昂首资本3步就可以盈利收场。 首先&#xff0c;投资者需要确定图…

技术速递|介绍 .NET MAUI 社区工具包 v8 :包含 TouchBehavior 支持!

作者&#xff1a;Gerald Versluis 排版&#xff1a;Alan Wang .NET MAUI 社区工具包团队很自豪地向您介绍 .NET MAUI 社区工具包的第 8 版&#xff01; 在这个最新的主要版本中&#xff0c;我们为您带来了备受期待的 TouchBehavior&#xff08;以前称为 TouchEffect&#xff0…

【C++】详解STL容器之一的 vector

目录 概述 迭代器 数据结构 优点和缺点 接口介绍 begin end rbegin rend resize reseve insert erase 其他一些接口 模拟实现 框架 获取迭代器 深浅拷贝 赋值重载 reseve resize 拷贝构造 构造 析构 insert erase 其他 概述 vector是STL的容器之一。…

二叉树遍历总结

7.二叉树 二叉树理论基础 二叉树的种类 在我们解题过程中二叉树有两种主要的形式&#xff1a;满二叉树和完全二叉树。 满二叉树 完全二叉树 二叉搜索树 平衡二叉搜索树 C中map、set、multimap&#xff0c;multiset的底层实现都是平衡二叉搜索树&#xff0c;所以map、set的增…

2024.5.8 2.二叉树的最大深度 (简单)

给定一个二叉树 root &#xff0c;返回其最大深度。 二叉树的 最大深度 是指从根节点到最远叶子节点的最长路径上的节点数。 示例 1&#xff1a; 输入&#xff1a;root [3,9,20,null,null,15,7] 输出&#xff1a;3 示例 2&#xff1a; 输入&#xff1a;root [1,null,2] 输…

GZIP文件格式解析和Inflate静态Huffman解压缩

GZIP是封装了Deflate压缩的格式文件&#xff1b;Deflate使用了无压缩、HuffmanLZ77进行压缩&#xff1b;解压是Inflate&#xff0c;Huffman包括静态Huffman压缩和动态Huffman压缩两种模式。 Java语言实现了GZIP格式解析、Inflate的静态Huffman解压缩、CRC32校验 算法。 gzip文…

7-AMCA Mal,可通过与蛋白质上的巯基反应生成具有荧光的标记物

【产品概述】 7-AMCA Mal&#xff0c;也被称为7-AMCA maleimide&#xff0c;是一种在生物医学领域中常用的荧光染料。 中文名称&#xff1a;7-AMCA 马来酰亚胺 英文名称&#xff1a;7-AMCA Mal&#xff0c;7-AMCA maleimide CAS号&#xff1a;N/A 分子式&#xff1a;C18H17N3…

STEP BY STEP带你使用Docker搭建MySql-MGR高可用集群

数据的重要性 数据已成为当今数字时代最重要的资产之一&#xff0c;对于企业的成功至关重要。它可以帮助企业了解客户、市场和自身运营&#xff0c;提高运营效率&#xff0c;做出明智决策&#xff0c;推动创新&#xff0c;并获得竞争优势。 数据的采集&#xff0c;存储&#…

解析源代码安全的防泄密解决途径

随着各行各业业务数据信息化发展&#xff0c;各类产品研发及设计等行业&#xff0c;都有关乎自身发展的核心数据&#xff0c;包括业务数据、代码数据、机密文档、用户数据等敏感信息&#xff0c;这些信息数据有以下共性&#xff1a; 属于核心机密资料&#xff0c;万一泄密会对…

TCP四次挥手中为什么 TIME_WAIT 等待的时间是 2MSL?

TCP 连接断开 1、TCP 四次挥手过程是怎样的&#xff1f;如下图 2、为什么 TIME_WAIT 等待的时间是 2MSL&#xff1f; MSL 是 Maximum Segment Lifetime&#xff0c;报文最大生存时间&#xff0c;它是任何报文在网络上存在的最长时间&#xff0c;超过这个时间报文将被丢弃。因…