RabbitMQ3.13.0起支持MQTT5.0协议及MQTT5.0特性功能列表

news2024/10/6 22:32:51

RabbitMQ3.13.0起支持MQTT5.0协议及MQTT5.0特性功能列表

文章目录

  • RabbitMQ3.13.0起支持MQTT5.0协议及MQTT5.0特性功能列表
  • 1. MQTT概览
  • 2. MQTT 5.0 特性
    • 1. 特性概要
    • 2. Docker中安装RabbitMQ及启用MQTT5.0协议
  • 3. MQTT 5.0 功能列表
    • 1. 消息过期
      • 1. 描述
      • 2. 举例
      • 3. 实现
    • 2. 订阅标识符
      • 1.描述
      • 2. 举例
      • 3. 实现
    • 3. 订阅选项
      • 1. 描述
      • 2. 举例例
    • 4 所有ACK上的原因代码
      • 1. 描述
      • 2. 实现
    • 5. 用户属性
      • 1. 描述
      • 2. 示例 PUBLISH 数据包
      • 3. 示例 CONNECT 数据包
    • 6. 有效负载格式和内容类型
      • 1. 描述
      • 2. 举例
    • 7. 请求/响应
      • 1. 描述
      • 2. 举例
    • 8. 分配的客户端标识符
      • 1. 描述
      • 2. 实现
    • 9. 主题别名
      • 1. 描述
      • 2. 实现
    • 10. 流量控制
      • 1. 描述
      • 2. 实现
    • 11. 最大数据包大小
      • 1. 描述
      • 2. 举例
    • 12. 服务器启动的断开连接
      • 1. 描述
      • 2. 实现
    • 13. 会话到期
      • 1. 描述
      • 2. 实现
      • 3. 举例
    • 14. 会延迟
      • 1. 描述
      • 2. 实现
      • 3. 举例
    • 15. 可选的服务器功能可用性
      • 1. 描述
      • 2. 实现
  • 4. 局限性
    • 1. MQTT 5.0 特定限制
      • 1. 共享订阅
      • 1. 延迟和保留的遗嘱消息
    • 2. 非 MQTT 5.0 特定限制
      • 1. 保留的消息
  • 5. 总结
  • 6. 相关链接

RabbitMQ 3.12 中发布的原生 MQTT 为物联网用例提供了显著的可扩展性和性能改进。

RabbitMQ 3.13 将支持 MQTT 5.0,因此将成为我们使 RabbitMQ 成为领先的 MQTT 代理之一的下一个重要步骤。

这篇博文解释了如何在 RabbitMQ 中使用新的 MQTT 5.0 功能。

1. MQTT概览

MQTT 是物联网 (IoT) 的标准协议。

物联网远程设备在连接到代理时网络质量可能较差。 因此,MQTT是轻量级的:MQTT协议头很小,可以节省网络带宽。

由于物联网设备可能经常断开连接并重新连接(想象一下一辆汽车驶过隧道),MQTT 也很高效:与其他消息传递协议相比,客户端通过更短的握手进行连接和身份验证。

MQTT协议已经存在了很多年。 如下表所示,最新的 MQTT 协议版本为 5.0。

MQTT 版本CONNECT 数据包中的协议版本MQTT 规范发布年份自 Year 以来的 RabbitMQ 支持(版本)
3.1320102012 (3.0)
3.1.1420142014 (3.3)
5.0520192024 (3.13)

值得一提的是,面向用户的协议版本和“内部”协议版本(也称为协议级别)是有区别的。 后者在 CONNECT 数据包中从客户端发送到服务器。 由于面向用户的协议版本 3.1.1 映射到内部协议版本 4,为了避免进一步的混淆,MQTT 委员会决定跳过面向用户的版本 4.0,以便面向用户的版本 5.0 映射到内部协议版本 5。

2. MQTT 5.0 特性

1. 特性概要

附录 C. MQTT v5.0 中的新功能摘要提供了 MQTT 5.0 新功能的完整列表。

由于您在 Web 上找到了很棒的 MQTT 5.0 资源,包括说明性图表和使用模式,因此这篇博文仅关注 RabbitMQ 的细节。 本节介绍 PR #7263 中实现的最重要功能。 对于每个功能,我们提供了一个如何将其与 RabbitMQ 一起使用的示例,或者概述了如何在 RabbitMQ 中实现它的高级描述。

2. Docker中安装RabbitMQ及启用MQTT5.0协议

要自己运行示例,请启动 RabbitMQ 服务器 3.13, 例如,使用以下 Docker 镜像标记:

docker run -it --rm --name rabbitmq -p 1883:1883 -p 15672:15672 -p 15692:15692 rabbitmq:3.13.0-management

在另一个终端窗口中,启用 MQTT 插件:

docker exec rabbitmq rabbitmq-plugins enable rabbitmq_mqtt

由于 MQTT 插件是动态启用的,因此 MQTT 插件定义的功能标志被禁用。 启用所有功能标志,包括功能标志:mqtt_v5

docker exec rabbitmq rabbitmqctl enable_feature_flag all

现在,列出功能标志应显示所有功能标志都已启用:

docker exec rabbitmq rabbitmqctl list_feature_flags --formatter=pretty_table

以下示例使用 MQTTX CLI V1.9.4。 我们使用 CLI 而不是图形 UI,以便您可以通过复制粘贴命令轻松运行示例。

所有新功能也适用于 RabbitMQ Web MQTT 插件。

3. MQTT 5.0 功能列表

1. 消息过期

1. 描述

可以为发布到代理的每条消息设置以秒为单位的到期间隔。 如果在该过期时间间隔内未使用邮件,则该邮件将被丢弃或死信。

2. 举例

为 主题 创建订阅。 这将在 RabbitMQ 中创建一个队列。 通过键入终端断开客户端连接。 由于我们使用 600 秒的会话到期间隔,因此此队列将再存在 10 分钟。t/1``Ctrl+C

mqttx sub --client-id sub-1 --topic t/1 --session-expiry-interval 600 --qos 1
…  Connecting...
✔  Connected
…  Subscribing to t/1...
✔  Subscribed to t/1
^C

将消息发布到同一主题,消息过期间隔为 30 秒:

mqttx pub --topic t/1 --message m1 --message-expiry-interval 30 --qos 1
…  Connecting...
✔  Connected
…  Message publishing...
✔  Message published

在接下来的 30 秒内,列出队列:

docker exec rabbitmq rabbitmqctl --quiet --formatter=pretty_table list_queues name type messages
┌─────────────────────────────┬─────────┬──────────┐
│ name                        │ type    │ messages │
├─────────────────────────────┼─────────┼──────────┤
│ mqtt-subscription-sub-1qos1 │ classic │ 1        │
└─────────────────────────────┴─────────┴──────────┘

等待 30 秒,然后再次列出队列:

docker exec rabbitmq rabbitmqctl --quiet --formatter=pretty_table list_queues
┌─────────────────────────────┬─────────┬──────────┐
│ name                        │ type    │ messages │
├─────────────────────────────┼─────────┼──────────┤
│ mqtt-subscription-sub-1qos1 │ classic │ 0        │
└─────────────────────────────┴─────────┴──────────┘

该消息已过期,因为客户端尚未连接到代理以使用该消息。 如果设置了死字策略,则邮件将死信发送到交易所。 在我们的例子中,死字被禁用。 查询 Prometheus 端点可证明经典队列中有 1 条消息已过期。sub-1

curl --silent localhost:15692/metrics | grep rabbitmq_global_messages_dead_lettered_expired_total
# TYPE rabbitmq_global_messages_dead_lettered_expired_total counter
# HELP rabbitmq_global_messages_dead_lettered_expired_total Total number of messages dead-lettered due to message TTL exceeded
rabbitmq_global_messages_dead_lettered_expired_total{queue_type="rabbit_classic_queue",dead_letter_strategy="at_most_once"} 0
rabbitmq_global_messages_dead_lettered_expired_total{queue_type="rabbit_classic_queue",dead_letter_strategy="disabled"} 1
rabbitmq_global_messages_dead_lettered_expired_total{queue_type="rabbit_quorum_queue",dead_letter_strategy="at_least_once"} 0
rabbitmq_global_messages_dead_lettered_expired_total{queue_type="rabbit_quorum_queue",dead_letter_strategy="at_most_once"} 0
rabbitmq_global_messages_dead_lettered_expired_total{queue_type="rabbit_quorum_queue",dead_letter_strategy="disabled"} 0

另一个有趣的功能是以下要求:

服务器发送到客户端的 PUBLISH 数据包必须包含设置为接收值减去应用程序消息在服务器中等待的时间的消息到期间隔。

向代理发送第二条消息,消息到期间隔为 60 秒:

mqttx pub --topic t/1 --message m2 --message-expiry-interval 60 --qos 1

等待 20 秒,然后重新连接订阅客户端:

mqttx sub --client-id sub-1 --topic t/1 --no-clean --session-expiry-interval 0  --qos 1 --output-mode clean
{
  "topic": "t/1",
  "payload": "m2",
  "packet": {
    ...
    "properties": {
      "messageExpiryInterval": 40
    }
  }
}

根据 MQTT 5.0 协议规范的规定,客户端接收第二条消息,消息到期间隔设置为 40 秒: 代理接收的 60 秒减去消息在代理中等待的 20 秒。

3. 实现

MQTT 5.0 消息到期是在 RabbitMQ 中使用每条消息 TTL 实现的,类似于 AMQP 0.9.1 发布者中的字段。expiration

2. 订阅标识符

1.描述

客户端可以在 SUBSCRIBE 数据包中设置订阅标识符。 如果客户端因该订阅而收到消息,则代理会将该订阅标识符包含在 PUBLISH 数据包中。

订阅标识符的用例列在 SUBSCRIBE 操作部分。

2. 举例

从同一客户端向服务器发送 3 个单独的 SUBSCRIBE 数据包,每个数据包具有不同的主题过滤器和不同的订阅标识符:

mqttx sub --client-id sub-2 --topic t/1 --subscription-identifier 1 --session-expiry-interval 600
^C
mqttx sub --client-id sub-2 --topic t/2 --subscription-identifier 2 --session-expiry-interval 600 --no-clean
^C
mqttx sub --client-id sub-2 --topic "t/#" --subscription-identifier 3 --session-expiry-interval 0 --no-clean --output-mode clean

在第二个终端窗口中,我们看到从同一队列到同一主题交换的 3 个绑定,每个绑定都具有不同的路由键:

docker exec rabbitmq rabbitmqctl --quiet --formatter=pretty_table list_bindings \
    source_name source_kind destination_name destination_kind routing_key
┌─────────────┬─────────────┬─────────────────────────────┬──────────────────┬─────────────────────────────┐
│ source_name │ source_kind │ destination_name            │ destination_kind │ routing_key                 │
├─────────────┼─────────────┼─────────────────────────────┼──────────────────┼─────────────────────────────┤
│             │ exchange    │ mqtt-subscription-sub-2qos0 │ queue            │ mqtt-subscription-sub-2qos0 │
├─────────────┼─────────────┼─────────────────────────────┼──────────────────┼─────────────────────────────┤
│ amq.topic   │ exchange    │ mqtt-subscription-sub-2qos0 │ queue            │ t.#                         │
├─────────────┼─────────────┼─────────────────────────────┼──────────────────┼─────────────────────────────┤
│ amq.topic   │ exchange    │ mqtt-subscription-sub-2qos0 │ queue            │ t.1                         │
├─────────────┼─────────────┼─────────────────────────────┼──────────────────┼─────────────────────────────┤
│ amq.topic   │ exchange    │ mqtt-subscription-sub-2qos0 │ queue            │ t.2                         │
└─────────────┴─────────────┴─────────────────────────────┴──────────────────┴─────────────────────────────┘

第一个条目是与默认交换的隐式绑定。

每个具有 MQTT 主题筛选器的 MQTT 订阅对应一个带有绑定键的 AMQP 0.9.1 绑定。 准确地说,表列的名称错误:应该改为调用它。 MQTT 中的主题级分隔符是 “” 字符,而 AMQP 0.9.1 主题交换中的主题级分隔符是 “” 字符。routing_key``binding_key``/``.

再次在第二个终端窗口中,向主题发送消息:t/1

mqttx pub --topic t/1 --message m1

(订阅客户端的)第一个终端窗口接收以下 PUBLISH 数据包:

{
  "topic": "t/1",
  "payload": "m1",
  "packet": {
    ...
    "properties": {
      "subscriptionIdentifier": [
        1,
        3
      ]
    }
  }
}

它包含订阅标识符 1 和 3,因为 topic filters 和 match topic .t/1``t/#``t/1

同样,如果向主题发送第二条消息,订阅客户端将收到包含订阅标识符 2 和 3 的 PUBLISH 数据包。t/2

3. 实现

订阅标识符是 MQTT 会话状态的一部分。 因此,在客户端断开连接时,订阅标识符必须保留在服务器的数据库中,直到 MQTT 会话结束。 RabbitMQ 将订阅标识符存储在绑定参数中:

docker exec rabbitmq rabbitmqctl --quiet --formatter=pretty_table list_bindings routing_key arguments
┌─────────────────────────────┬───────────────────────────────────────────────────────────────────────────────────┐
│ routing_key                 │ arguments                                                                         │
├─────────────────────────────┼───────────────────────────────────────────────────────────────────────────────────┤
│ mqtt-subscription-sub-2qos0 │                                                                                   │
├─────────────────────────────┼───────────────────────────────────────────────────────────────────────────────────┤
│ t.#                         │ {mqtt_subscription_opts,0,false,false,0,3}{<<"x-binding-key">>,longstr,<<"t.#">>} │
├─────────────────────────────┼───────────────────────────────────────────────────────────────────────────────────┤
│ t.1                         │ {mqtt_subscription_opts,0,false,false,0,1}{<<"x-binding-key">>,longstr,<<"t.1">>} │
├─────────────────────────────┼───────────────────────────────────────────────────────────────────────────────────┤
│ t.2                         │ {mqtt_subscription_opts,0,false,false,0,2}{<<"x-binding-key">>,longstr,<<"t.2">>} │
└─────────────────────────────┴───────────────────────────────────────────────────────────────────────────────────┘

绑定参数的确切结构并不重要,并且可能会在将来的 RabbitMQ 版本中更改。 但是,可以在绑定参数中看到整数 1、2 和 3,这些参数对应于订阅标识符。

当主题交换路由消息时,发布 Erlang 进程会将所有匹配的绑定键包含在消息中。 订阅 MQTT 客户端的 Erlang 进程将匹配的绑定密钥与其知道的 MQTT 主题过滤器进行比较,并将订阅标识符包含在发送到 MQTT 客户端的 PUBLISH 数据包中。

发布 Erlang 进程可以是 MQTT 连接进程,也可以是 AMQP 0.9.1 通道进程。 一如既往,RabbitMQ 在跨协议互操作性方面表现出色:当 AMQP 0.9.1(或 STOMP 或 AMQP 1.0)客户端向主题交换发送消息时, 正确的订阅标识符将包含在发送到 MQTT 客户端的 PUBLISH 数据包中。

3. 订阅选项

1. 描述

MQTT 5.0 提供了 3 个新的订阅选项:

  1. 无本地
  2. 保留为已发布
  3. 保留处理

所有订阅选项均由 RabbitMQ 实现。 在这里,我们只关注“保留处理”选项:

此选项指定在建立订阅时是否发送保留的消息。
这些值为:
0 = 在订阅
时发送保留消息 1 = 仅在订阅当前不存在
时在订阅时发送保留消息 2 = 在订阅时不发送保留的消息

2. 举例例

发送保留的消息:

mqttx pub --topic mytopic --message m --retain

保留处理值 0 将接收保留的消息,而值 2 不会:

mqttx sub --topic mytopic --retain-handling 0
…  Connecting...
✔  Connected
…  Subscribing to mytopic...
✔  Subscribed to mytopic
payload: m
retain: true
^C

mqttx sub --topic mytopic --retain-handling 2
…  Connecting...
✔  Connected
…  Subscribing to mytopic...
✔  Subscribed to mytopic

4 所有ACK上的原因代码

1. 描述

数据包 CONNACK、PUBACK、SUBACK、UNSUBACK 和 DISCONNECT 包含原因码。

2. 实现

一个实现示例是,如果消息未路由到任何队列,则 RabbitMQ 将在 PUBACK 数据包中使用原因代码进行回复。 MQTT 5.0 原因代码在概念上对应于 AMQP 0.9.1 中的强制消息属性和处理程序。No matching subscribers``No matching subscribers``BasicReturn

5. 用户属性

1. 描述

大多数 MQTT 数据包可以包含用户属性。 用户属性的含义不是由 MQTT 规范定义的。

2. 示例 PUBLISH 数据包

PUBLISH 数据包中的用户属性由客户端应用程序定义,并由服务器原封不动地转发。

在第一个终端窗口中订阅:

mqttx sub --topic t/5

在第二个终端窗口中发布包含用户属性的消息:

mqttx pub --topic t/5 --message m --user-properties "key1: value1"

第一个终端窗口将接收用户属性,原封不动:

payload: m
userProperties: [ { key: 'key1', value: 'value1' } ]

MQTT 5.0 PUBLISH 数据包中的用户属性类似于 AMQP 0.9.1 中的消息属性。headers

3. 示例 CONNECT 数据包

使用用户属性进行连接:

mqttx conn --client-id myclient --user-properties "connecting-from: London"

在浏览器中打开管理 UI http://localhost:15672/#/connections(用户名和密码都是 ),然后单击 MQTT 连接:guest

在这里插入图片描述

RabbitMQ 将在管理 UI 中显示 CONNECT 数据包中的用户属性。

6. 有效负载格式和内容类型

1. 描述

发布者可以指定 MIME 内容类型。 它还可以设置有效负载格式指示器,指示有效负载是由 UTF-8 编码的字符数据还是未指定的二进制数据组成。

2. 举例

在第一个终端窗口中,订阅一个主题:

mqttx sub --topic t/6 --output-mode clean

在第二个终端窗口中,发送一条带有内容类型和有效负载格式指示符的消息:

mqttx pub --topic t/6 --message "my UTF-8 encoded data 🙂" --content-type text/plain --payload-format-indicator

第一个终端窗口将原封不动地接收内容类型和有效负载格式指示器:

{
  "topic": "t/6",
  "payload": "my UTF-8 encoded data 🙂",
  "packet": {
    ...
    "properties": {
      "payloadFormatIndicator": true,
      "contentType": "text/plain"
    }
  }
}

7. 请求/响应

1. 描述

MQTT 5.0 正式化了请求/响应模式。

在发布消息之前,MQTT 客户端(请求者)订阅响应主题。 请求者将响应主题和一些关联数据包含在请求消息中。

另一个 MQTT 客户端(响应者)接收到请求消息,执行一些操作,并将具有相同关联数据的响应消息发布到响应主题。

MQTT 5.0 请求/响应功能对应于 AMQP 0.9.1 中的远程过程调用。 但是,在 AMQP 0.9.1 中,请求者将在 AMQP 0.9.1 消息属性中包含回调队列的名称。 MQTT 协议没有定义队列的概念。因此,在 MQTT 中,被回复的“地址”是一个主题名称。reply_to

尽管协议规范之间存在不兼容性,但 RabbitMQ 在协议互操作性方面大放异彩: 因此,RabbitMQ 支持跨协议的请求/响应交互。

例如,MQTT 客户端可以在请求消息中包含响应主题和关联数据。 如果 AMQP 0.9.1 客户端创建了一个绑定到主题交换的队列,该队列的绑定密钥与请求消息的主题匹配,它将收到一条 AMQP 0.9.1 消息,其属性设置为 MQTT 客户端发送的关联数据和名为 . 然后,AMQP 0.9.1 客户端可以使用相同的 MQTT 5.0 客户端响应,并将响应消息发布到具有标头中存在的主题的主题交换。amq.topic``correlation_id``x-opt-reply-to-topic``correlation_id``amq.topic``x-opt-reply-to-topic

2. 举例

此示例仅关注 MQTT 客户端。

在第一个终端窗口中,响应的 MQTT 客户端订阅主题t/7;

mqttx sub --client-id responder --topic t/7 --session-expiry-interval 600 --output-mode clean --qos 1

在第二个终端窗口中,请求的 MQTT 客户端订阅了一个名为 :my/response/topic

mqttx sub --client-id requester --topic my/response/topic --session-expiry-interval 600 --qos 1
…  Connecting...
✔  Connected
…  Subscribing to my/response/topic...
✔  Subscribed to my/response/topic
^C

在第二个终端窗口中,请求者随后发布一条请求消息:

mqttx pub --client-id requester --topic t/7 --message "my request" \
    --correlation-data abc-123 --response-topic my/response/topic \
    --session-expiry-interval 600 --no-clean

在第 1 个终端窗口中,响应方收到请求消息:

{
  "topic": "t/7",
  "payload": "my request",
  "packet": {
    ...
    "properties": {
      "responseTopic": "my/response/topic",
      "correlationData": {
        "type": "Buffer",
        "data": [
          97,
          98,
          99,
          45,
          49,
          50,
          51
        ]
      }
    }
  }
}
^C

在第一个终端窗口中,响应方通过复制关联数据并发布到响应主题来响应请求者:

mqttx pub --client-id responder --topic my/response/topic --message "my response" --correlation-data abc-123

在第 2 个终端窗口中,请求者收到响应。

mqttx sub --client-id requester --topic my/response/topic --no-clean --qos 1 --output-mode clean
{
  "topic": "my/response/topic",
  "payload": "my response",
  "packet": {
    ...
    "properties": {
      "correlationData": {
        "type": "Buffer",
        "data": [
          97,
          98,
          99,
          45,
          49,
          50,
          51
        ]
      }
    }
  }
}

关联数据可用于将响应与请求相关联。 请求者通常为其发布的每个请求选取唯一的关联数据。

8. 分配的客户端标识符

1. 描述

如果客户端使用零长度的客户端标识符进行连接,则服务器必须使用包含分配的客户端标识符的 CONNACK 进行响应。

与 MQTT 3.1.1 相比,这解除了服务器分配的客户端 ID 只能用于连接的限制。Clean Session = 1

2. 实现

RabbitMQ 将生成一些随机的客户端 ID(例如 ),并在 CONNACK 数据包中返回它。dcGB2kSwS0JlXnaBa1A6QA

9. 主题别名

1. 描述

主题别名是一个整数值,用于标识主题,而不是使用主题名称。 这减小了 PUBLISH 数据包的大小,并且在主题名称很长且在网络连接中重复使用相同的主题名称时非常有用。

2. 实现

RabbitMQ 中的默认主题别名最大值为 16。 您可以在 中配置此值,例如:rabbitmq.conf

mqtt.topic_alias_maximum = 32

此配置值映射到从 RabbitMQ 发送到客户端的 CONNACK 数据包中的 Topic Alias Maximum。 它限制了任一方向的主题别名数,即从客户端到 RabbitMQ 和 RabbitMQ 到客户端。 如果客户端发送到许多不同的主题或从许多不同的主题接收,则设置更高的值将需要更多的内存使用量。

RabbitMQ 运算符可以通过设置以下设置来禁止使用主题别名:

mqtt.topic_alias_maximum = 0

10. 流量控制

1. 描述

MQTT 5.0 属性 Receive Maximum 定义了未确认的 QoS 1 PUBLISH 数据包的上限。

2. 实现

从 RabbitMQ 发送到客户端的未确认 QoS 1 PUBLISH 数据包的最大数量由 CONNECT 数据包中从客户端发送到 RabbitMQ 的 Receive Maximum 和配置值:mqtt.prefetch

mqtt.prefetch = 10

默认值为 10。mqtt.prefetch

该值在 MQTT 3.1 和 3.1.1 的 RabbitMQ 3.13 之前已存在。 它映射到 RabbitMQ 中的使用者预取。 换句话说,它定义队列发送到其 MQTT 连接进程的动态消息数量。mqtt.prefetch

11. 最大数据包大小

1. 描述

客户端和服务器可以独立指定它们支持的最大数据包大小。

2. 举例

此示例演示如何限制从客户端发送到 RabbitMQ 的最大 MQTT 数据包大小。

假设身份验证成功后,RabbitMQ 操作员不希望 RabbitMQ 接受任何大于 1 KiB 的 MQTT 数据包。 将以下配置写入 rabbitmq.conf(在当前工作目录中):

mqtt.max_packet_size_authenticated = 1024

停止 RabbitMQ 服务器后,启动 RabbitMQ 服务器并应用新配置:

docker run -it --rm --name rabbitmq -p 1883:1883 -p 15672:15672 -p 15692:15692 \
    --mount type=bind,source="$(pwd)"/rabbitmq.conf,target=/etc/rabbitmq/conf.d/11-blog-post.conf \
    rabbitmq:3.13.0-beta.2-management
docker exec rabbitmq rabbitmq-plugins enable rabbitmq_mqtt
docker exec rabbitmq rabbitmqctl enable_feature_flag all

在第一个终端窗口中,订阅一个主题:

mqttx sub --topic t/11

在第二个终端窗口中,向该主题发送有效负载为 3 字节的消息:

payload=$(head --bytes 3 < /dev/zero | tr '\0' x)
mqttx pub --topic t/11 -m "$payload"

第一行从特殊文件中读取 3 个字节(3 个 null 字符),将每个 null 字符转换为 ASCII 字符并将结果保存在变量中。/dev/zero``x``xxx``payload

第一个终端窗口将收到该消息:

payload: xxx

接下来,在第 2 个终端窗口中,发送有效负载为 2,000 字节的消息:

payload=$(head --bytes 2000 < /dev/zero | tr '\0' x)
mqttx pub --topic t/11 -m "$payload"

这一次,第一个终端窗口不会收到消息,因为从客户端发送到 RabbitMQ 的 PUBLISH 数据包大于配置的最大数据包大小 1024 字节。

相反,RabbitMQ 会记录一条描述性错误消息:

[error] <0.836.0> MQTT packet size (2007 bytes, type 3) exceeds mqtt.max_packet_size_authenticated (1024 bytes)

日志消息声明 2,007 字节,因为 PUBLISH 数据包的固定和可变标头需要 7 个字节(其中 4 个字节用于主题名称)。t/11

12. 服务器启动的断开连接

1. 描述

在 MQTT 5.0 中,DISCONNECT 数据包不仅可以从客户端发送到服务器,还可以从服务器发送到客户端。

2. 实现

在终止连接之前,RabbitMQ 会在以下情况下向客户端发送 DISCONNECT 数据包:

DISCONNECT 原因代码名称情况
接管的会话使用同一客户端 ID 连接的另一个客户端。
服务器关闭RabbitMQ 进入维护模式。
保持活动超时客户端无法在“保持活动”时间内进行通信。
数据包太大RabbitMQ 收到大小超过mqtt.max_packet_size_authenticated

13. 会话到期

1. 描述

在 MQTT 5.0 中,客户端可以在 CONNECT 数据包中向服务器建议会话到期间隔。 服务器可以接受建议的会话到期间隔,也可以在 CONNACK 数据包中强制要求不同的会话到期间隔。

会话可以跨一系列网络连接继续进行。它的持续时间与最新的网络连接加上会话到期间隔一样长。

当会话到期间隔到期时,客户端和服务器都将删除任何会话状态。

2. 实现

只要会话持续,客户端和服务器就会保持会话状态。

服务器中的会话状态包括已发送到客户端但尚未确认的消息、待发送到客户端的消息以及客户端的订阅。 RabbitMQ 以队列和绑定的形式对这种 MQTT 会话状态进行建模。

因此,会话到期间隔映射到 RabbitMQ 中的队列 TTL。 当 MQTT 会话过期时,队列及其消息和绑定将被删除。

3. 举例

默认情况下,服务器允许的最大会话到期间隔为 1 天。 如果 MQTT 客户端在 1 天内没有重新连接,则其会话状态将在 RabbitMQ 中删除。

此值是可配置的。 出于此示例的目的,让我们在以下情况下设置一个非常低的会话到期间隔 1 分钟:rabbitmq.conf

mqtt.max_session_expiry_interval_seconds = 60

设置名称包含前缀,因为 MQTT 5.0 客户端可以通过在 CONNECT 数据包中发送会话到期间隔来选择较低的值。 如最大数据包大小示例中所做的那样,重新启动 RabbitMQ 节点,以便应用新设置。max

以 20 秒的会话到期间隔连接到 RabbitMQ 并创建订阅:

mqttx sub --client-id sub-13 --topic t/13 --session-expiry-interval 20 --qos 1
…  Connecting...
✔  Connected
…  Subscribing to t/13...
✔  Subscribed to t/13
^C

键入终端以断开客户端连接。Ctrl+C

在接下来的 20 秒内,列出队列和绑定:

docker exec rabbitmq rabbitmqctl list_queues name
Timeout: 60.0 seconds ...
Listing queues for vhost / ...
name
mqtt-subscription-sub-13qos1

docker exec rabbitmq rabbitmqctl list_bindings source_name destination_name routing_key --formatter=pretty_table
Listing bindings for vhost /...
┌─────────────┬──────────────────────────────┬──────────────────────────────┐
│ source_name │ destination_name             │ routing_key                  │
├─────────────┼──────────────────────────────┼──────────────────────────────┤
│             │ mqtt-subscription-sub-13qos1 │ mqtt-subscription-sub-13qos1 │
├─────────────┼──────────────────────────────┼──────────────────────────────┤
│ amq.topic   │ mqtt-subscription-sub-13qos1 │ t.13                         │
└─────────────┴──────────────────────────────┴──────────────────────────────┘

20 秒后,再次列出队列和绑定:

docker exec rabbitmq rabbitmqctl list_queues name
Timeout: 60.0 seconds ...
Listing queues for vhost / ...

docker exec rabbitmq rabbitmqctl list_bindings source_name destination_name routing_key --formatter=pretty_table
Listing bindings for vhost /...

RabbitMQ 删除了队列及其绑定,因为我们的客户端未在 20 秒的会话到期间隔内连接到 RabbitMQ。Clean Session = 0

接下来,执行相同的测试,但会话到期间隔较长,例如 1 小时:

mqttx sub --client-id sub-13 --topic t/13 --session-expiry-interval 3600 --qos 1
…  Connecting...
✔  Connected
…  Subscribing to t/13...
✔  Subscribed to t/13
^C

您应该注意到,队列及其绑定将在 1 分钟后被删除,因为有效的会话到期间隔 是客户端请求的最小值(1 小时)和 RabbitMQ 中配置的值(1 分钟)。mqtt.max_session_expiry_interval_seconds

14. 会延迟

1. 描述

客户端可以在 CONNECT 数据包中定义 Will Delay Interval。

服务器会延迟发布客户的遗嘱消息,直到遗嘱延迟间隔过去或会话结束,以先发生者为准。 如果在将延迟间隔过去之前与此会话建立了新的网络连接,则服务器不得发送将消息。 这样做的一个用途是,如果存在临时网络断开连接,并且客户端在发布遗嘱消息之前成功重新连接并继续其会话,则避免发布遗嘱消息。

Will Delay Interval 的另一个用例是通知会话到期:

客户端可以通过将 Will Delay Interval 设置为 Session Expiry Interval 长并发送带有原因0x04代码的 DISCONNECT(Disconnect with Will Message),来安排 Will Message 通知会话到期已发生。

2. 实现

尽管 will 消息有效负载通常很小,但 MQTT 规范允许 will 消息有效负载大小高达 64 KiB。

为了避免在 Khepri(RabbitMQ 未来的元数据存储)中存储大型二进制数据,RabbitMQ 创建了一个包含此单个遗嘱消息的经典队列。 我们称此队列为 Will 队列。 此消息具有每条消息的 TTL 集,该集以毫秒为单位定义,对应于以秒为单位的 Will Delay Interval。 此外,Will 队列还设置了一个队列 TTL,该队列以毫秒为单位定义,对应于以秒为单位的会话到期间隔。 每条消息的有效 TTL 至少比队列 TTL 低几毫秒,以便消息将在队列(会话)过期前不久发布。

Will 队列还定义(MQTT 插件使用的默认主题交换)为死信交换,将 will 主题定义为死信路由键。amq.topic

如果 MQTT 客户端未在其 Will 延迟间隔内重新连接,则 Will 队列中的消息将死信发送到主题交换。

让我们用一个例子来说明这一点。

3. 举例

在第一个终端窗口中,创建一个将使用遗嘱消息的订阅:

mqttx sub --client-id sub-14 --topic t/14

在第二个终端窗口中,创建一个 Will Delay Interval 为 20 秒的连接:

mqttx conn --client-id conn-14 --will-topic t/14 --will-message my-will-message --will-delay-interval 20 --session-expiry-interval 40

在第 3 个终端窗口中,我们看到到目前为止,订阅 MQTT 客户端创建了一个队列:

docker exec rabbitmq rabbitmqctl --quiet --formatter=pretty_table list_queues name type messages arguments
┌──────────────────────────────┬────────────┬──────────┬───────────┐
│ name                         │ type       │ messages │ arguments │
├──────────────────────────────┼────────────┼──────────┼───────────┤
│ mqtt-subscription-sub-14qos0 │ MQTT QoS 00        │           │
└──────────────────────────────┴────────────┴──────────┴───────────┘

在第二个终端窗口中,键入 to disconnect the MQTT connection with client ID。Ctrl+C``conn-14

这一次,列出队列显示已创建 Will 队列:

docker exec rabbitmq rabbitmqctl --quiet --formatter=pretty_table list_queues name type messages arguments
┌──────────────────────────────┬────────────┬──────────┬────────────────────────────────────────────────────────────┐
│ name                         │ type       │ messages │ arguments                                                  │
├──────────────────────────────┼────────────┼──────────┼────────────────────────────────────────────────────────────┤
│ mqtt-subscription-sub-14qos0 │ MQTT QoS 00        │                                                            │
├──────────────────────────────┼────────────┼──────────┼────────────────────────────────────────────────────────────┤
│ mqtt-will-conn-14            │ classic    │ 1{<<"x-expires">>,long,40000}                               │
│                              │            │          │ {<<"x-dead-letter-exchange">>,longstr,<<"amq.topic">>}     │
│                              │            │          │ {<<"x-dead-letter-routing-key">>,longstr,<<"t.14">>}       │
└──────────────────────────────┴────────────┴──────────┴────────────────────────────────────────────────────────────┘

Will 队列的命名模式为 。 它包含一条消息:遗嘱消息。mqtt-will-<MQTT Client ID>

如上一节所述,队列 TTL () 为 40,000 毫秒,因此与上面命令中的 40 秒会话到期间隔匹配。 如果您等待 20 秒,您的第一个终端窗口应该会收到遗嘱消息,因为我们的客户没有在遗嘱延迟间隔内重新连接:x-expires

› payload: my-will-message

15. 可选的服务器功能可用性

1. 描述

定义一组服务器不允许的功能,并为服务器提供一种机制,以便将其指定给客户端。 可以通过这种方式指定的功能包括:

  • 最大 QoS
  • 保留可用
  • 提供通配符订阅
  • 可用的订阅标识符
  • 提供共享订阅

客户端使用服务器声明不可用的功能是错误的。

2. 实现

RabbitMQ 3.13 在 CONNACK 属性中包括 Maximum QoS = 1 和 Shared Subscription Available = 0。

RabbitMQ 不支持 QoS 2。

如下一节所述,将来的 RabbitMQ 版本将支持共享订阅。

4. 局限性

本节列出了 RabbitMQ MQTT 实现的限制。

1. MQTT 5.0 特定限制

1. 共享订阅

共享订阅将在将来的 RabbitMQ 版本中添加。 尽管此功能很好地映射到 RabbitMQ 中的队列,但共享订阅是会话状态的一部分,并且需要进行某些 RabbitMQ 数据库迁移才能有效地查询给定 MQTT 客户端 ID 的共享订阅。

1. 延迟和保留的遗嘱消息

延迟和保留的遗嘱信息将不会被保留。 这是因为延迟的遗嘱消息将死信到主题交换,但保留进程当前不会从队列中使用。 将来可以通过保留邮件的新存储来解决此限制。

2. 非 MQTT 5.0 特定限制

为了完整起见,本节列出了在 RabbitMQ 3.13 中支持 MQTT 5.0 之前和 RabbitMQ 3.12 中提供原生 MQTT 之前存在的限制。

1. 保留的消息

保留消息的功能在 RabbitMQ 中受到限制。

保留的消息仅在本地节点上存储和查询。

一个有效的示例如下: MQTT 客户端向节点 A 发布一条保留的消息,主题为 。此后,另一个客户端在节点 A 上使用主题过滤器进行订阅。新订阅者将收到保留的消息。topic/1``topic/1

但是,如果主题筛选器包含通配符(多级通配符 “”或单级通配符 “”),则不会发送保留的消息 (问题 #8824)。#``+

此外,如果客户端在节点 A 上发布了保留的消息,而另一个客户端随后在节点 B 上订阅,则该订阅客户端将不会收到存储在节点 A 上的任何保留消息(问题 #8096)。

将来的 RabbitMQ 版本将复制集群中保留的消息,并发送与包含通配符的主题筛选器匹配的保留消息。

5. 总结

综上所述,RabbitMQ

  • 是领先的 AMQP 0.9.1 代理
  • 是一个流是代理
  • 擅长跨协议互操作性
  • 由于支持 3.13 中发布的 MQTT 5.0 和 3.12 中发布的原生 MQTT,它正在成为领先的 MQTT 代理之一

我们将 RabbitMQ 转变为成熟的物联网代理的旅程尚未完成,并计划在未来几个月和几年内进行更多的开发工作。 敬请关注!

6. 相关链接

MQTT 5.0 support is coming in RabbitMQ 3.13 | RabbitMQ

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1574081.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

含风电-光伏-光热电站电力系统N-k安全优化调度模型

目录 1 主要内容 2 部分程序 3 部分结果 4 下载链接 1 主要内容 该程序参考《光热电站促进风电消纳的电力系统优化调度》光热电站模型&#xff0c;主要做的是考虑N-k安全约束的含义风电-光伏-光热电站的电力系统优化调度模型&#xff0c;从而体现光热电站在调度灵活性以及经…

Linux 常用指令及其理论知识

个人主页&#xff1a;仍有未知等待探索-CSDN博客 专题分栏&#xff1a;http://t.csdnimg.cn/Tvyou 欢迎各位指教&#xff01;&#xff01;&#xff01; 目录 一、理论知识 二、基础指令 1、ls指令&#xff08;列出该目录下的所有子目录和文件&#xff09; 语法&#xff1a; …

扫描电镜如何能拍到样品的好的形貌?

扫描电镜是表征材料微观形貌的有力工具&#xff0c;它能够呈现样品的精细结构。然而&#xff0c;要拍摄出高质量的样品形貌并非易事&#xff0c;除了要熟悉扫描电镜的各种功能&#xff0c;还需要掌握一些技巧。本文将介绍如何利用景深、倾斜校正、动态聚焦等功能以及合轴和消像…

ZS65-40-125/1.5卧式单级不锈钢离心泵

首先&#xff0c;让我们解析这款离心泵的基本参数。ZS65-40-125/1.5标识着该泵型的特定性能特征。ZS代表这是一种卧式单级离心泵&#xff1b;65表示泵的进口直径为65毫米&#xff0c;确保了足够的介质流入量&#xff1b;40是指出水口直径为40毫米&#xff0c;设计以适应特定的系…

吴恩达2022机器学习专项课程(一) 第二周课程实验:多元线性回归(Lab_02)

1.训练集 使用Numpy数组存储数据集。 2.打印数组 打印两个数组的形状和数据。 3.初始化w&#xff0c;b 为了演示&#xff0c;w&#xff0c;b预设出接近最优解的值。w是一个一维数组&#xff0c;w个数对应特征个数。 4.非向量化计算多元线性回归函数 使用for循环&…

盲盒小程序开发:年轻人成为盲盒主力军

当下&#xff0c;在各种生活压力下&#xff0c;大众的消费逐渐倾向于情绪价值较高的娱乐消费上&#xff0c;不仅可以解压&#xff0c;还可以带来更多的惊喜感。 盲盒的主要受众群体是年轻人&#xff0c;购买能力非常高&#xff0c;盲盒的营收也在成倍的增加&#xff0c;发展前…

MES实施之工控机和电脑的选择

在MES项目实施过程中,经常会碰到工控机和电脑的选型问题,那么他们的区别是什么? 1、控机和普通个人电脑(PC)相比,具有以下几个区别: 1.运行环境不同:工控机通常需要在各种恶劣的工业环境中运行,如高温、高湿、强电磁干扰等,因此需要具有防尘、防水、抗干扰等特点。而…

c++线程基础知识

1. joinable()函数 int LocalOptimizationProcess::Stop() {stop_thread_ true;cond_.notify_all();LOGD("LocalOptimizationProcess Thread Join()");if (thread_->joinable()){thread_->join();}LOGD("LocalOptimizationProcess Thread Stoped")…

实时计算平台设计方案:911-基于6U VPX的光纤图像DSP实时计算平台

基于6U VPX的光纤图像DSP实时计算平台 一、系统组成 该平台基于风冷式的 6U 6槽VPX图像处理平台&#xff0c;包括&#xff1a;计算机主板、计算机主板后板、存储板、图像信号处理板、图像信号处理板后板、图像光纤转接板、机箱背板及机箱组成。图1为系统背板结构示意图&…

SAP FICO应收票据平台开发说明书(包括测试样例、程序代码仅作参考,不保证一定可以运行)

效果展示 应收票据 应收汇票录入界面创建 应收汇票录入界面更改 应收汇票录入界面显示 应收票据批量上载 <

十分钟让我带你入门Pandas基础使用

如何导入 通常&#xff0c;我们按如下方式导入&#xff1a; In [1]: import numpy as npIn [2]: import pandas as pdPandas的基本数据结构 Pandas 提供了两种类型的类来处理数据&#xff1a; Series: 保存任何类型数据的一维标记数组 such as integers, strings, Python obj…

软件库V1.2版本开源-首页UI优化

iAppV3源码&#xff0c;首页的分类更换成了标签布局&#xff0c;各位可以参考学习&#xff0c;界面名称已经中文标注&#xff01; 老版本和现在的版本还是有较大的区别的&#xff0c;建议更新一下&#xff01; 新版本改动界面如下&#xff1a; 1、首页.iyu&#xff1a;分类按…

c++的学习之路:13、vector(2)

本章主要是模拟实现vector&#xff0c;文章末附上代码&#xff0c;和源码。 目录 一、STL源码 二、构造与析构 三、迭代器与【】、size、capacity、empty 四、reserve与resize 五、push_back与pop_back 六、insert与erase 七、测试 1 八、代码 九、思维导图 一、STL源…

永磁同步电机谐波抑制算法(4)——多同步旋转坐标系谐波电流抑制存在的问题以及改进办法

1.问题的引出 在之前的内容中以及讲了多同步旋转坐标系的五七次谐波电流的抑制。 永磁同步电机谐波抑制算法&#xff08;1&#xff09;——基于多同步旋转坐标系的五七次谐波电流抑制 - 知乎这段时间发现电机里面会存在五次谐波&#xff0c;然后学了学谐波抑制的方法。一般比…

海外公司注册推广:15种利用社交媒体的方法大揭秘-华媒舍

社交媒体在如今的全球化时代扮演着重要的角色&#xff0c;为海外公司注册推广提供了各种有效的方法。本文将介绍15种利用社交媒体的方法&#xff0c;帮助海外公司实现注册推广。 1. 制定战略 在开始利用社交媒体之前&#xff0c;海外公司需要制定一个明确的推广战略。确定目标…

BUUCTF---内涵的软件(reverse)

1.题目描述&#xff0c;下载附件&#xff0c;是一个exe文件 2.用PE查看是否有壳&#xff0c;结果无壳 3.用IDA打开&#xff0c;找到main函数 4.双击_main_0,按f5&#xff0c;页面显示如下 5.看到一串可疑的字符串 6.修改为flag{49d3c93df25caad81232130f3d2ebfad}提交&#xf…

springboot 在fegin调用中sdk集成主工程,A component required a bean of type.....

一 前景描述 1.1 总结 1.主工程启动类&#xff08;这里是FeginApp8081&#xff09;所在的路径&#xff0c;和调用sdk的类&#xff0c;这里是FeginJiekou接口类型&#xff0c;其所在目录和主工程目录启动一致。则不需要在启动加制定扫描注解。 主工程启动类路径&#xff1a;…

MFC扩展库BCGControlBar Pro v34.1 - 支持Windows 10/11字体图标

BCGControlBar库拥有500多个经过全面设计、测试和充分记录的MFC扩展类。 我们的组件可以轻松地集成到您的应用程序中&#xff0c;并为您节省数百个开发和调试时间。 BCGControlBar专业版 v34.1已正式发布了&#xff0c;这个版本包含了对Windows 10/11字体图标的支持、功能区和…

海外仓的出入库流程有什么痛点?位像素海外仓系统怎么提高出入库效率?

随着跨境电商的蓬勃发展&#xff0c;海外仓是其中不可或缺的一个关键环节。而货物的出库与入库则是海外仓管理中的一个核心业务流程&#xff0c;它的运作效率直接影响到整个跨境物流的效率和客户体验。今天&#xff0c;让我们具体来看一看关于海外仓出入库的流程&#xff0c;其…

Linux:谈谈阻塞式和非阻塞式IO

文章目录 阻塞式和非阻塞式IO 本篇总结的核心内容就是非阻塞式IO&#xff0c;直接看代码 阻塞式和非阻塞式IO 阻塞式IO 如下所示是典型的阻塞式IO #include <iostream> #include <unistd.h> using namespace std;int main() {char buff[1024];while(true){ssiz…