Python系列 - MQTT协议
资源连接
MQTT的介绍和应用场景的示例说明
一、什么是MQTT
百度关于MQTT的介绍如下:
MQTT(消息队列遥测传输)是ISO 标准(ISO/IEC PRF 20922)下基于发布订阅范式的消息协议。它工作在 TCP/IP协议之上,是为硬件性能低下的远程设备以及网络状况糟糕的情况下而设计的发布/订阅型消息协议,
MQTT是一个基于客户端-服务器的消息发布/订阅传输协议。MQTT协议是轻量、简单、开放和易于实现的。
MQTT适用范围非常广泛。在很多情况下,包括受限的环境中,如:机器与机器(M2M)通信和物联网(IoT)。其次,通过卫星链路通信传感器、偶尔拨号的医疗设备、智能家居、及一些小型化设备中已广泛使用。
二、 MQTT的特点
MQTT协议是为大量计算能力有限,且工作在低带宽、不可靠的网络的远程传感器和控制设备通讯而设计的协议,因此它具有以下主要的几项特性:
-
轻量级和高效:MQTT协议的客户端代码量小,对系统资源的需求较低。小型传输,开销很小(固定长度的头部是 2 字节),协议交换最小化,以降低网络流量,提高了传输效率。
-
支持双向通信:MQTT协议支持设备到云端以及云端到设备之间的消息传递,使得向大量设备广播消息变得相对简单。
-
数据传输可靠:MQTT支持多种消息服务质量等级(QoS),确保数据按需可靠传输。QoS 0表示最多发送一次,QoS 1表示至少发送一次,QoS 2表示确保消息只有一次到达。
-
支持离线消息:通过保留消息和持久会话,MQTT支持设备离线时的消息保存和转发。
-
简单易用:MQTT协议使用 TCP/IP 提供网络连接,结构简单,易于实现和维护,适合低带宽和不可靠的网络环境。
-
发布/订阅模式:MQTT基于发布/订阅模式,消息通过主题进行分类。客户端可以订阅一个或多个主题,发布者发送消息到特定的主题,所有订阅该主题的客户端都将接收到这些消息,通过提供一对多的消息发布,解除应用程序耦合。
三、 MQTT的工作流程
MQTT的信息基于发布和订阅模式,通过主题进行相应的分类,基于发布订阅模式及其代理服务器的理解示意图:
基于MQTT的发布订阅模式下,其工作流程如下:
其中,MQTT传输的内容包括两个部分:
- 主题(Topic):主题可以理解为信息的主题,订阅者订阅(Subscribe)后,就会收到该主题的内容(payload);
- 负载(payload):可以理解为信息的内容,是指订阅的具体要使用的内容。
四、MQTT的工作模式
MQTT提供三种工作模式来保证数据的传输,三种工作模式和应用场景介绍如下:
- 至多一次:丢掉记录无所谓的场景;
- 至少一次:确保信息到达,但信息可能会重复;
- 只有一次:确保消息到达一次(小型传输,开销小)
五、MQTT的应用场景
MQTT的在如下三种场景下具有明显的优势:
- 带宽低
- 网络延迟高
- 网络通信不稳定
由于MQTT在三大场景下的优势,使得MQTT特别适合物联网(IoT)和类似场景:
-
物联网(IoT):MQTT是物联网领域中最典型的应用之一。在物联网中,大量的设备和传感器需要进行数据的收集、监控和控制。MQTT协议的特性使其成为物联网通信的理想选择。例如,智能家居系统可以使用MQTT来监控和控制家中的各种设备,如智能灯泡、恒温器等。
-
工业控制和远程监测:MQTT在工业控制和远程监测领域也有广泛的应用。它的可靠性和低带宽消耗特性使其非常适合在工业环境中进行实时数据传输和远程控制。例如,工业设备监控可以通过MQTT实时将数据上传至云端或控制中心,用于生产线状态监控、设备维护预警等。
-
即时通讯和实时数据传输:MQTT的低延迟和可靠性使其成为即时通讯和实时数据传输的理想选择。例如,聊天室和实时消息推送应用可以使用MQTT来实现用户之间的实时通讯。此外,MQTT还适用于需要实时数据传输的应用,如实时股票行情、天气数据等。
-
实时位置追踪:MQTT可以用于实时位置追踪应用。移动设备可以发布自己的位置信息,其他用户或系统可以订阅这些信息来获取实时位置数据。这对于车队管理、物流跟踪等应用非常有用。
-
其他场景:MQTT还应用于传感器数据传输、数据传输和同步、远程监控和控制系统等领域。例如,气象站、环境监测等领域可以使用MQTT进行传感器数据的采集和传输;跨平台数据同步、数据备份等场景也可以利用MQTT实现数据的实时同步。
六、python代码实现订阅
> #!/usr/bin/env python
> # 基于Pika实现对MQTT的通信
> import pika
> import sys
> import json
>
> channel = conn.channel()
>
> channel.exchange_declare(exchange='test.topic',exchange_type='topic',durable=True) #订阅的topic
>
> result = channel.queue_declare(queue='2570',
> durable=True,auto_delete=False,exclusive=False) queue_name =
> result.method.queue
>
> channel.queue_bind(
> exchange='test.topic', queue=queue_name, routing_key='order.PM02.2570')
>
> print(' [*] Waiting for logs. To exit press CTRL+C')
>
>
> def callback(ch, method, properties, body):
> #print(" [x] %r:%r" % (method.routing_key, body))
> j_text = json.loads(body)
> print(j_text)
> #j_Order = j_text['maintOrder']
> #print(j_Order)
> j_PM = j_text['pmType']
> print(j_PM)
> # 执行订阅程序
> channel.basic_consume(
> queue=queue_name, on_message_callback=callback, auto_ack=True) try:
> channel.start_consuming() except Exception as e :
> print(e)
> channel.start_consuming()
参考:
MQTT
MQTT协议
mqtt的应用场景有哪些