介绍
MQTT (Message Queuing Telemetry Transport) 是一种轻量级的发布/订阅消息协议,专为低带宽环境M2M而设计。是物联网(IoT)最常用的消息传递协议。
- 轻量高效
- 双向通信
- 可以扩展以连接数百万台物联网设备。
- 可靠的消息传递(支持3种QOS级别)
- 支持不可靠的网络(支持持久会话)
- 安全保证(支持TLS)
工作原理
连接
keepalive
如果在一定时间内(keep alive period)没有发送消息,客户端会发送PINGREQ数据包到服务器,服务器接收后返回PINGRESP,以此确认连接正常。
客户端设置了保活时间,服务器如果在客户端设置的保活时间的1.5倍仍没有收到客户端发送的PINGREQ数据包,会强制断开客户端的连接。
Retained Messages(保留消息)
通常,如果发布者向某个主题发布消息,而没有人订阅该主题,则代理会丢弃该消息。
然而,发布者可以通过设置保留消息标志(Retain Flag)来告诉代理保留该主题的最后一条消息。
Clean Sessions
如果是clean session,选项为true,客户端断开连接时,代理不会记住之前会话状态任何信息。
如果是no clean session,选项为false,代理将根据QoS级别保留客户端的订阅和未送达的消息
Last Will Messages
Last Will Messages(遗嘱消息)用于处理客户端意外断开连接的情况,通知订阅者由于网络中断导致发布者不可用。
当客户端连接到MQTT代理时,可以提前注册一条消息,该消息将在客户端意外断开时发送到指定的主题。这使得其他订阅此主题的客户端可以被通知客户端的异常断开。
Topics
Topic结构
MQTT topic的结构类似于文件系统中的文件夹和文件,使用正斜杠(/)作为分隔符。
- 区分大小写
- 使用 UTF-8 字符串。
- 必须至少包含一个字符才有效。
$SYS 主题,默认系统主题都在这个下面。
topic由订阅或发布客户端创建,并且不是永久的。以下情况会创建topic
- 有人订阅了某个topic
- 某人向主题发布一条消息,并将保留消息设置为 True。
订阅多个topic时,可以使用通配符。发布只能向单个topic发布消息,不允许使用通配符
- #(井号) – 多级通配符
-
- (加号) -单级通配符
以下情况会删除topic
- 当订阅该代理的最后一个客户端断开连接时,clean session为true。
- 客户端以clean session为 True的方式连接broker时。
Topic使用
- 由于主题区分大小写,因此仅使用小写。
- 使用前缀分隔命令和响应主题,例如 command/ 和 response/
- 不要使用带空格的主题。
- 仅使用字母、数字和破折号
- 在主题结构中包含路由信息。
发布与订阅
QOS 服务质量等级
MQTT 支持 3 个QOS级别 0、1、2。
- QOS -0 – (At most once) 默认,不保证消息一定被传递。
- QOS -1 – (At lease once) 保证消息传递,但可能会重复。
- QOS -2 - (Exactly once) 保证消息传递,并且只传递1次。
不同级别消息在客户端与代理直接的消息流:
安装使用
安装
本地开发学习可以使用 Eclipse免费的公共 MQTT 代理
https://mqtt.eclipseprojects.io/
或者自己安装一个mqtt boker
docker run -p 8080:8080 -p 1883:1883 hivemq/hivemq4
开发(基于 Eclipse paho go client 库)
Eclipse paho支持的客户端库:https://eclipse.dev/paho/index.php?page=downloads.php
package main
import (
"fmt"
//import the Paho Go MQTT library
MQTT "github.com/eclipse/paho.mqtt.golang"
"os"
"time"
)
// define a function for the default message handler
var f MQTT.MessageHandler = func(client MQTT.Client, msg MQTT.Message) {
fmt.Printf("TOPIC: %s\n", msg.Topic())
fmt.Printf("MSG: %s\n", msg.Payload())
}
var msgRcvd = func(client MQTT.Client, msg MQTT.Message) {
fmt.Printf("Received message on TOPIC: %s Messages: %s \n", msg.Topic(), msg.Payload())
}
func main() {
//create a ClientOptions struct setting the broker address, clientid, turn
//off trace output and set the default message handler
//设置broker url和 client ID
opts := MQTT.NewClientOptions().AddBroker("tcp://mqtt.eclipseprojects.io:1883")
opts.SetClientID("go-simple")
opts.SetDefaultPublishHandler(f)
opts.SetKeepAlive(5 * time.Second)
//create and start a client using the above ClientOptions
c := MQTT.NewClient(opts)
if token := c.Connect(); token.Wait() && token.Error() != nil {
panic(token.Error())
}
//subscribe to the topic /go-mqtt/sample and request messages to be delivered
//at a maximum qos of zero, wait for the receipt to confirm the subscription
if token := c.Subscribe("go-mqtt/sample", 0, msgRcvd); token.Wait() && token.Error() != nil {
fmt.Println(token.Error())
os.Exit(1)
}
//Publish 5 messages to /go-mqtt/sample at qos 1 and wait for the receipt
//from the server after sending each message
for i := 0; i < 5; i++ {
text := fmt.Sprintf("this is msg #%d!", i)
token := c.Publish("go-mqtt/sample", 0, false, text)
token.Wait()
}
time.Sleep(300 * time.Second)
//unsubscribe from /go-mqtt/sample
if token := c.Unsubscribe("go-mqtt/sample"); token.Wait() && token.Error() != nil {
fmt.Println(token.Error())
os.Exit(1)
}
c.Disconnect(250)
}
参考來源:
http://www.steves-internet-guide.com/
https://mqtt.org/