1. 背景
Golang 是谷歌开发的一种静态强类型、编译、并发和垃圾收集编程语言。围棋富有表现力,干净,高效。它的并发机制使得编写最大限度地利用多核和网络机器的程序变得容易,它的创新类型系统使得灵活和模块化的程序构造成为可能。Go 可以快速编译成机器代码,但具有垃圾收集的便利性和运行时反射的强大功能。它是一个快速的、静态类型的、编译语言的,就像一个动态类型的、直译语言的。
MQTT 是一种基于发布/订阅模型的轻量级物联网消息传递协议,它只需要很少的代码和带宽,就可以为物联网设备提供实时可靠的消息传递服务。它适用于硬件资源有限的设备和带宽有限的网络环境。因此,MQTT 协议广泛应用于物联网、移动互联网、物联网、电力等行业。源码GitHub - emqx/emqx-rel: Release Project for EMQ X Broker prior to 4.3. Newer releases are built here: https://github.com/emqx/emqx
本文主要介绍如何在 Golang 项目中使用 paho.MQTT.Golang 客户端库,并实现客户端与 MQTT 代理之间的连接、订阅和消息传递。
2. 设计原理
3. 服务部署
方法一:命令安装Installation | EMQX 3.0 Documentation
- apt-get install lksctp-tools
- curl -s https://assets.emqx.com/scripts/install-emqx-deb.sh | sudo bash
- sudo apt-get install emqx
方法二:源码编译
- $ git clone https://github.com/emqx/emqx-rel.git emqx-rel
$ cd emqx-rel
$ git checkout $(git describe --tags $(git rev-list --tags --max-count=1))
$ make emqx-pkg
$ ls _packages/emqx
4. go 生产消费源码
common.go
package main
import (
"fmt"
mqtt "github.com/eclipse/paho.mqtt.golang"
)
var broker = "你的broker ip"
var port = 1883
var userName = "emqx"
var passwd = "你的密码"
var topic = "topic/test"
func sub(client mqtt.Client, producer bool) {
token := client.Subscribe(topic, 1, nil)
token.Wait()
if producer {
fmt.Printf("Producer subscribed to topic %s", topic)
} else {
fmt.Printf("Consumer subscribed to topic %s", topic)
}
}
producer.go
package main
import (
"fmt"
mqtt "github.com/eclipse/paho.mqtt.golang"
"time"
)
var messagePubHandler mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) {
fmt.Printf("Producer Received message: %s from topic: %s\n", msg.Payload(), msg.Topic())
}
var connectHandler mqtt.OnConnectHandler = func(client mqtt.Client) {
fmt.Println("Connected")
}
var connectLostHandler mqtt.ConnectionLostHandler = func(client mqtt.Client, err error) {
fmt.Printf("Connect lost: %v", err)
}
func producerPoint() {
opts := mqtt.NewClientOptions()
opts.AddBroker(fmt.Sprintf("tcp://%s:%d", broker, port))
opts.SetClientID("go_mqtt_producer")
opts.SetUsername(userName)
opts.SetPassword(passwd)
opts.SetKeepAlive(8 * time.Second)
opts.SetDefaultPublishHandler(messagePubHandler)
opts.OnConnect = connectHandler
opts.OnConnectionLost = connectLostHandler
client := mqtt.NewClient(opts)
if token := client.Connect(); token.Wait() && token.Error() != nil {
panic(token.Error())
}
sub(client, true)
publish(client)
time.Sleep(30 * time.Second)
client.Disconnect(250)
}
func publish(client mqtt.Client) {
num := 10
for i := 0; i < num; i++ {
text := fmt.Sprintf("Message %d", i)
token := client.Publish(topic, 0, false, text)
token.Wait()
time.Sleep(time.Second)
}
}
consumer.go
package main
import (
"fmt"
mqtt "github.com/eclipse/paho.mqtt.golang"
"time"
)
var messageRecHandler mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) {
fmt.Printf("Clenit Received message: %s from topic: %s\n", msg.Payload(), msg.Topic())
}
// 其实consumer既可以收到消息,也可以发送消息
// 作为互联网硬件收集器,采集的环境信息数据(温度、湿度等)发送到broker
// 作为互联网硬件执行器,可以接受broker的消息(执行指令信息,如显示文字、声音等),并根据消息执行硬件行为
func consumerPoint() {
opts := mqtt.NewClientOptions()
opts.AddBroker(fmt.Sprintf("tcp://%s:%d", broker, port))
opts.SetClientID("go_mqtt_consumer")
opts.SetUsername(userName)
opts.SetPassword(passwd)
opts.SetKeepAlive(8 * time.Second)
opts.SetDefaultPublishHandler(messageRecHandler)
opts.OnConnect = connectHandler
opts.OnConnectionLost = connectLostHandler
client := mqtt.NewClient(opts)
if token := client.Connect(); token.Wait() && token.Error() != nil {
panic(token.Error())
}
sub(client, false)
time.Sleep(30 * time.Second)
client.Disconnect(250)
}
main.go
package main
import "time"
func main() {
go consumerPoint()
go producerPoint()
time.Sleep(30 * time.Second)
}
结果:
5. 小结
- 其实consumer既可以收到消息,也可以发送消息
- 作为互联网硬件收集器,采集的环境信息数据(温度、湿度等)发送到broker
- 作为互联网硬件执行器,可以接受broker的消息(执行指令信息,如显示文字、声音等),并根据消息执行硬件行为