纯golang开发的mqtt server

news2024/11/26 0:23:24

Mochi-MQTT Server

github地址:https://github.com/mochi-mqtt/server

Mochi-MQTT 是一个完全兼容的、可嵌入的高性能 Go MQTT v5(以及 v3.1.1)中间件/服务器。

Mochi MQTT 是一个完全兼容 MQTT v5 的可嵌入的中间件/服务器,完全使用 Go 语言编写,旨在用于遥测和物联网项目的开发。它可以作为独立的二进制文件使用,也可以嵌入到你自己的应用程序中作为库来使用,经过精心设计以实现尽可能的轻量化和快速部署,同时也极为重视代码的质量和可维护性。

什么是 MQTT?

MQTT 代表 MQ Telemetry Transport。它是一种发布/订阅、非常简单和轻量的消息传递协议,专为受限设备和低带宽、高延迟或不可靠网络设计而成(了解更多)。Mochi MQTT 实现了完整的 MQTT 协议的 5.0.0 版本。

Mochi-MQTT 特性
  • 完全兼容 MQTT v5 功能,与 MQTT v3.1.1 和 v3.0.0 兼容:
    • MQTT v5 用户和数据包属性
    • 主题别名(Topic Aliases)
    • 共享订阅(Shared Subscriptions)
    • 订阅选项和订阅标识符(Identifiers)
    • 消息过期(Message Expiry)
    • 客户端会话过期(Client Session Expiry)
    • 发送和接收 QoS 流量控制配额(Flow Control Quotas)
    • 服务器端的断开连接和数据包的权限验证(Auth Packets)
    • 遗愿消息延迟间隔(Will Delay Intervals)
    • 还有 Mochi MQTT v1 的所有原始 MQTT 功能,例如完全的 QoS(0,1,2)、$SYS 主题、保留消息等。
  • 面向开发者:
    • 核心代码都已开放并可访问,以便开发者完全控制。
    • 功能丰富且灵活的基于钩子(Hook)的接口系统,支持便捷的“插件(plugin)”开发。
    • 使用特殊的内联客户端(inline client)进行服务端的消息发布,也支持服务端伪装成现有的客户端。
  • 高性能且稳定:
    • 基于经典前缀树 Trie 的主题-订阅模型。
    • 客户端特定的写入缓冲区,避免因读取速度慢或客户端不规范行为而产生的问题。
    • 通过所有 Paho互操作性测试(MQTT v5 和 MQTT v3)。
    • 超过一千多个经过仔细考虑的单元测试场景。
  • 支持 TCP、Websocket(包括 SSL/TLS)和$SYS 服务状态监控。
  • 内置 基于Redis、Badger、Pebble 和 Bolt 的持久化(使用Hook钩子,你也可以自己创建)。
  • 内置基于规则的认证和 ACL 权限管理(使用Hook钩子,你也可以自己创建)。

兼容性说明(Compatibility Notes)

由于 v5 规范与 MQTT 的早期版本存在重叠,因此服务器可以接受 v5 和 v3 客户端,但在连接了 v5 和 v3 客户端的情况下,为 v5 客户端提供的属性和功能将会对 v3 客户端进行降级处理(例如用户属性)。

对于 MQTT v3.0.0 和 v3.1.1 的支持被视为混合兼容性。在 v3 规范中没有明确限制的情况下,将使用更新的和以安全为首要考虑的 v5 规范 - 例如保留的消息(retained messages)的过期处理,待发送消息(inflight messages)的过期处理、客户端过期处理以及QOS消息数量的限制等。

版本更新时间

除非涉及关键问题,新版本通常在周末发布。

规划路线图(Roadmap)

  • 请提出问题来请求新功能或新的hook钩子接口!
  • 集群支持。
  • 统计度量支持。

快速开始(Quick Start)

使用 Go 运行服务端

Mochi MQTT 可以作为独立的中间件使用。只需拉取此仓库代码,然后在 cmd 文件夹中运行 cmd/main.go ,默认将开启下面几个服务端口, tcp (:1883)、websocket (:1882) 和服务状态监控 (:8080) 。

cd cmd
go build -o mqtt && ./mqtt

使用 Docker

你现在可以从 Docker Hub 仓库中拉取并运行Mochi MQTT官方镜像:

docker pull mochimqtt/server
或者
docker run -v $(pwd)/config.yaml:/config.yaml mochimqtt/server

一般情况下,您可以使用基于文件的方式来配置服务端,只需指定一个有效的 yaml 或 json 配置文件。
我们提供了一个简单的 Dockerfile,用于运行 cmd/main.go 中的 Websocket(:1882)、TCP(:1883) 和服务端状态信息(:8080)这三个网络服务,它使用了一个 allow-all 的鉴权策略(Hook)。

docker build -t mochi:latest .
docker run -p 1883:1883 -p 1882:1882 -p 8080:8080 -v $(pwd)/config.yaml:/config.yaml mochi:latest

基于文件的配置

你可以使用基于文件的配置与 Docker 镜像(上节所述)一起使用,或者通过运行编译好的可执行文件并使用 --config=config.yaml--config=config.json 指定配置文件。

配置文件使得服务端更易于管理和维护。你可以启用和配置内置的钩子(hooks)和监听器(listeners),并指定服务器的一些选项(options)和能力(compatibilities):

listeners:
  - type: "tcp"
    id: "tcp12"
    address: ":1883"
  - type: "ws"
    id: "ws1"
    address: ":1882"
  - type: "sysinfo"
    id: "stats"
    address: ":1880"
hooks:
  auth:
    allow_all: true
options:
  inline_client: true

你可以参考请 examples/config 中的示例,以了解所有可用的配置。
有一些需要注意的地方:

  1. 如果你使用基于文件的配置,现在支持配置的hook类型只有auth、storage、debug这三种,每种类型的钩子只能有一个。
  2. 你只能在基于文件的配置中使用内置钩子(mochi-mqtt里面默认已经存在的hook,你自己创建的不算),因为钩子的配置需要先跟conf.toml的结构匹配。
  3. 你只能使用内置监听器(listeners),原因同上。

如果你需要实现自定义的钩子(Hooks)或监听器(listeners),请使用 cmd/main.go 中那样的传统方式来实现。

使用 Mochi MQTT 进行开发

将Mochi MQTT作为包导入使用

将 Mochi MQTT 作为一个包导入只需要几行代码即可开始使用。

import (
  "log"

  mqtt "github.com/mochi-mqtt/server/v2"
  "github.com/mochi-mqtt/server/v2/hooks/auth"
  "github.com/mochi-mqtt/server/v2/listeners"
)

func main() {
  // 创建信号用于等待服务端关闭信号
  sigs := make(chan os.Signal, 1)
  done := make(chan bool, 1)
  signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
  go func() {
    <-sigs
    done <- true
  }()

  // 创建新的 MQTT 服务器。
  server := mqtt.New(nil)
  
  // 允许所有连接(权限)。
  _ = server.AddHook(new(auth.AllowHook), nil)
  
  // 在标1883端口上创建一个 TCP 服务端。
  tcp := listeners.NewTCP("t1", ":1883", nil)
  err := server.AddListener(tcp)
  if err != nil {
    log.Fatal(err)
  }
  

  go func() {
    err := server.Serve()
    if err != nil {
      log.Fatal(err)
    }
  }()

  // 服务端等待关闭信号
  <-done

  // 关闭服务端时需要做的一些清理工作
}

在 examples 文件夹中可以找到更多使用不同配置运行服务端的示例。

网络监听器 (Network Listeners)

服务端内置了一些已经实现的网络监听(Network Listeners),这些Listeners允许服务端接受不同协议的连接。当前的监听Listeners有这些:

ListenerUsage
listeners.NewTCP一个 TCP 监听器,接收TCP连接
listeners.NewUnixSock一个 Unix 套接字监听器
listeners.NewNet一个 net.Listener 监听
listeners.NewWebsocket一个 Websocket 监听器
listeners.NewHTTPStats一个 HTTP $SYS 服务状态监听器
listeners.NewHTTPHealthCheck一个 HTTP 健康检测监听器,用于为例如云基础设施提供健康检查响应

可以使用listeners.Listener接口开发新的监听器。如果有兴趣,你可以实现自己的Listener,如果你在此期间你有更好的建议或疑问,你可以提交问题给我们。

可以在*listeners.Config 中配置TLS,传递给Listener使其支持TLS。
我们提供了一些示例,可以在 示例 文件夹或 cmd/main.go 中找到。

服务端选项和功能(Server Options and Capabilities)

有许多可配置的选项(Options)可用于更改服务器的行为或限制对某些功能的访问。

server := mqtt.New(&mqtt.Options{
  Capabilities: mqtt.Capabilities{
    MaximumSessionExpiryInterval: 3600,
    Compatibilities: mqtt.Compatibilities{
      ObscureNotAuthorized: true,
    },
  },
  ClientNetWriteBufferSize: 4096,
  ClientNetReadBufferSize: 4096,
  SysTopicResendInterval: 10,
  InlineClient: false,
})

请参考 mqtt.Options、mqtt.Capabilities 和 mqtt.Compatibilities 结构体,以查看完整的所有服务端选项。ClientNetWriteBufferSize 和 ClientNetReadBufferSize 可以根据你的需求配置调整每个客户端的内存使用状况。

默认配置说明(Default Configuration Notes)

关于决定默认配置的值,在这里进行一些说明:

  • 默认情况下,server.Options.Capabilities.MaximumMessageExpiryInterval 的值被设置为 86400(24小时),以防止在使用默认配置时网络上暴露服务器而受到恶意DOS攻击(如果不配置到期时间将允许无限数量的保留retained/待发送inflight消息累积)。如果您在一个受信任的环境中运行,或者您有更大的保留期容量,您可以选择覆盖此设置(设置为0 以取消到期限制)。

事件钩子(Event Hooks)

服务端有一个通用的事件钩子(Event Hooks)系统,它允许开发人员在服务器和客户端生命周期的各个阶段定制添加和修改服务端的功能。这些通用Hook钩子用于提供从认证(authentication)、持久性存储(persistent storage)到调试工具(debugging tools)等各种功能。

钩子(Hook)是可叠加的 - 你可以向服务器添加多个钩子(Hook),它们将按添加的顺序运行。一些钩子(Hook)修改值,这些修改后的值将在所有钩子返回之前传递给后续的钩子(Hook)。

类型导入包描述
访问控制mochi-mqtt/server/hooks/auth . AllowHookAllowHook 允许所有客户端连接访问并读写所有主题。
访问控制mochi-mqtt/server/hooks/auth . Auth基于规则的访问权限控制。
数据持久性mochi-mqtt/server/hooks/storage/bolt使用 BoltDB 进行持久性存储(已弃用)。
数据持久性mochi-mqtt/server/hooks/storage/badger使用 BadgerDB 进行持久性存储。
数据持久性mochi-mqtt/server/hooks/storage/pebble使用 PebbleDB 进行持久性存储。
数据持久性mochi-mqtt/server/hooks/storage/redis使用 Redis 进行持久性存储。
调试跟踪mochi-mqtt/server/hooks/debug调试输出以查看数据包在服务端的链路追踪。

许多内部函数都已开放给开发者,你可以参考上述示例创建自己的Hook钩子。如果你有更好的关于Hook钩子方面的建议或者疑问,你可以提交问题给我们。

访问控制(Access Control)

允许所有(Allow Hook)

默认情况下,Mochi MQTT 使用拒绝所有(DENY-ALL)的访问控制规则。要允许连接,必须实现一个访问控制的钩子(Hook)来替代默认的(DENY-ALL)钩子。其中最简单的钩子(Hook)是 auth.AllowAll 钩子(Hook),它为所有连接、订阅和发布提供允许所有(ALLOW-ALL)的规则。这也是使用最简单的钩子:

server := mqtt.New(nil)
_ = server.AddHook(new(auth.AllowHook), nil)

如果你将服务器暴露在互联网或不受信任的网络上,请不要这样做 - 它真的应该仅用于开发、测试和调试。

权限认证(Auth Ledger)

权限认证钩子(Auth Ledger hook)使用结构化的定义来制定访问规则。认证规则分为两种形式:身份规则(连接时使用)和 ACL权限规则(发布订阅时使用)。

身份规则(Auth rules)有四个可选参数和一个是否允许参数:

参数说明
Client客户端的客户端 ID
Username客户端的用户名
Password客户端的密码
Remote客户端的远程地址或 IP
Allowtrue(允许此用户)或 false(拒绝此用户)

ACL权限规则(ACL rules)有三个可选参数和一个主题匹配参数:

参数说明
Client客户端的客户端 ID
Username客户端的用户名
Remote客户端的远程地址或 IP
Filters用于匹配的主题数组

规则按索引顺序(0,1,2,3)处理,并在匹配到第一个规则时返回。请查看 hooks/auth/ledger.go 的具体实现。

server := mqtt.New(nil)
err := server.AddHook(new(auth.Hook), &auth.Options{
    Ledger: &auth.Ledger{
    Auth: auth.AuthRules{ // Auth 默认情况下禁止所有连接
      {Username: "peach", Password: "password1", Allow: true},
      {Username: "melon", Password: "password2", Allow: true},
      {Remote: "127.0.0.1:*", Allow: true},
      {Remote: "localhost:*", Allow: true},
    },
    ACL: auth.ACLRules{ // ACL 默认情况下允许所有连接
      {Remote: "127.0.0.1:*"}, // 本地用户允许所有连接
      {
        // 用户 melon 可以读取和写入自己的主题
        Username: "melon", Filters: auth.Filters{
          "melon/#":   auth.ReadWrite,
          "updates/#": auth.WriteOnly, // 可以写入 updates,但不能从其他人那里读取 updates
        },
      },
      {
        // 其他的客户端没有发布的权限
        Filters: auth.Filters{
          "#":         auth.ReadOnly,
          "updates/#": auth.Deny,
        },
      },
    },
  }
})

规则还可以存储为 JSON 或 YAML,并使用 Data 字段加载文件的二进制数据:

err := server.AddHook(new(auth.Hook), &auth.Options{
    Data: data, // 从字节数组(文件二进制)读取规则:yaml 或 json
})

详细信息请参阅 examples/auth/encoded/main.go。

持久化存储(Persistent Storage)

Redis

我们提供了一个基本的 Redis 存储钩子(Hook),用于为服务端提供数据持久性。你可以将这个Redis的钩子(Hook)添加到服务器中,Redis的一些参数也是可以配置的。这个钩子(Hook)里使用 github.com/go-redis/redis/v8 这个库,可以通过 Options 来配置一些参数。

err := server.AddHook(new(redis.Hook), &redis.Options{
  Options: &rv8.Options{
    Addr:     "localhost:6379", // Redis服务端地址
    Password: "",               // Redis服务端的密码
    DB:       0,                // Redis数据库的index
  },
})
if err != nil {
  log.Fatal(err)
}

有关 Redis 钩子的工作原理或如何使用它的更多信息,请参阅 examples/persistence/redis/main.go 或 hooks/storage/redis 。

Pebble DB

如果您更喜欢基于文件的存储,还有一个 PebbleDB 存储钩子(Hook)可用。它可以以与其他钩子大致相同的方式添加和配置(具有较少的选项)。

err := server.AddHook(new(pebble.Hook), &pebble.Options{
  Path: pebblePath,
  Mode: pebble.NoSync,
})
if err != nil {
  log.Fatal(err)
}

有关 pebble 钩子(Hook)的工作原理或如何使用它的更多信息,请参阅 examples/persistence/pebble/main.go 或 hooks/storage/pebble。

Badger DB

同样是基于文件的存储,还有一个 BadgerDB 存储钩子(Hook)可用。它可以以与其他钩子大致相同的方式添加和配置。

err := server.AddHook(new(badger.Hook), &badger.Options{
  Path: badgerPath,
})
if err != nil {
  log.Fatal(err)
}

有关 Badger 钩子(Hook)的工作原理或如何使用它的更多信息,请参阅 examples/persistence/badger/main.go 或 hooks/storage/badger。

还有一个 BoltDB 钩子(Hook),已被弃用,推荐使用 Badger,但如果你想使用它,请参考 examples/persistence/bolt/main.go。

使用事件钩子 Event Hooks 进行开发

在服务端和客户端生命周期中,开发者可以使用各种钩子(Hook)增加对服务端或客户端的一些自定义的处理。
所有的钩子都定义在mqtt.Hook这个接口中了,可以在 hooks.go 中找到这些钩子(Hook)函数。

最灵活的事件钩子是 OnPacketRead、OnPacketEncode 和 OnPacketSent - 这些钩子可以用来控制和修改所有传入和传出的数据包。

钩子函数说明
OnStarted在服务器成功启动时调
OnStopped在服务器成功停止时调用。
OnConnectAuthenticate当用户尝试与服务器进行身份验证时调用。必须实现此方法来允许或拒绝对服务器的访问(请参阅 hooks/auth/allow_all 或 basic)。它可以在自定义Hook钩子中使用,以检查连接的用户是否与现有用户数据库中的用户匹配。如果允许访问,则返回 true。
OnACLCheck当用户尝试发布或订阅主题时调用,用来检测ACL规则。
OnSysInfoTick当 $SYS 主题相关的消息被发布时调用。
OnConnect当新客户端连接时调用,可能返回一个错误或错误码以中断客户端的连接。
OnSessionEstablish在新客户端连接并进行身份验证后,会立即调用此方法,并在会话建立和发送CONNACK之前立即调用。
OnSessionEstablished在新客户端成功建立会话(在OnConnect之后)时调用。
OnDisconnect当客户端因任何原因断开连接时调用。
OnAuthPacket当接收到认证数据包时调用。它旨在允许开发人员创建自己的 MQTT v5 认证数据包处理机制。在这里允许数据包的修改。
OnPacketRead当从客户端接收到数据包时调用。允许对数据包进行修改。
OnPacketEncode在数据包被编码并发送给客户端之前立即调用。允许修改数据包。
OnPacketSent在数据包已发送给客户端后调用。
OnPacketProcessed在数据包已接收并成功由服务端处理后调用。
OnSubscribe当客户端订阅一个或多个主题时调用。允许修改数据包。
OnSubscribed当客户端成功订阅一个或多个主题时调用。
OnSelectSubscribers当订阅者已被关联到一个主题中,在选择共享订阅的订阅者之前调用。允许接收者修改。
OnUnsubscribe当客户端取消订阅一个或多个主题时调用。允许包修改。
OnUnsubscribed当客户端成功取消订阅一个或多个主题时调用。
OnPublish当客户端发布消息时调用。允许修改数据包。
OnPublished当客户端向订阅者发布消息后调用。
OnPublishDropped消息传递给客户端之前消息已被丢弃,将调用此方法。 例如当客户端响应时间过长需要丢弃消息时。
OnRetainMessage当消息被保留时调用。
OnRetainPublished当保留的消息被发布给客户端时调用。
OnQosPublish当发出QoS >= 1 的消息给订阅者后调用。
OnQosComplete在消息的QoS流程走完之后调用。
OnQosDropped在消息的QoS流程未完成,同时消息到期时调用。
OnPacketIDExhausted当packet ids已经用完后,没有可用的id可再分配时调用。
OnWill当客户端断开连接并打算发布遗嘱消息时调用。允许修改数据包。
OnWillSent遗嘱消息发送完成后被调用。
OnClientExpired在客户端会话已过期并应删除时调用。
OnRetainedExpired在保留的消息已过期并应删除时调用。
StoredClients这个接口需要返回客户端列表,例如从持久化数据库中获取客户端列表。
StoredSubscriptions返回客户端的所有订阅,例如从持久化数据库中获取客户端的订阅列表。
StoredInflightMessages返回待发送消息(inflight messages),例如从持久化数据库中获取到还有哪些消息未完成传输。
StoredRetainedMessages返回保留的消息,例如从持久化数据库获取保留的消息。
StoredSysInfo返回存储的系统状态信息,例如从持久化数据库获取的系统状态信息。

如果你想自己实现一个持久化存储的Hook钩子,请参考现有的持久存储Hook钩子以获取灵感和借鉴。如果您正在构建一个身份验证Hook钩子,您将需要实现OnACLCheck 和 OnConnectAuthenticate这两个函数接口。

内联客户端 (Inline Client v2.4.0+支持)

现在可以通过使用内联客户端功能直接在服务端上订阅主题和发布消息。内联客户端是内置在服务端中的特殊的客户端,可以在服务端的配置中启用:

server := mqtt.New(&mqtt.Options{
  InlineClient: true,
})

启用上述配置后,你将能够使用 server.Publish、server.Subscribe 和 server.Unsubscribe 方法来在服务端中直接发布和接收消息。

具体如何使用请参考 direct examples 。

内联发布(Inline Publish)

要想在服务端中直接发布Publish一个消息,可以使用 server.Publish方法。

err := server.Publish("direct/publish", []byte("packet scheduled message"), false, 0)

在这种情况下,QoS级别只对订阅者有效,按照 MQTT v5 规范。

内联订阅(Inline Subscribe)

要想在服务端中直接订阅一个主题,可以使用 server.Subscribe方法并提供一个处理订阅消息的回调函数。内联订阅的 QoS默认都是0。如果您希望对相同的主题有多个回调,可以使用 MQTTv5 的 subscriptionId 属性进行区分。

callbackFn := func(cl *mqtt.Client, sub packets.Subscription, pk packets.Packet) {
    server.Log.Info("inline client received message from subscription", "client", cl.ID, "subscriptionId", sub.Identifier, "topic", pk.TopicName, "payload", string(pk.Payload))
}
server.Subscribe("direct/#", 1, callbackFn)
取消内联订阅(Inline Unsubscribe)

如果您使用内联客户端订阅了某个主题,如果需要取消订阅。您可以使用 server.Unsubscribe 方法取消内联订阅:

server.Unsubscribe("direct/#", 1)

注入数据包(Packet Injection)

如果你想要更多的服务端控制,或者想要设置特定的MQTT v5属性或其他属性,你可以选择指定的客户端创建自己的发布包(publish packets)。这种方法允许你将MQTT数据包(packets)直接注入到运行中的服务端,相当于服务端直接自己模拟接收到了某个客户端的数据包。

数据包注入(Packet Injection)可用于任何MQTT数据包,包括ping请求、订阅等。你可以获取客户端的详细信息,因此你甚至可以直接在服务端模拟某个在线的客户端,发布一个数据包。

大多数情况下,您可能希望使用上面描述的内联客户端(Inline Client),因为它具有独特的特权:它可以绕过所有ACL和主题验证检查,这意味着它甚至可以发布到$SYS主题。你也可以自己从头开始制定一个自己的内联客户端,它将与内置的内联客户端行为相同。

cl := server.NewClient(nil, "local", "inline", true)
server.InjectPacket(cl, packets.Packet{
  FixedHeader: packets.FixedHeader{
    Type: packets.Publish,
  },
  TopicName: "direct/publish",
  Payload: []byte("scheduled message"),
})

MQTT数据包仍然需要满足规范的结构,所以请参考测试用例中数据包的定义 和 MQTTv5规范 以获取一些帮助。

具体如何使用请参考 hooks example 。

测试(Testing)

单元测试(Unit Tests)

Mochi MQTT 使用精心编写的单元测试,测试了一千多种场景,以确保每个函数都表现出我们期望的行为。您可以使用以下命令运行测试:

go run --cover ./...
Paho 互操作性测试(Paho Interoperability Test)

您可以使用 examples/paho/main.go 启动服务器,然后在 interoperability 文件夹中运行 python3 client_test5.py 来检查代理是否符合 Paho互操作性测试 的要求,包括 MQTT v5 和 v3 的测试。

请注意,关于 paho 测试套件存在一些尚未解决的问题,因此在 paho/main.go 示例中启用了某些兼容性模式。

基准测试(Performance Benchmarks)

Mochi MQTT 的性能与其他的一些主流的mqtt中间件(如 Mosquitto、EMQX 等)不相上下。

基准测试是使用 MQTT-Stresser 在 Apple Macbook Air M2 上进行的,使用 cmd/main.go 默认设置。考虑到高低吞吐量的突发情况,中位数分数是最有用的。数值越高越好。

基准测试中呈现的数值不代表真实每秒消息吞吐量。它们依赖于 mqtt-stresser 的一种不寻常的计算方法,但它们在所有代理之间是一致的。性能基准测试的结果仅供参考。这些比较都是使用默认配置进行的。

mqtt-stresser -broker tcp://localhost:1883 -num-clients=2 -num-messages=10000

Brokerpublish fastestmedianslowestreceive fastestmedianslowest
Mochi v2.2.10124,772125,456124,614314,461313,186311,910
Mosquitto v2.0.15155,920155,919155,918185,485185,097184,709
EMQX v5.0.11156,945156,257155,56817,91817,78317,649
Rumqtt v0.21.0112,208108,480104,753135,784126,446117,108

mqtt-stresser -broker tcp://localhost:1883 -num-clients=10 -num-messages=10000

Brokerpublish fastestmedianslowestreceive fastestmedianslowest
Mochi v2.2.1041,82531,66323,008144,05865,90337,618
Mosquitto v2.0.1542,72938,63329,87923,24119,71418,806
EMQX v5.0.1121,55317,41814,3564,2573,9803,756
Rumqtt v0.21.042,21323,15320,81449,46536,62619,283

百万消息挑战(立即向服务器发送100万条消息):

mqtt-stresser -broker tcp://localhost:1883 -num-clients=100 -num-messages=10000

Brokerpublish fastestmedianslowestreceive fastestmedianslowest
Mochi v2.2.1013,5324,4252,34452,1207,2742,701
Mosquitto v2.0.153,8263,3953,0321,2001,1501,118
EMQX v5.0.114,0862,4322,274434333311
Rumqtt v0.21.078,9725,0473,8044,2863,2492,027

这里还不确定EMQX是不是哪里出了问题,可能是因为 Docker 的默认配置优化不对,所以要持保留意见,因为我们确实知道它是一款可靠的软件。

贡献指南(Contribution Guidelines)

我们欢迎代码贡献和反馈!如果你发现了漏洞(bug)或者有任何疑问,又或者是有新的需求,请提交给我们。如果您提交了一个PR(pull request)请求,请尽量遵循以下准则:

  • 在合理的情况下,尽量保持测试覆盖率。
  • 清晰地说明PR(pull request)请求的作用和原因。
  • 请不要忘记在你贡献的文件中添加 SPDX FileContributor 标签。

[SPDX 注释] (https://spdx.dev) 用于智能的识别每个文件的许可证、版权和贡献。如果您正在向本仓库添加一个新文件,请确保它具有以下 SPDX 头部:

// SPDX-License-Identifier: MIT
// SPDX-FileCopyrightText: 2023 mochi-mqtt
// SPDX-FileContributor: Your name or alias <optional@email.address>

package name

请确保为文件的每位贡献者添加一个新的SPDX-FileContributor 行。可以参考其他文件的示例。请务必记得这样做,你对这个项目的贡献是有价值且受到赞赏的 - 获得认可非常重要!

给我们星星的人数(Stargazers over time) 🥰

外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

您是否在项目中使用 Mochi MQTT?请告诉我们!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1600207.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

YoloV9实战:从Labelme到训练、验证、测试、模块解析

模型实战 训练COCO数据集 本次使用2017版本的COCO数据集作为例子&#xff0c;演示如何使用YoloV8训练和预测。 下载数据集 Images: 2017 Train images [118K/18GB] &#xff1a;http://images.cocodataset.org/zips/train2017.zip2017 Val images [5K/1GB]&#xff1a;htt…

选择最佳 PoE 布线系统的 3 个步骤

选择合适的 POE 布线系统的重要性 在不断发展的信息和通信技术 &#xff08;ICT&#xff09; 领域&#xff0c;以太网供电 &#xff08;PoE&#xff09; 布线系统已成为一种革命性的解决方案&#xff0c;它简化了网络设备的部署和管理&#xff0c;同时优化了电力传输。从智能建…

傅里叶变换到底是什么

傅里叶变换到底是什么 有一个f&#xff08;t&#xff09;经傅里叶变换公式转化成F&#xff08;w&#xff09;&#xff1b; F&#xff08;w&#xff09;包括 欧拉公式转化成无限包括sin cos的函数相加。sin cos前面的参数a不为0说明这个周期函数分量存在&#xff0c;是某一种有…

基本模拟概念

目标&#xff1a; 讨论模拟电子技术的基本特性 描述模拟信号 分析信号源 解释放大器的特性 1.1模拟电子学 电子学可以划分成很多的分类来研究。其中最基本的一种分类方式是将信号分成可由 二进制数字表示的数字信号和由连续变化量表示的模拟信号。数字电子学包括所有的算术 和…

想做好抖音直播运营,这81个专业术语你必须得知道 沈阳新媒体运营培训

1.起号 释义&#xff1a;从0基础创建账号到1账号已具备基础模型&#xff0c;启动一个直播间&#xff0c;并使其能稳定卖出去东西的过程。简单一点来说&#xff0c;就是冷启动&#xff0c;通过活动或者其他方式获得快速大量曝光的一种形式。目前主流起号方式为&#xff1a;▼活…

【C++】unordered_set和unordered_map

底层哈希结构 namespace hash_bucket {template<class T>struct HashData{T _data;struct HashData* next nullptr;HashData(const T& data):_data(data){}};//仿函数:这里直接用开散列仿函数template <class K>struct HashFunc{size_t operator()(const K&a…

attention and tell论文【无标题】

这个公式使用LaTeX语法表示为&#xff1a; ( i t f t o t c t ) ( σ σ σ tanh ⁡ ) T D m n , n ( E y t − 1 h t − 1 x t ) \begin{pmatrix}i_t \\f_t \\o_t \\c_t\end{pmatrix} \begin{pmatrix}\sigma \\\sigma \\\sigma \\\tanh\end{pmatrix}T_{Dmn,n}\begin{pmatri…

HackMyVM-Gift

目录 信息收集 arp nmap WEB dirsearch hydra ssh连接 get root 信息收集 arp ┌─[rootparrot]─[~] └──╼ #arp-scan -l Interface: enp0s3, type: EN10MB, MAC: 08:00:27:16:3d:f8, IPv4: 192.168.9.102 Starting arp-scan 1.10.0 with 256 hosts (https://git…

动态规划算法求解最长公共子序列

动态规划算法是运筹学中求解多阶段决策问题的经典算法&#xff0c;本文将介绍动态规划算法的基本思想&#xff0c;并介绍如何使用动态规划算法求解最长公共子序列问题。 1. 动态规划算法的基本思想 动态规划算法本质也是基于分治思想&#xff0c;将待求解问题分解成若干个子问…

「GO基础」目录

&#x1f49d;&#x1f49d;&#x1f49d;欢迎莅临我的博客&#xff0c;很高兴能够在这里和您见面&#xff01;希望您在这里可以感受到一份轻松愉快的氛围&#xff0c;不仅可以获得有趣的内容和知识&#xff0c;也可以畅所欲言、分享您的想法和见解。 推荐:「stormsha的主页」…

02_JavaWeb中的Tomcat(详解)

文章目录 Tomcat1, 概述1.1 安装1.2 目录结构1.3 启动/停止 2, 资源部署2.1 直接部署: 主要和重要的方式2.2 虚拟映射: 重要2.2.1 方式一:2.2.1 方式二: 2.3 原理解析 3, Tomcat组件3.1 Connector3.2 Engine3.2.1 Host3.2.1.1 Context 4, 其它: 重要4.1 设置 Tomcat 1, 概述 w…

(踩坑)Please refer to 异常和Error creating bean with name 异常

一、Please refer to 异常 如图所示&#xff0c;在使用maven构建项目的时候&#xff0c;如果提示该错误&#xff0c;则可能是xml配置文件有问题或者测试类等。但是没有明确的异常信息&#xff0c;所以做以下小改动&#xff0c;可以查看异常信息。 在IDEA工具中&#xff0c;打…

08 SQL进阶 -- 集合运算 -- 表的连结(JOIN)

1. 连结(JOIN) 前一节我们学习了 UNION和INTERSECT 等集合运算, 这些集合运算的特征就是以行方向为单位进行操作. 通俗地说, 就是进行这些集合运算时, 会导致记录行数的增减。使用 UNION 会增加记录行数,而使用 INTERSECT 或者 EXCEPT 会减少记录行数。 但这些运算不能改变…

【Java开发指南 | 第十篇】Java修饰符

读者可订阅专栏&#xff1a;Java开发指南 |【CSDN秋说】 文章目录 JAVA修饰符访问修饰符非访问修饰符static 修饰符final 修饰符abstract 修饰符synchronized 修饰符transient 修饰符volatile 修饰符 JAVA修饰符 修饰符用来定义类、方法或者变量&#xff0c;通常放在语句的最前…

2024年【高处安装、维护、拆除】考试题库及高处安装、维护、拆除复审模拟考试

题库来源&#xff1a;安全生产模拟考试一点通公众号小程序 高处安装、维护、拆除考试题库是安全生产模拟考试一点通生成的&#xff0c;高处安装、维护、拆除证模拟考试题库是根据高处安装、维护、拆除最新版教材汇编出高处安装、维护、拆除仿真模拟考试。2024年【高处安装、维…

瞬态瑞丽波频散曲线提取

频散曲线 function [Y1, f, phase] = das_fft(signal1, signal2, Ts) [y, lag

windows编译xlnt,获取Excel表里的数据

用git拉取项目 这个文件是空的 要用git拉下来&#xff0c;使用终端编译xlnt库 点击解决方案 运行生成 然后新建项目&#xff0c;配置好库&#xff0c; #include <iostream> #include <xlnt/xlnt.hpp>int main() {// 打开 Excel 文件xlnt::workbook workbook;workb…

Ubuntu Vs code配置ROS开发环境

文章目录 1.开发环境2.集成开发环境搭建2.1 安装Ros2.2 安装 Vs code2.3 安装vs code 插件 3.Vs code 配置ROS3.1 创建ROS工作空间3.2 从文件夹启动Vs code3.3 使用Vscode 编译ROS 空间3.4 使用Vs code 创建功能包 4.编写简单Demo实例4.1编写代码4.2编译与执行 1.开发环境 系统…

【数学】主成分分析(PCA)的应用案例(Python)

接着上篇PCA推导过程文章&#xff0c;本文结合图像来展示PCA的应用过程 Jupyter notebook 源文件在这里 1 借助库函数来PCA重建 使用sklearn库函数 # Import needed libs import numpy as np import matplotlib.pyplot as plt from sklearn.datasets import load_digits from…

大模型微调的几种常见方法

在文章深入理解大语言模型微调技术中&#xff0c;我们详细了解大语言模型微调的概念和训练过程&#xff0c;本篇给大家介绍大模型微调常见的7种训练方法。 1、Adapter Tuning 2019年谷歌的研究人员首次在论文《Parameter-Efficient Transfer Learning for NLP》提出针对 BERT 的…