golang开源的可嵌入应用程序高性能的MQTT服务

news2025/1/11 8:16:30

golang开源的可嵌入应用程序高性能的MQTT服务

什么是MQTT?

MQTT(Message Queuing Telemetry Transport)是一种轻量级的、开放的消息传输协议,设计用于在低带宽、高延迟或不可靠的网络环境中进行通信。MQTT最初由IBM开发,现已成为OASIS标准。
MQTT的设计目标是提供一种简单、轻量、可扩展的协议,适用于各种设备和网络条件。它通常用于物联网(IoT)和传感器网络,其中设备需要以有效的方式进行通信,并且资源(如带宽和电池寿命)可能受到限制。
MQTT的简单设计和适用性使其成为物联网中常用的通信协议之一。它被广泛用于传感器网络、嵌入式设备、移动应用程序和其他场景中,提供了一种可靠、高效的消息传输机制。

什么是Mochi-MQTT

源代码地址:https://github.com/mochi-mqtt/server

Mochi MQTT 是一个完全兼容 MQTT v5的可嵌入的中间件/服务器,完全使用 Go 语言编写,旨在用于遥测和物联网项目的开发。它可以作为独立的二进制文件使用,也可以嵌入到你自己的应用程序中库来使用,经过提出的设计以实现问题的轻量化和快速部署,同时也非常重视代码的质量和可维护性。

用途

物联网项目开发时,常常需要使用MQTT协议对设备接入,在很多场景中,私有化部署物联网系统时资源比较少,性能要求高,一些大型的MQTT服务不满足要求,而且代码不可控。
还有在边缘场景下,需要在边缘网关,边缘控制器设备上部署物联网系统,但是边缘网关的资源很少,内存大约只有4G,所以使用java开发的物联网系统就很难部署上去;使用C/C++开发效率又很低,所以Go语言是最合适的,
Mochi-MQTT刚好又完全是Go编写的开源的,可以嵌入到自己的程序启动。

Mochi MQTT独立部署

Golang的环境配置这里不做说明,请看我前面的博文说明

Mochi MQTT 可以作为独立的中间件使用。只需拉取此仓库代码,然后在 cmd 文件夹中运行 cmd/main.go ,默认将开启下面几个服务端口, tcp (:1883)、websocket (:1882) 和服务状态监控 (:8080) 。

cd cmd
go build -o mqtt && ./mqtt

docker部署

可以从 Docker Hub 仓库中拉取并运行Mochi MQTT官方镜像:

docker pull mochimqtt/server
或者
docker run mochimqtt/server

也提供了一个简单的 Dockerfile,用于运行 cmd/main.go 中的 Websocket(:1882)、TCP(:1883) 和服务端状态信息(:8080)这三个服务监听:

docker build -t mochi:latest .
docker run -p 1883:1883 -p 1882:1882 -p 8080:8080 mochi:latest

嵌入自己项目运行和开发

下载Mochi MQTT包

go get github.com/mochi-mqtt/server/v2

将Mochi MQTT作为包导入使用, 示例代码如下

import (
  mqttServer "github.com/mochi-mqtt/server/v2"
	"github.com/mochi-mqtt/server/v2/listeners"
	"github.com/mochi-mqtt/server/v2/packets"
)

var Server *mqttServer.Server

func ServerMqttInit() {
	// 创建新的 MQTT 服务器。
	Server = mqttServer.New(&mqttServer.Options{
		InlineClient: true, // 启动内联客户端
	})
	
	// 初始化数据库实例
	edge := &edgeHook{deviceDao: deviceDao.NewDeviceRepository(),
		productDao:     productDao.NewProductRepository(),
	}
	// 添加自定义权限方法
	err := Server.AddHook(edge, nil)
	if err != nil {
		log.Fatal(err)
	}

	// 在1883端口上创建一个 TCP 服务端。
	tcp := listeners.NewTCP("t1", ":1883", nil)
	err = Server.AddListener(tcp)
	if err != nil {
		log.Fatal(err)
	}

	// 在1882端口上创建一个 Websocket 服务端。
	ws := listeners.NewWebsocket("ws1", ":1882", nil)
	err = server.AddListener(ws)
	if err != nil {
		log.Fatal(err)
	}

	go func() {
		err := Server.Serve()
		if err != nil {
			log.Fatal(err)
		}
	}()
}

type edgeHook struct {
	mqttServer.HookBase
	deviceDao      deviceDao.DeviceRepository
	productDao     productDao.ProductRepository
}

func (h *edgeHook) ID() string {
	return "mqtt-auth"
}

func (h *edgeHook) Provides(b byte) bool {
	// 实现钩子函数
	return bytes.Contains([]byte{
		//MQTT连接时认证. 当用户尝试与服务器进行身份验证时调用。
		mqttServer.OnConnectAuthenticate,
		//MQTT topic权限控制. 当用户尝试发布或订阅主题时调用,用来检测ACL规则。
		mqttServer.OnACLCheck,
		//在新客户端连接并进行身份验证后,会立即调用此方法,并在会话建立和发送CONNACK之前立即调用。
		mqttServer.OnSessionEstablish,
		//当客户端因任何原因断开连接时调用。
		mqttServer.OnDisconnect,
		//当客户端向订阅者发布消息后调用。
		mqttServer.OnPublished,
	}, []byte{b})
}

// OnConnectAuthenticate MQTT连接时认证. 当用户尝试与服务器进行身份验证时调用。
func (h *edgeHook) OnConnectAuthenticate(cl *mqttServer.Client, pk packets.Packet) bool {
	username := string(pk.Connect.Username)
	password := string(pk.Connect.Password)
	if username == "" || len(username) == 0 {
		return false
	}
	if password == "" || len(password) == 0 {
		return false
	}
	return true
}

// OnACLCheck MQTT topic权限控制. 当用户尝试发布或订阅主题时调用,用来检测ACL规则。
func (h *edgeHook) OnACLCheck(cl *mqttServer.Client, topic string, write bool) bool {
	username := string(cl.Properties.Username)
	if username == "" || len(username) == 0 {
		return false
	}
	if topic == "" || len(topic) == 0 {
		return false
	}
	return true
}

// OnSessionEstablish 在新客户端连接并进行身份验证后,会立即调用此方法,并在会话建立和发送CONNACK之前立即调用。
func (h *edgeHook) OnSessionEstablish(cl *mqttServer.Client, pk packets.Packet) {
	username := string(cl.Properties.Username)
	if username == "" || len(username) == 0 {
		return
	}
	//设备连接MQTT成功后保存设备在线状态
}

// OnDisconnect 当客户端因任何原因断开连接时调用。
func (h *edgeHook) OnDisconnect(cl *mqttServer.Client, err error, expire bool) {
	username := string(cl.Properties.Username)
	if username == "" || len(username) == 0 {
		return
	}
	//设备断开MQTT成功后保存设备离线状态
}

// OnPublished 当客户端向订阅者发布消息后调用。
func (h *edgeHook) OnPublished(cl *mqttServer.Client, pk packets.Packet) {
	Log.Infof("mqtt server OnPublished info topic=%s, msg=%s", pk.TopicName, string(pk.Payload))
	//收到客户端消息后做业务逻辑处理
}

// 使用内联客户端方式,向MQTT发送消息
func PublishMsg(topic string, msg []byte) bool {
	err := Server.Publish(topic, msg, false, 0)
	if err != nil {
		Log.Errorf("mqtt EdgePublish error=%v, topic=%s, msg=%s", err, topic, msg)
		return false
	}
	return true
}

// 使用内联客户端方式,订阅边缘MQTT消息topic
func SubscribeTopic(topic string, subscriptionId int, callback func(topic string, msg []byte)) {
	callbackFn := func(cl *mqttServer.Client, sub packets.Subscription, pk packets.Packet) {
		Log.Info("mqtt EdgeSubscribe received message", "client", cl.ID, "subscriptionId", sub.Identifier,
			"topic", pk.TopicName, "payload", string(pk.Payload))
		callback(pk.TopicName, pk.Payload)
	}
	_ = Server.Subscribe(topic, subscriptionId, callbackFn)
}

// 使用内联客户端方式,取消订阅边缘MQTT消息topic
func UnsubscribeTopic(topic string, subscriptionId int) {
	_ = Server.Unsubscribe(topic, subscriptionId)
}

func main() {
	// 创建信号用于等待服务端关闭信号
  sigs := make(chan os.Signal, 1)
  done := make(chan bool, 1)
  signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
  go func() {
    <-sigs
    done <- true
  }()
  
  <-done
	Log.Error("caught signal, stopping...")
	Server.Close()
	Log.Error("main.go finished")
}

监控MQTT指标信息

mqttRouters := r.Group("/mqtt", func(context *gin.Context) {})
	{
		mqttRouters.GET("stats", func(c *gin.Context) {
			util.R(c, nil, mqtt.Server.Info)
		})
	}

在这里插入图片描述

详情使用指南请看:https://github.com/mochi-mqtt/server

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1423993.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

在windows和Linux中的安装 boost 以及 安装 muduo

二、安装boost boost官网&#xff1a;boost官网 我下载的boost版本&#xff1a; windows:boost_1_84_0.ziplinux:boost_1_84_0.tar.gz 2.1 在windows中安装boost和测试 &#xff08;1&#xff09;在windows中&#xff0c;解压这个压缩包boost_1_84_0.zip&#xff0c;路径为…

2024Node.js零基础教程(小白友好型),nodejs新手到高手,(三)NodeJS入门——http协议

033_HTTP协议_初识HTTP协议 hello&#xff0c;大家好&#xff0c;这个小节我们来认识一下 http协议。 http是几个单词的首字母拼写&#xff0c;全称为Hypertext Transfer Protocol 译为超文本传输协议&#xff0c;那么这个http协议是互联网上应用最广泛的协议之一。顺便说一下…

壹[1],Xamarin开发

1&#xff0c;环境 VS2022 注&#xff1a; 1&#xff0c;本来计划使用AndroidStudio&#xff0c;但是也是一堆莫名的配置让人搞得很神伤&#xff0c;还是回归C#。 2&#xff0c;MAUI操作类似&#xff0c;但是很多错误解来解去&#xff0c;且调试起来很卡。 3&#xff0c;最…

【3DGS】从新视角合成到3D Gaussian Splatting

文章目录 引言&#xff1a;什么是新视角合成任务定义一般步骤NeRF的做法NeRF的三维重建NeRF的渲染 3DGS的三维重建从一组图片估计点云高斯点云模型球谐函数参数优化损失函数和协方差矩阵的优化高斯点的数量控制(Adaptive Density Control)新的问题 3DGS的渲染&#xff1a;快速可…

[网络安全]IIS---FTP服务器 、serverU详解

一 . FTP服务器(File Transfor Protocol) : 协议:文件传输协议 端口号:TCP: 20(数据) / 21(控制) 二 . FTP工作方式: 1.主动模式 : (FTP服务器21端口与FTP客户端产生的随机端口先建立连接 建立连接后,再使用FTP服务器21端口与FTP客户端创建的一个新的随机端口进行发送…

Mobileye CES 2024 自动驾驶新技术新方向

Mobileye亮相2024年国际消费类电子产品展览会推出什么自动驾驶新技术? Mobileye再次亮相CES,展示了我们的最新技术,并推出了Mobileye DXP--我们全新的驾驶体验平台。 与往年一样,Mobileye是拉斯维加斯展会现场的一大亮点,让参观者有机会见证我们对自主未来的愿景。 在…

Wpf 使用 Prism 实战开发Day16

客户端使用RestSharp库调用WebApi 动态加载数据 在MyDoTo客户端中&#xff0c;使用NuGet 安装两个库 RestSharp Newtonsoft.Json 一. RestSharp 简单的使用测试例子 当前章节主要目的是&#xff1a;对RestSharp 库&#xff0c;根据项目需求再次进行封装。下面先做个简单的使用…

前端开发基于Qunee绘制网络拓扑图总结-02

1、渲染连线颜色 *关键函数一定要调用&#xff1a;graph.invalidate()* graph.forEach(function(element) {if (element instanceof Q.Edge) {let arr [#549BF1, #AA8A6E, #8F54F1,#5A70BC,#BCBF5C, #BC5A76, #67B4D4,#B4C9EF, #676AD4, #A86EAA,#5CBF7F, #EFB4B4];let inde…

CapCut - 剪映国际版11.0.0

【应用名称】&#xff1a;CapCut - 剪映国际版 【适用平台】&#xff1a;#Android 【软件标签】&#xff1a;#CapCut #剪映国际版 【应用版本】&#xff1a;11.0.0 【应用大小】&#xff1a;231MB 【软件说明】&#xff1a;软件升级更新。目前大家广泛使用的最令人惊叹、最专业…

负载均衡下的webshell上传+nginx解析漏洞

负载均衡下的webshell上传 一&#xff0c;负载均衡下webshell上传的四大难点 难点一&#xff1a;需要在每一台节点的相同位置上传相同内容的webshell 我们需要在每一台节点的相同位置都上传相同内容的 WebShell一旦有一台机器上没有&#xff0c;那么在请求轮到这台机器上的时…

C++ hash—unordered_mapset

目录 一. unordered系列关联式容器 1、文档说明 2、接口说明 1. 构造 2. 容量 3. 迭代器 4. 元素访问 5. 查询 6. 修改 7. 桶操作 8. 测试 二、unordered_set 1、​​​​​​​文档说明 2、接口说明 1. 构造 2. 容量 3. 迭代器 4. 元素访问 5. 插入和删除…

AI赋能编程 | 自动化工具助力高效办公

前言 前些天发现了一个巨牛的人工智能学习网站&#xff0c;通俗易懂&#xff0c;风趣幽默&#xff0c;忍不住分享一下给大家&#xff1a;https://www.captainbed.cn/z ChatGPT体验地址 文章目录 前言泡泡AI工具卡片思维导图Markdown编辑器 其他工具文件免费处理工具结语 合集…

SpringBoot 使用WebSocket功能

实现步骤&#xff1a; 1.导入WebSocket坐标。 在pom.xml中增加依赖项&#xff1a; <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-websocket</artifactId> </dependency>2.编写WebSocket配…

查看 npm的一些命令,以及npm config set registry x x x 不生效 解决方案

在 Mac 上查看自己的 npm 源&#xff0c;可以使用以下命令&#xff1a; 打开终端应用程序&#xff08;Terminal&#xff09;。 运行以下命令来查看当前的 npm 配置&#xff1a; npm config list这会显示 npm 的配置信息&#xff0c;包括当前使用的源&#xff08;registry&am…

FCIS 2023:洞悉网络安全新前沿,引领未来安全创新狂潮

在数字化浪潮席卷全球的今天&#xff0c;网络安全问题愈发凸显其重要性。 FCIS 2023网络安全创新大会作为业界瞩目的盛会&#xff0c;不仅汇聚了国际顶尖的网络安全专家&#xff0c;更展示了最前沿的安全技术与研究成果。那么&#xff0c;参与这场大会&#xff0c;我们究竟能学…

MySQL-DQL(Data Query Language)数据查询语言

文章目录 1. DQL定义2. 基础查询3. 条件查询&#xff08;WHERE&#xff09;4. 分组查询&#xff08;GROUP BY&#xff09;5. 过滤分组&#xff08;HAVING&#xff09;6. 排序&#xff08;ORDER BY&#xff09;7. 限制查询结果的条数&#xff08;LIMIT&#xff09;8. 多表查询8.…

OG Trade在ZKX揭幕:一家基于Starknet的游戏化永续合约交易所

ZKX的 OG Trade通过内置游戏化和30分钟交易竞赛&#xff0c;为所有交易者创造机会&#xff0c;革新了永续合约交易模式。 2024年1月30日 — ZKX宣布推出OG Trade&#xff0c;这是一家基于Starknet的游戏化永续合约交易所&#xff0c;旨在满足短期交易者、高水平交易者和波段交易…

数据可视化工具之选,三选一?

在数据可视化的世界中&#xff0c;选择一款合适的工具对于提升工作效率和洞察力至关重要。本文将对三款主流数据可视化工具进行详细比较&#xff0c;包括山海鲸可视化、Echarts和D3.js&#xff0c;以帮助您做出明智的选择。 山海鲸可视化 山海鲸可视化是一款免费且功能强大的…

全面掌握Django的web框架Django Rest_Framework(一)

文章目录 Django Rest_Framework1. DRF介绍2.DRF特点3.环境安装与配置&#xff08;1&#xff09;DRF需要以下依赖&#xff08;2&#xff09;创建django项目 4.序列化器的使用&#xff08;1&#xff09;创建序列化器 5. 反序列化器使用 Django Rest_Framework 1. DRF介绍 Djan…

phar反序列化漏洞

基础&#xff1a; Phar是一种PHP文件归档格式&#xff0c;它类似于ZIP或JAR文件格式&#xff0c;可以将多个PHP文件打包成一个单独的文件&#xff08;即Phar文件&#xff09;。 打包后的Phar文件可以像普通的PHP文件一样执行&#xff0c;可以包含PHP代码、文本文件、图像等各…