物联网常见协议之 Amqp 协议及使用场景解析

news2025/1/22 6:52:23

引言

本文围绕 AMQP 协议,为大家详细解析 AMQP 协议、核心技术亮点、多协议之间的对比以及使用实践,并介绍华为云 IoT 通过 Amqp 协议如何为开发者和企业提供了更加灵活和高效的通信方式,使得物联网应用得以在各个领域得到更广泛的推广和应用。

AMQP 协议,全称为 Advanced Message Queuing Protocol。在 2006 年 6 月,由 Cisco、Redhat、iMatrix 等联合制定了 AMQP 的消息标准。

除了 AMQP 协议,还有一些其他协议如 Mqtt (Message Queuing Telemetry Transport)、Http、Kafka。每个协议的发明 / 出现都是为了解决特定的问题。没有最合适的协议,只有更合适的业务场景。在后面我们也会对这些协议进行简单的对比。

Amqp 历史上大概有如下四个版本,

  • Amqp 0-8: 发布于 2006 年
  • Amqp 0-9-1:发布于 2008 年,是 Amqp 0-8 的改进版,被广泛应用,如 rabbitmq、qpid 等
  • Amqp 0-10:发布于 2008 年,是 Amqp 0-9-1 的改进,未被广泛使用
  • Amqp 1.0:发布于 2011 年,是 Amqp 协议的下一代标准,与之前的版本不兼容,但提供了更强大的特性和更好的性能。目前也在华为云 IoT、Azure 中有应用起来。包括 rabbitmq、qpid 等也提供了对 Amqp1.0 版本协议的支持

我们也会主要讨论 Amqp 0-9-1 和 Amqp 1.0 这两个版本

Amqp 0-9-1 协议简述

核心概念

  • Virtual Host:简称 vhost,个人理解是 Amqp 协议上的多租,每个 vhost 具有自己的 Exchanges、Message Queues 等,互相不干扰。
  • Exchange: 从生产者应用程序中接收消息,并根据特定的情况(消息属性或内容),将这些消息路由到 “Message Queue” 中
  • Message Queue: 消息队列,存储消息,直到它被消费者应用程序安全地处理
  • Binding:指 Exchange 将何种类型的消息发送到 Queue 中,提供消息路由机制

Amqp 0-9-1 协议是一个 多链路、协商的、异步、安全、可移植、高效的协议。Amqp 协议通常分为两层:

+------------------Functional Layer----------------+
| Basic Transactions Exchanges Message queues     |
+--------------------------------------------------+
+------------------Transport Layer-----------------+
| Framing Content Data representation             |
| Error handling Heart-beating Channels |
+--------------------------------------------------+

此外,由于 Amqp 协议的 message queue 支持许多特性:私有或共享、持久化或临时等等。根据不同的属性设定,我们可将 AMQP 用于许多应用场景,例如

  • 消息中间件使用:共享的存储转发队列,保存消息并交给多个消费者消费
  • RPC 使用:通过将队列设定为临时的,带有 IP 地址的,来模拟 RPC 接口

Amqp 0-9-1 生产时序图

Amqp 0-9-1 消费时序图

Amqp 0-9-1 协议帧及数据类型

Amqp 0-9-1 的协议帧由 FrameHeader、Payload、FrameEnd 组成

  • Integers 整数(1 到 8 个字节):用于表示大小、数量、限制等。整数总是无符号的,
  • Bits 位:用于表示开 / 关值,一个八位字节。
  • Short strings 短字符串:用于存储短文本属性。短字符串长度限制为 255 个八位字节。
  • Long strings 长字符串:用于存储二进制数据块。
  • Field tables 字段表:存储名称 - 值对。字段值可以是字符串、整数等类型。

Amqp 1-0 协议

与 Amqp 0-9-1 的差异

协议设计层面:

  • AMQP 0-9-1:此版本的 AMQP 主要针对代理的设计,涵盖了消息传递模型、代理行为和交互模式。0-9-1 版本的协议与代理的实现紧密耦合。
  • AMQP 1.0:此版本的 AMQP 更注重基于互操作性的通信协议,不依赖于特定的代理实现。AMQP 1.0 关注点在于在发送者和接收者之间传输消息,而不是代理的内部行为。
  • 比如像”Queue Declare”、“Queue Delete”、“Queue Query” 这些在 Amqp 0-9-1 支持的命令,在 Amqp1.0 中都被移除,并假设这些功能会在更高层(broker)参加。

对称层面:

  • Amqp 0-9-1 是一个典型的客户端 / 服务器通信协议。
  • Amqp 1.0 则是一个对称的协议,任何一端都可以注册为 sender 或是 receiver,并且从如下 Amqp 0.9.1 和 1.0 之间的时序图对比也可以看出来。Amqp 1.0 是完全双工的协议。从某种程度或者说网络编程的角度来说,实现的难度更大。

Amqp 1-0 鉴权时序图

Amqp 1-0 生产时序图

Amqp 1-0 消费时序图

Amqp 1.0 协议帧介绍

Amqp1.0 的协议帧由 FrameHeader、ExtendedHeader、FrameBody 组成。

  • FrameHeader 8 个字节大小,包含长度、类型信息等
  • Extended header 可变宽度区域
  • FrameBody 是一个可变宽度的字节序列,其格式取决于帧类型

FrameHeader 介绍

  • Size: FrameHeader 的第 0~3 个字节包含帧大小。无符号的 32 位整数,为 FrameHeader、ExtendedHeader、FrameBody 的总和大小。如果大小小于 8 字节,则格式错误
  • DOFF: FrameHeader 的第 4 个字节,这表示帧内 Body 的位置。
  • Type: FrameHeader 的第 5 个字节,类型代码表示帧的格式和目的。根据帧的类型,帧头中的后续字节可能会被不同地解释。类型代码 0x00 表示该帧是 AMQP 帧。类型代码 0x01 表示该帧是 SASL 帧等。

Amqp 帧介绍

Amqp 帧类型代码为 0x00。对于 Amqp 帧来说,FrameHeader 的第 6 字节和第 7 字节表示 channel 的编号。Frame Body 被定义为一个 performative 后跟一个不透明的 payload。表现形式必须是第 open、begin、attach、flow、transfer、disposition、detach、end、close 中定义的一个,并在 AMQP 类型系统中编码为描述的类型。帧体中剩余的字节构成了该帧的 payload。payload 的存在和格式由给定表现形式的语义定义。

SASL 帧介绍

Sasl 帧类型代码为 0x01。FrameHeader 中的第 6 和第 7 字节应该被忽略。也不存在扩展头。所以 DOFF 固定位 0x02。

与其他消息通信协议间的对比

Amqp 与 Mqtt 的对比

Amqp 和 Mqtt 都是应用层的消息传递协议,mqtt 更加轻量,相对来说概念不如 amqp 那么丰富,同时 mqtt 头部消息更加短小。更加适用于低带宽、功耗较低的物联网设备

Amqp 与 Kafka 协议的对比

AMQP 是一种非常灵活的协议,可以用于各种类型的消息传递场景,包括点对点和发布 - 订阅模型。Kafka 则专注于高吞吐量的流式处理,适用于数据管道和流式处理等场景。

Kafka 的设计旨在提供高吞吐量和低延迟。AMQP 的性能因实现和使用情况而异,但在大多数情况下,它的性能不如 Kafka。

Kafka 拥有强大的生态系统,包括流处理、数据湖、消息队列等多个应用场景。AMQP 也有相应的生态系统和工具,但相对来说要小得多。

总得来说,尽管 kafka 存在性能上的优势,但 kafka broker 很难对外暴露。相较于 kafka 这种私有消息中间件协议,Amqp 足够标准,更适合各种异构系统的对接。

AMQP 协议相关的开源项目

rabbitmq

提到 AMQP,就不得不提 rabbitmq。RabbitMQ 是一个开源的消息代理和队列服务器,用于通过高级消息队列协议(AMQP)在分布式系统中实现消息传递。RabbitMQ 提供了一个可靠、高性能、可扩展和易于使用的消息传递平台,支持多种编程语言和平台。它最初是用 Erlang 语言编写的,因此具有良好的并发性能和容错能力。

所谓成也 erlang,败也 erlang,由于 erlang 语言生态的问题,有能力深入维护 Rabbitmq 的人员并不是很多,也是 rabbitmq 越来越不流行的一个原因。

Qpid

Apache Qpid(Quick Platform for Interactive Distributed Messaging)是一个开源的消息传递系统,它实现了高级消息队列协议(Advanced Message Queuing Protocol,AMQP)的多种版本。AMQP 是一种开放标准的应用层协议,用于消息传递的中间件,它可以实现跨平台、跨语言的消息通信。Qpid 项目的主要目标是提供一个可靠、可扩展和高性能的消息传递平台,帮助开发者更容易地构建分布式系统。主要的组件有

  • Qpid Broker:一个高性能、可扩展的 AMQP 消息代理,支持持久化、事务和安全认证等特性。Qpid Broker 提供了 Java 和 C++ 两种实现。
  • Qpid Proton:一个轻量级的 AMQP 库,旨在为各种编程语言提供高性能的 AMQP 实现,提供了 C 和 java 的默认实现。此外 Proton 还提供了其他编程语言如 python 的绑定。

但 Qpid 总得来说,比较重型,如果仅仅是想在原有的消息组件,如 kafka/pulsar 外面叠加一层 Amqp 可访问的能力,我相信 proton 是更好的选择。

Vertx-proton

Vert.x Proton 的目标是结合 Vert.x 的响应式编程模型和 Qpid Proton 的 AMQP 支持,以简化构建高性能、可扩展的、基于 AMQP 的分布式应用程序。Vert.x Proton 提供了一套简洁、易用的 API,可以让开发者在 Vert.x 应用程序中轻松地实现 AMQP 通信。

华为云 IoT 对 AMQP 的支持

在最初阶段华为云 IoTDA 主要支持 HTTP 协议,尽管这种方式已经能满足许多需求,但随着物联网技术的普及和发展,用户对于更加灵活和高效的通信方式的需求逐渐增强,华为云 IoTDA 逐渐丰富协议库,当前支持 60 + 协议接入,为开发者和企业提供更加完善的解决方案。

在 IoT 应用对接场景中,华为云 IoT 现已新增了对 AMQP 的支持,与 HTTP 协议相比,AMQP 协议具有以下优势

  1. 无需 HTTP 服务器:AMQP 协议无需开发者搭建 HTTP 服务器,降低了项目成本,简化了系统架构。可以部署在各种类型的设备,包括手机、平板、智能家居设备等,进一步拓宽了物联网应用的领域。
  2. 低延迟、高效率:AMQP 协议采用二进制传输,降低了数据传输所需的带宽,提高了传输速度,降低了延迟。
  3. 强大的消息队列功能:AMQP 协议具有优秀的消息队列功能,支持点对点和发布订阅模式,确保消息的可靠传输和顺序处理。

通过支持 AMQP 协议,华为云 IoT 为开发者和企业提供了更加灵活和高效的通信方式,使得物联网应用得以在各个领域得到更广泛的推广和应用。

Amqp 实战:使用 qpid-proton python 消费华为云 IoTDA 的 Amqp 消息

首先通过 pip 安装依赖包

pip install python-qpid-proton

最简单的消费者 demo, consumer.py 如下

import sys
from proton.handlers import MessagingHandler
from proton.reactor import Container
class AMQPConsumer(MessagingHandler):
 def __init__(self, server_url, target_address):
 super(AMQPConsumer, self).__init__()
 self.server_url = server_url
 self.target_address = target_address
 def on_start(self, event):
        conn = event.container.connect(self.server_url)
 event.container.create_receiver(conn, self.target_address)
 def on_message(self, event):
 print(f"Received message: {event.message.body}")
 event.connection.close()
if __name__ == "__main__":
 server_url = "amqp://localhost:5672"
 target_address = "example_queue"
 try:
 Container(AMQPConsumer(server_url, target_address)).run()
 except KeyboardInterrupt:
 sys.exit(0)

我们可以使用这个 producer.py 验证 consumer.py 可用

import sys
from proton import Message
from proton.handlers import MessagingHandler
from proton.reactor import Container
class AMQPProducer(MessagingHandler):
 def __init__(self, server_url, target_address, message_body):
 super(AMQPProducer, self).__init__()
 self.server_url = server_url
 self.target_address = target_address
 self.message_body = message_body
 def on_start(self, event):
        conn = event.container.connect(self.server_url)
 self.sender = event.container.create_sender(conn, self.target_address)
 def on_sendable(self, event):
        message = Message(body=self.message_body)
 event.sender.send(message)
 print(f"Sent message: {message.body}")
 event.connection.close()
if __name__ == "__main__":
 server_url = "amqp://localhost:5672"
 target_address = "example_queue"
 message_body = "Hello, AMQP 1.0!"
 try:
 Container(AMQPProducer(server_url, target_address, message_body)).run()
 except KeyboardInterrupt:
 sys.exit(0)

为了能连接上华为云 IoTDA 的 Amqp 接入点,我们还需要给 consumer.py 配置用户名、密码。如果您对 TLS 认证服务端还有诉求,则还可以配置上证书信息。如下为样例代码,具体连接的信息、凭据如何获得可参考: https://support.huaweicloud.com/devg-iothub/iot_01_00100_2.html。注意,url 也从 amqp 修改为了 amqps

import sys
from proton import Message, SSLDomain
from proton.handlers import MessagingHandler
from proton.reactor import Container
class AMQPConsumer(MessagingHandler):
 def __init__(self, server_url, target_address, username, password, cert_file, key_file):
 super(AMQPConsumer, self).__init__()
 self.server_url = server_url
 self.target_address = target_address
 self.username = username
 self.password = password
 self.cert_file = cert_file
 self.key_file = key_file
 def on_start(self, event):
 ssl_domain = SSLDomain(mode=SSLDomain.MODE_CLIENT)
 ssl_domain.set_credentials(self.cert_file, self.key_file, None)
        conn = event.container.connect(self.server_url, user=self.username, password=self.password, ssl_domain=ssl_domain)
 event.container.create_receiver(conn, self.target_address)
 def on_message(self, event):
 print(f"Received message: {event.message.body}")
 event.connection.close()
if __name__ == "__main__":
 server_url = "amqps://localhost:5671" # 注意 'amqps',它表示使用 SSL/TLS 连接
 target_address = "example_queue"
    username = "your_username"
    password = "your_password"
 cert_file = "path/to/your/certificate.pem"
 key_file = "path/to/your/private_key.pem"
 try:
 Container(AMQPConsumer(server_url, target_address, username, password, cert_file, key_file)).run()
 except KeyboardInterrupt:
 sys.exit(0)

该样例代码已上传到 gitee

总结与展望

总体来说,AMQP 作为一种应用层协议,在消息传递和异构系统之间的通信方面提供了非常灵活和可靠的解决方案。与其他消息传递协议相比,AMQP 具有丰富的功能和灵活的设计,适用于各种类型的消息传递场景。

在使用 AMQP 时,我们可以选择现有的开源实现,如 RabbitMQ、Qpid 等,也可以自行实现 AMQP 的相关组件。通过这些实现,我们可以轻松地在不同的应用程序、语言和平台之间进行消息传递,并实现可靠、高效、安全的通信。

随着物联网、云计算和大数据等技术的发展,AMQP 的应用场景越来越广泛,比如在 IoT 设备、大数据流处理、分布式系统等方面都得到了广泛应用。未来,AMQP 将继续发挥重要作用,推动各种异构系统之间的互联互通,带来更加便捷和高效的消息传递体验。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/519768.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

项目文档(request页面代码逻辑)

项目文档 1. 封装请求基地址 2. 添加请求拦截器并设置请求头 作用 在向服务器发送请求时,很多请求都需要在请求头中携带token,每一次去请求都写请求头很麻烦,所以我们写一个请求拦截器,统一拦截并添加一个请求头 代码部分 // 请求拦截器 req…

[架构之路-194]-《软考-系统分析师》- 软件复用技术之软件产品线

目录 1. 思想来源:产品线 1.1 硬件产品线 1.2. 产品组合 2. 软件产品线 2.1 思想来源: 2.2 为什么使用软件产品线? 2.3 软件产品线是一种软件架构 2.4 软件产品线详解 2.5 软件产品线的双生命周期的模型 2.6 软件产品线建立方式 …

Kali E:Unable to locate package错误解决

默认的新装的kali 可能都会遇到这个安装报错E: Unable to locate package httrack问题,今天我记录下彻底解决过程和效果。 Command httrack not found, but can be installed with: apt install httrack Do you want to install it? (N/y)y apt install httrack Re…

介绍动作识别数据集:“NTU RGB+D”数据集和“NTU RGB+D 120”数据集

动作识别数据集:“NTU RGBD”数据集和“NTU RGBD 120”数据集 (还包括AUTH UAV手势数据集:NTU 4级) 本页介绍两个数据集:“NTU RGBD”和“NTU RGBD 120”。 “NTU RGBD”包含60个动作类和56,880个视频样本…

傅里叶分析的历史背景

目录 1. Fourier级数(三角级数)的历史背景 2. 圆和复平面 3. Fourier的大胆猜想 1. Fourier级数(三角级数)的历史背景 自古以来,圆形一直是(现在仍然是)最简单的抽象理解形状。您只需要一个中心点和一个半径就可以了。圆上的所有点与圆心的距离都是固定…

无监督域适应 (UDA)(1)

一、定义 1、无监督域自适应 Unsupervised domain adaptation 经典机器学习假设训练集和测试集来自相同的分布。 然而,这个假设在现实世界的应用程序中可能并不总是成立,例如,数据来源不同。 这种情况下,域分布之间会存在差异…

进阶自定义类型——结构体,枚举,联合

本章重点: 1.结构体 1.1 结构体类型的声明 1.2 结构的自引用 1.3 结构体变量的定义和初始化 1.4 结构体内存对齐 1.5 结构体传参 1.6 结构体实现位段(位段的填充&可移植性) 2.枚举 2.1 枚举类型的定义 2.2 枚举的优点 2.3 枚举的使用 3.联合 3.1 联合类…

年月日计算器——操作符重载的应用(含完整代码,简洁)

前言&#xff1a;大家好&#xff0c;这里是YY&#xff1b;此篇博客主要是操作符重载的应用&#xff1b;包含【流插入&#xff0c;流提取】【>,<,>,<,】【&#xff0c;-&#xff0c;&#xff0c;-】【前置&#xff0c;后置&#xff0c;前置--&#xff0c;后置--】 P…

Goby 漏洞更新 | Weblogic Commons Collections 序列化代码执行漏洞(CVE-2015-4852)

漏洞名称&#xff1a;Weblogic Commons Collections 序列化代码执行漏洞&#xff08;CVE-2015-4852&#xff09; English Name&#xff1a;Weblogic Commons Collections serialization code execution vulnerability (CVE-2015-4852) CVSS core: 7.5 影响资产数&#xff1a…

Docker ELK 监控日志(附yml)

目录 一 安装docker-commpose 二 编写yml文件 2.1 docker配置文件 2.2 filebeat配置文件 2.3 kibana配置文件 三 运行启动 四 打开kibana 一 安装docker-commpose 可以看我之前的docker文章 二 编写yml文件 2.1 docker配置文件 使用的7.17.9版本 &#xff0c;请保…

linux 下 ps、sort、top 命令详解

1、 ps命令 作用&#xff1a;查看系统进程&#xff0c;比如正在运行的进程有哪些&#xff0c;什么时候开始运行的&#xff0c;哪个用户运行的&#xff0c;占用了多少资源。 参数&#xff1a; -e 显示所有进程 -f 显示所有字段&#xff08;UID&#xff0c;PPIP&#xff0c;C…

Redis学习——单机版安装

目录 1.解压 2.安装gcc 3.执行make命令 4.复制redis的配置文件到默认安装目录下 5.修改redis.conf文件 6.启动redis服务与客户端 7.查看redis进行是否启动 8.关闭redis服务 9.redis性能测试 注意&#xff1a;安装redis前要安装jdk。 1.解压 [rootlxm148 install]# t…

ubuntu卷积神经网络——图片数据集的制作以及制作好的数据集的使用

首先我事先准备好五分类的图片放在对应的文件夹&#xff0c;图片资源在我的gitee文件夹中链接如下&#xff1a;文件管理: 用于存各种数据https://gitee.com/xiaoxiaotai/file-management.git 里面有imgs目录和npy目录&#xff0c;imgs就是存放5分类的图片的目录&#xff0c;里面…

Lesson14 高级IO

前言 IO 等待 数据拷贝,比如read/recv,write/send只要在单位事件里,让等的比重减低,IO的效率就越高 五种IO模型 钓鱼小案例 阻塞式 阻塞式: 张三拿着一根鱼竿,一直在岸边钓鱼,期间一直盯着鱼竿,等待鱼上钩 非阻塞式轮询式 非阻塞式轮询式: 李四拿着一根鱼竿,在岸边钓鱼,期…

Weblogic RCE合集

文章目录 CVE-2023-21839(T3/IIOP JNDI注入)前言漏洞简单分析漏洞复现防护措施 CVE-2020-2551(RMI-IIOP RCE)漏洞简单分析漏洞复现防护措施 CVE-2017-3506(wls-wsat组件XMLDecoder反序列化漏洞)漏洞简单分析漏洞复现防护措施 CVE-2020-14882&CVE-2020-14883漏洞简单分析 CV…

2023.05.11 c高级 day3

编写一个名为myfirstshell.sh的脚本&#xff0c;它包括以下内容。 包含一段注释&#xff0c;列出您的姓名、脚本的名称和编写这个脚本的目的和当前用户说“hello 用户名”显示您的机器名 hostname显示上一级目录中的所有文件的列表显示变量PATH和HOME的值显示磁盘使用情况用id命…

算法修炼之练气篇——练气十五层

博主&#xff1a;命运之光 专栏&#xff1a;算法修炼之练气篇 前言&#xff1a;每天练习五道题&#xff0c;炼气篇大概会练习200道题左右&#xff0c;题目有C语言网上的题&#xff0c;也有洛谷上面的题&#xff0c;题目简单适合新手入门。&#xff08;代码都是命运之光自己写的…

来领略一下带头双向循环链表的风采吧

&#x1f349; 博客主页&#xff1a;阿博历练记 &#x1f4d6;文章专栏&#xff1a;数据结构与算法 &#x1f68d;代码仓库&#xff1a;阿博编程日记 &#x1f339;欢迎关注&#xff1a;欢迎友友们点赞收藏关注哦 文章目录 &#x1f344;前言&#x1f37c;双向循环链表&#x1…

Qt使用星空图作为窗口背景,点击键盘的WASD控制小飞机在上面移动。

事件函数的使用依托于Qt的事件机制&#xff0c;一个来自于外部事件的传递机制模型如下所示 信号槽虽然好用&#xff0c;但是无法包含所有的情况&#xff0c;事件函数可以起到对信号槽无法覆盖的一些时机进行补充&#xff0c;事件函数的使用无需连接。 常用的事件函数如下所示。…

设计模式5—抽象工厂模式

5.抽象工厂模式 概念 抽象工厂模式&#xff1a;提供一个创建一系列相关或相互依赖对象的接口&#xff0c;而无须指定他们具体的类。抽象工厂又称为Kit模式&#xff0c;属于对象创建型模式。 抽象工厂可以将统一产品族的单独工厂封装起来&#xff0c;在正常使用中&#xff0c…