python实现MQTT协议(发布者,订阅者,topic)

news2025/1/11 22:53:21

python实现MQTT协议

一、简介

1.1 概述

本文章针对物联网MQTT协议完成python实现

1.2 环境

  • Apache-apollo创建broker
  • Python实现发布者和订阅者

1.3 内容

  • MQTT协议架构说明 :

  • 利用仿真服务体会 MQTT协议

  • 针对MQTT协议进行测试

任务1:MQTT协议应用场景

说明: MQ 遥测传输 (MQTT) 是轻量级基于代理的发布/订阅的消息传输协议,设计思想是开放、简单、轻量、 易于实现。这些特点使它适用于受限环境。该协议的特点有:

1 使用发布/订阅消息模式,提供一对多的消息发布,解除应用程序耦合。
2 对负载内容屏蔽的消息传输。
3 使用 TCP/IP 提供网络连接。

物联网应用场景:

image-20230901160722558

协议角色分工:

客户端分为2种角色:发布者(Publisher)和订阅者(Subscriber)。
每一个发布者(Publisher)可以发送不同类型的消息,我们把消息的类型叫做主题(topic),
MQTT通信中的消息都属于某一个主题 ,而只有订阅了这个主题的订阅者(Subscriber)才能收到属于这个主题的消息。
发布者和订阅者不需要在意和知道对方的存在(不需要知道对方的IP和端口),也不需要直接与对方建立连接。因为通信中存在着一个叫代理 (MQTT broker)的第三种角色,也可以叫MQTT服务器(MQTT server)。 
发布者、订阅者只需要知道MQTT服务器的IP和端口即可,并和它直接建立连接通信。MQTT代理作为消息的 中转,它过滤所有接受到的消息,并按照一定的机制(MQTT标准规定是基于主题的消息过滤派发方式,而具 体的MQTT服务器软件也提供了其他的派发方式)分发它们,使得所有注册到MQTT代理的订阅者只接收到他 们订阅了的消息,而不会收到他不关心的消息。
当发布者发布一条消息的时候,他必须同时指定消息的主题和消息的负载。MQTT代理在收到发布者发过来的 消息时,无需访问消息负载,他只是访问消息的主题信息,然后根据这主题派发给订阅者。需要注意的是,一个客户端可以同时既当发布者又当订阅者。比如一个开发板连接了一盏LED灯,它可以发布灯的暗/亮状态 信息,也可以从其他节点订阅对灯的控制消息。

3.生产者(发布消息)和消费者(消耗消息-订阅者)模式理解

生产者:wifi设备采集各种物联网传感器比如温度重力传感器
消费者:客户端比如手机
原理如下:

image-20230901095257410

任务2:搭建Mqtt协议服务 (broker)

前提:安装JDK和JAVA_HOME环境变量

1 下载Apollo服务器 地址 http://archive.apache.org/dist/activemq/activemq-apollo/1.7.1/

2 进入bin目录命令行 输入:

D:\softwares\apache-apollo-1.7.1\bin\apollo.cmd create jwbroker

3:broker\etc\apollo.xml文件下是配置服务器信息的文件
初始默认帐号是admin,密码password;

4:启动命令行: (以一个实例为单位进行创建的)

进入... jwbroker创建实例的\bin\ 目录,
在CMD输入命令apollo-broker.cmd run,可以使用TAB键自动补全,运行后输出信息如下:验证:
MQTT服务器TCP连接端口: tcp://0.0.0.0:61613
后台web管理页面:https://127.0.0.1:61681/或http://127.0.0.1:61680/

出现如下图表示启动成功

image-20230901154500857

安装mqtt需要的包:

pip install paho-mqtt

发布者publish创建:

import time
from paho.mqtt import publish
#源码中只需要知道   ip + 端口 + 订阅的主题
HOST ="127.0.0.1"
PORT =61613

def on_connect(client,userdata, flags,rc):
    print("Connected with result code" + str(rc))
    client.subscribe("jw-temperature") # 发布主题

def on_message(client,userdata,msg):
    print(msg.topc +  "消息发送!" + msg.payload.decode("utf-8"))

if __name__ == '__main__':
    print("消息发布!----我是一个发布者:正在给设备和传感器发布主题-----")
    client_id = time.strftime('%Y%m%d%H%M%S', time.localtime(time.time()))
    for i in range(20):
        time.sleep(2)
        publish.single("lightChange","现在天黑了", qos = 1, hostname = HOST, port = PORT, client_id = client_id,
                       auth = {'username': "admin", 'password': "password"})
        print("已发送"+str(i+1)+"条消息")

订阅者light_subcribe创建:

import paho.mqtt.client as mqtt
import time

#源码中只需要知道   ip + 端口 + 订阅的主题
HOST ="127.0.0.1"
PORT =61613

'''
The callback for when the client receives a CONNACK response from the server
客户端接收到服务器端的确认连接请求时,回调on_connect服务端发送CONNACK报文响应从客户端收到的CONNECT报文。
服务端发送给客户端的第一个报文必须是CONNACK [MQTT-3.2.0-1].
'''
def on_connect(client,userdata, flags,rc):
    print("Connected with result code" + str(rc))
    '''
    Subscribing in on_connect() means that if we lose the connection 
    and  reconnect then subscriptions wil be renewed(恢复、续订).'''
    client.subscribe("lightChange") # 订阅主题

'''
The callback for when a PUBLISH message is received from the server.
客户端接收到服务器向其传输的消息时,
回调on_messagePUBLISH控制报文是指从客户端向服务端或者服务端向客户端传输一个应用消息。
'''
def on_message(client,userdata,msg):
    print(msg.topic+ msg.payload.decode("utf-8") +  ",回调消息:收到收到!我已经接收到发布者的消息,并且打开了光传感器" )

def client_loop():
    '''
    注意,client_id是必须的,并且是唯一的。否则可能会出现如下错误
    [WinError 10054] 远程主机强迫关闭了一个现有的连接
    '''

    client_id = time.strftime('%Y%m%d%H%M%S', time.localtime(time.time()))
    client = mqtt.Client(client_id) # Client_id 不能重复,所以使用当前时间
    client.username_pw_set("admin","password") # 必须设置,否则会返回 /Connected with result code 4/
    client.on_connect = on_connect
    client.on_message = on_message

    '''
    拥塞回调:处理网络流量,调度回调和重连接。
    Blocking call that processes network traffic, 
    dispatches callbacks and handles reconnecting.
    Other loop*() functions are available that give a threaded interface and amanual- interface...I
    '''
    try:
        client.connect(HOST,PORT,60)
        client.loop_forever()
    except KeyboardInterrupt:
        client.disconnect()

if __name__ == '__main__':
    print("手电筒打开----我是一个订阅者:需要消费主题-----")
    client_loop()

订阅者phone_subcribe创建:

import paho.mqtt.client as mqtt
import time

#源码中只需要知道   ip + 端口 + 订阅的主题
HOST ="127.0.0.1"
PORT =61613
'''
The callback for when the client receives a CONNACK response from the server
客户端接收到服务器端的确认连接请求时,回调on_connect服务端发送CONNACK报文响应从客户端收到的CONNECT报文。
服务端发送给客户端的第一个报文必须是CONNACK [MQTT-3.2.0-1].

'''
def on_connect(client,userdata, flags,rc):
    print("Connected with result code" + str(rc))
    '''
    Subscribing in on_connect() means that if we lose the connection 
    and  reconnect then subscriptions wil be renewed(恢复、续订).
    '''
    client.subscribe("lightChange") # 订阅主题


'''
The callback for when a PUBLISH message is received from the server.
客户端接收到服务器向其传输的消息时,
回调on_messagePUBLISH控制报文是指从客户端向服务端或者服务端向客户端传输一个应用消息。
'''
def on_message(client,userdata,msg):
    print(msg.topic  + msg.payload.decode("utf-8")+  ",回调消息:收到收到!我已经接收到发布者的消息并给用户反馈手电筒已经打开")

def client_loop():
    '''
    注意,client_id是必须的,并且是唯一的。否则可能会出现如下错误
    [WinError 10054] 远程主机强迫关闭了一个现有的连接
    '''

    client_id = time.strftime('%Y%m%d%H%M%S', time.localtime(time.time()))
    client = mqtt.Client(client_id) # Client_id 不能重复,所以使用当前时间
    client.username_pw_set("admin","password") # 必须设置,否则会返回 /Connected with result code 4/
    client.on_connect = on_connect
    client.on_message = on_message

    '''
    拥塞回调:处理网络流量,调度回调和重连接。
    Blocking call that processes network traffic, 
    dispatches callbacks and handles reconnecting.
    Other loop*() functions are available that give a threaded interface and amanual- interface...I
    '''
    try:
        client.connect(HOST,PORT,60)
        client.loop_forever()
    except KeyboardInterrupt:
        client.disconnect()

if __name__ == '__main__':
    print("手机启动----我是一个订阅者:需要消费主题-----")
    client_loop()

演示:分别运行publish和light_subcribe和phone_subcribe

publish:

image-20230901160129432

light_subcribe:

image-20230901160148825

phone_subcribe:

image-20230901160158514

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/961716.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

关于Incapsula reese84加密的特征研究

最近研究了下reese84的加密算法,基本上两个参数的加密__utmvc和token,因为nodejs调用会有内存问题,没有采用补环境的方式解决,用python扣的算法 1:__utmvc参数的生成是一个ob混淆,ast处理之后调试难度不是很大 测试结…

说说IO多路复用

分析&回答 IO多路复用 I/O multiplexing 这里面的 multiplexing 指的其实是在单个线程通过记录跟踪每一个Sock(I/O流)的状态(对应空管塔里面的Fight progress strip槽)来同时管理多个I/O流。直白点说:多路指的是多个socket连接,复用指的是复用一个…

ROS 2官方文档(基于humble版本)学习笔记(二)

ROS 2官方文档(基于humble版本)学习笔记(二) 理解节点(node)ros2 runros2 node list重映射(remap)ros2 node info 理解话题(topic)rqt_graphros2 topic listr…

JVM工具-1. jps 虚拟机进程状态工具

文章目录 1. jps介绍2. jps命令格式3. jps工具主要选项4. jps -q5. jps -m6. jps -l7. jps -v 1. jps介绍 jps(JVM Process Status Tool):虚拟机进程状态工具,可以列出正在运行的虚拟机进程,并显示虚拟机执行主类(Main Class&…

【校招VIP】操作系统考点之sleep和wait

考点介绍: 多线程可以说是进阶必备的知识点,也是面试中必备的考点。 可能不少人能对多线程说上一二,但这还远远不够,如果碰到比较有经验的面试官再继续追问,很可能会被吊打。 操作系统考点之sleep和wait 相关题目及解…

2023-9-1-虚拟网卡学习

🍿*★,*:.☆( ̄▽ ̄)/$:*.★* 🍿 💥💥💥欢迎来到🤞汤姆🤞的csdn博文💥💥💥 💟💟喜欢的朋友可以关注一下&#xf…

使用boost::geometry::union_ 合并边界(内、外):方案二

使用boost::geometry::union_ 合并边界&#xff08;内、外&#xff09;&#xff1a;方案二 typedef boost::geometry::model::d2::point_xy<double> boost_point; typedef boost::geometry::model::polygon<boost_point> boost_Polygon;struct Point {float x;floa…

kubernetes进阶 (二) 搭建harbor仓库及镜像制作

我遇到的场景规模较大&#xff0c;F5后有很多个仓库。 并且仓库直接还存在同步关系&#xff0c;因为拉取镜像走的是F5&#xff0c;当碰到莫名其妙的原因(仓库挂了&#xff0c;或者维护、磁盘满了&#xff09;&#xff0c;上送到根仓库的镜像没有同步到其他所以的仓库&#xff0…

高版本springboot3.1配置Eureka客户端问题

只需要按上面配置好&#xff0c;然后高版本的Eureka&#xff0c;不需要EnableEurekaClient这个注解了&#xff0c;直接SpringBoot启动&#xff0c;就可以注册到注册中心。 /*********************************************************/ /** * 开启eureka客户端功能 */ //E…

go锁-互斥锁

go锁-互斥锁 sema初始值是0&#xff0c;waitershift等待协程的数量 正常枷锁&#xff1a; 尝试CAS直接加锁&#xff0c;通过原子包给lockerd 为枷锁 若无法直接获取&#xff0c;进行多次自旋尝试&#xff0c;未获取到的锁的g &#xff0c;多次执行空语句&#xff0c;多次尝试…

indeogram用法

特点&#xff1a; indeogram.ai 是一种基于人工智能的图形设计工具&#xff0c;可以帮助用户快速和轻松地创建专业级的图形。它使用人工智能来识别图形的元素&#xff0c;并自动生成设计方案。这使得 indeogram.ai 非常适合没有任何图形设计经验的用户。 登录网站&#xff1a…

参数和BigDecimal zero比较失效的异常

记录Bigdecimal中参数和BigDecimal zero比较失效的异常 List<StockBatchDTO> b a.stream().filter(v->!v.getQuantity().equals(BigDecimal.ZERO)).collect(Collectors.toList());//失效List<StockBatchDTO> c a.stream().filter(v->v.getQuantity().comp…

高忆管理:股票最基本的知识?

股票是一个经济体中的一份所有权。持有股票的人成为公司的股东&#xff0c;代表他们在公司中有一定的决策权和分红权。股票商场是一个重要的金融商场&#xff0c;关于企业和出资者都具有重要的含义。那么&#xff0c;股票出资的基本知识是什么呢&#xff1f; 一、 股票的界说 …

DEAP库文档教程三-----创建类型

本节将继续展示如何通过creator创建类型以及如何使用toolbox如何对复杂问题进行初始化。 Particle的初始化--粒子初始化 一个Particle是另一个特殊类型的个体&#xff0c;这是因为通常情况下它有一个速度&#xff0c;并且有一个最优的位置需要去记忆。这种类型个体的创建与通…

六、高并发内存池--Central Cache

六、高并发内存池–Central Cache 6.1 Central Cache的工作原理 central cache也是一个哈希桶结构&#xff0c;他的哈希桶的映射关系跟thread cache是一样的。不同的是他的每个哈希桶位置挂是SpanList链表结构&#xff0c;不过每个映射桶下面的span中的大内存块被按映射关系切…

pycharm创建的虚拟环境为什么用conda env list命令查询不到?

问题描述&#xff1a;pycharm创建的虚拟环境为什么用conda env list命令查询不到。 pycharm开发环境可以创建虚拟环境&#xff0c;目的是为隔绝其他环境种库带来的版本干扰&#xff0c;但是发现一个问题&#xff0c;无论是在windows终端、anaconda终端、Pycharm开发环境中的终…

Redis之MoreKey问题及Scan命令解读

目录 MoreKey问题讨论 Scan命令 Sscan命令 Hscan命令 Zscan命令 MoreKey问题讨论 keys * 查看当前库所有key 对于海量数据执行key *会造成严重服务卡顿、影响业务。在实际环境中最好不要使用。生产制造过程中keys * / flushdb/flushall等危险命令以防止误删误用。 大量的…

求臻医学:ctDNA动态监测可预测A+T治疗疗效

臻知识专家访谈 | 第45期 抗血管生成药物贝伐珠单抗&#xff08;A&#xff09;和一代靶向药厄洛替尼&#xff08;T&#xff09;的联合&#xff0c;能够显著提高EGFR敏感突变&#xff08;非T790M&#xff09;NSCLC患者的PFS和5年生存率&#xff0c;延缓靶向药物的耐药。但不是所…

智慧排水监测系统,科技助力城市排水治理

城市里&#xff0c;人们每天通过道路通行&#xff0c;人多&#xff0c;路窄&#xff0c;都会拥堵。同样&#xff0c;下雨天&#xff0c;雨水通过雨篦汇集、管道输送&#xff0c;最终排出去&#xff0c;当雨水过大&#xff0c;或者管道过窄&#xff0c;或者管道不通畅&#xff0…

考研408 | 【操作系统】终章

I/O设备的基本概念和分类 I/O设备&#xff1a; I/O设备的分类 1.按使用特性&#xff1a; 2.按传输速率分类&#xff1a; 3.按信息交换的单位分类&#xff1a; 总结&#xff1a; I/O控制器 I/O设备的机械部件&#xff1a; I/O设备的电子部件&#xff08;I/O控制器&#…