RocketMQ 之 IoT 消息解析:物联网需要什么样的消息技术?

news2025/1/9 15:21:31

作者:林清山(隆基)

前言:

从初代开源消息队列崛起,到 PC 互联网、移动互联网爆发式发展,再到如今 IoT、云计算、云原生引领了新的技术趋势,消息中间件的发展已经走过了 30 多个年头。

目前,消息中间件在国内许多行业的关键应用中扮演着至关重要的角色。随着数字化转型的深入,客户在使用消息技术的过程中往往同时涉及交叉场景,比如同时进行物联网消息、微服务消息的处理,同时进行应用集成、数据集成、实时分析等,企业需要为此维护多套消息系统,付出更多的资源成本和学习成本。

在这样的背景下,2022 年,RocketMQ 5.0 正式发布,相对于 RocketMQ 4.0,架构走向云原生化,并且覆盖了更多的业务场景。

物联网消息场景

我们先来了解一下物联网的场景是什么?消息在物联网里面有什么作用?

物联网肯定是最近几年最火的技术趋势之一,有大量的研究机构、行业报告都提出了物联网快速发展的态势:

首先,物联网设备规模爆发式增长,预测会在 2025 年达到 200 多亿台。

其次,物联网的数据规模快速增长,来自物联网的数据增速接近 28%,并且未来有 90% 以上的实时数据来自物联网场景。这也就意味着未来的实时流数据处理的数据类型会有大量物联网数据。

最后,边缘计算是一个重要的趋势,未来会有 75% 的数据在传统数据中心或者云环境之外来处理,这里的边缘指的是商店、工厂、火车等等这些离数据源更近的地方。由于物联网产生的数据规模很大,如果全部数据传输到云端处理,会面临难以承受的成本,应该充分利用边缘资源直接计算,再把高价值的计算结果传输云端;另一方面,在离用户近的地方计算直接响应,可以降低延迟,提升用户体验。

物联网的发展速度这么快,数据规模那么大,跟消息有什么关系呢?

我们通过这个图来看一下消息在物联网场景发挥的作用:

第一个作用是连接,承担通信的职责,支持设备和设备的通信,设备和云端应用的通信,比如传感器数据上报、云端指令下发等场景,作为支撑 IoT 的应用架构,连接云边端。

第二个作用是数据处理,物联网设备源源不断的产生数据流,有大量需要实时流处理的场景,比如设备维护,高温预警等等。基于 MQ 的事件流存储和流计算能力,可以构建物联网场景的数据架构。

图片

物联网消息技术

下面我们来看看在物联网场景里,对消息技术有什么诉求?

我们先从这个表格来对比,物联网消息技术跟之前讲过的经典消息技术的区别。

图片

经典的消息主要是为服务端系统提供发布订阅的能力,而物联网的消息技术是为物联网设备之间、设备和服务端之间提供发布订阅的能力。

我们来分别看一下各自场景的特点:

  • 经典消息场景

    消息 Broker、消息客户端都作为服务端系统的一部分,通常部署在 IDC 或者公共云环境中配置性能较高的服务器上,包括容器、虚拟机、物理机等形式。消息客户端和服务端通常部署在同一个机房,内网环境具有高带宽和稳定的网络质量。客户端数量通常与应用服务器数量相对应,规模较小,一般是数百到数千台服务器,只有超大型互联网公司才会达到百万级。从消息生产的角度来看,每个客户端的消息生产发送量一般对应到其业务的 TPS,能达到数百数千 TPS。在消息消费方面,通常采用集群消费,一个应用集群共享一个消费者 ID,共同分担该消费组的消息。每条消息的订阅比通常也不高,正常情况下不会超过 10 个。

  • IoT 消息场景

    很多条件都与经典消息场景不一样,甚至截然相反。IoT 的消息客户端通常是微型设备,其计算和存储资源都非常有限。消息服务端可能要部署在边缘环境中,使用的服务器配置也会比较低。另一方面,物联网设备通常通过公网连接,网络环境特别复杂,并且由于设备经常移动,有时会面临断网或处于弱网环境,网络质量差且不稳定。物联网场景中,消息客户端实例数对应到物联网设备数,可能达到亿级别,远远超过大型互联网公司的服务器数量。尽管每个设备的消息 TPS 不高,但是一条消息有可能同时被百万个设备接收,订阅比特别高。

RocketMQ - MQTT

由此可以看出,物联网需要的消息技术和经典的消息技术很不一样。接下来我们再来看,为了应对物联网的消息场景,RocketMQ 5.0 做了哪些事情?

RocketMQ 5.0 我们发布了一个子产品,叫做 RocketMQ - MQTT。它有三个技术特点:

第一,它采用标准的物联网协议 MQTT,该协议面向物联网弱网环境、低算力的特点设计,协议十分精简。它还提供丰富的特性,支持多种订阅模式,多种消息 QoS,比如“最多一次”、“最少一次”和“当且仅当一次”。其领域模型设计也是基于“消息、主题、发布订阅”等概念,与 RocketMQ 高度兼容,为构建一个云端一体化的 RocketMQ 产品形态奠定了坚实的基础。

第二,它采用存算分离的架构。RocketMQ Broker 作为存储层,MQTT 相关的领域逻辑都在 MQTT Proxy 层实现,并面向海量连接、订阅关系、实时推送进行深度优化,Proxy 层可以根据物联网业务负载提供独立的弹性扩展,例如增加连接数只需新增 Proxy 节点。

第三,它采用端云一体化的架构。因为领域模型接近,并且以 RocketMQ 作为存储层,每条消息只需存储一份,这份消息既能被物联网设备消费,也能被云端应用消费。另外,RocketMQ 本身是天然的流存储,流计算引擎可以无缝对 IoT 数据进行实时分析。

图片

接下来我们再从几个关键的技术点,来深入了解 RocketMQ 的物联网技术实现。

(一)IoT 消息存储模型

1. 读放大为主,写放大为辅

首先要解决的是物联网消息的存储模型,在发布订阅的业务模型里,一般会采用两种存储模型,一种是读放大,每条消息只写到一个公共队列,所有消费者读取这个共享队列,维护自己的消费位点;另外一种是写放大,每个消费者有自己的队列,每条消息都分发到目标消费者的队列中,消费者只读自己的队列。

因为在物联网场景里,一条消息可能会有百万级的设备消费,所以,很显然,选择读放大的模型能显著降低存储成本、提高性能。

但是,只选择读放大的模式没法完全满足要求,MQTT 协议有其特殊性,它的 Topic 是多级 Topic,且订阅方式既有精准订阅,也有通配符匹配订阅。比如家居场景,我们定义一个多级主题,如“家/浴室/温度”,有直接订阅完整多级主题的“家/浴室/温度”,也有采用通配符订阅只关注“温度”的,还有只关注一级主题为“家”的所有消息。

对于直接订阅完整的多级主题消费者可以采用读放大的方式直接读取对应多级主题的公共队列;而采用通配符订阅的消费者无法反推消息的 Topic,所以需要在消息存储时根据通配符的订阅关系多写一个通配符队列,这样消费者就可以根据其订阅的通配符队列读取消息。

这就是 RocketMQ 采用的读放大为主,写放大为辅的存储模型。

图片

2. 端云一体化存储

基于前文的分析,我们设计了 RocketMQ 端云一体化的存储模型,见下图。

图片

消息可以来自各个接入场景(如服务端的 RMQ/AMQP,设备端的 MQTT),但只会写一份存到 Commitlog 里面,然后分发出多个需求场景的队列索引,比如服务端场景(MQ/AMQP)可以按照一级 Topic 队列进行传统的服务端消费,设备端场景可以按照 MQTT 多级 Topic 以及通配符订阅进行消费消息。这样我们就可以基于同一套存储引擎,同时支持服务端应用集成和 IoT 场景的消息收发,达到端云一体化。

(二)队列规模问题

我们都知道像 Kafka 这样的消息队列每个 Topic 是独立文件,但是随着 Topic 增多,消息文件数量也增多,顺序写就退化成了随机写,性能明显下降。RocketMQ 在 Kafka 的基础上进行了改进,使用了一个 Commitlog 文件来保存所有的消息内容,再使用 CQ 索引文件来表示每个 Topic 里面的消息队列,因为 CQ 索引数据比较小,文件增多对 IO 影响要小很多,所以在队列数量上可以达到十万级。但是,这个终端设备队列的场景下,十万级的队列数量还是太小了,我们希望进一步提升一个数量级,达到百万级队列数量,所以,我们引入了 Rocksdb 引擎来进行 CQ 索引分发。

图片

面向 IoT 的百万级队列设计

Rocksdb 是一个广泛使用的单机 KV 存储引擎,有高性能的顺序写能力。因为我们有了 Commitlog 已具备了消息顺序流存储,所以可以去掉 Rocksdb 引擎里面的 WAL,基于 Rocksdb 来保存 CQ 索引。在分发的时候,我们使用了 Rocksdb 的 WriteBatch 原子特性,分发时把当前的 MaxPhyOffset 注入进去,因为 Rocksdb 能够保证原子存储,后续可以根据这个 MaxPhyOffset 来做 Recover 的 checkpoint。最后,我们也提供了一个 Compaction 的自定义实现,来进行 PhyOffset 的确认,以清理已删除的脏数据。

图片

(三)IoT 消息推送模型

介绍了底层的队列存储模型后,我们再详细描述一下上层的消息实时推送(匹配查找和可靠触达)是怎么做的?

在 RocketMQ 的经典消费模式里,消费者是直接采用长轮询的方式,从客户端直接发起请求,精确读取对应的 Topic 队列。而在 MQTT 场景里,因为客户端数量、订阅关系数量规模巨大,无法采用原来的长轮询模式,消费链路的实现更加复杂,所以,这里采用的是推拉结合的模型。

下图展示的是一个推拉模型,物联网终端设备通过 MQTT 协议连到 Proxy 节点。消息从服务端(MQ/AMQP/MQTT)发送过来,存到 Topic 队列后,会有一个 notify 逻辑模块来实时感知这个新消息到达,然后会生成消息事件(就是消息的 Topic 名称),把这个事件推送至 Proxy 节点,Proxy 节点根据它连上的终端设备订阅情况进行内部匹配,找到哪些终端设备能匹配上,然后会触发 pull 请求去存储层读取消息,再推送到终端设备。

一个重要问题,就是订阅关系的匹配查找。一般有两种方式:第一种,简单的广播事件;第二种,集中存储在线订阅关系(比如图里的 lookup 模块),然后进行匹配查找,再精准推送。

事件广播机制看起来有扩展性问题,但是其实性能并不差,因为我们推送的数据很小,就是 Topic 名称,而且相同 Topic 的消息事件可以攒批推送,RocketMQ 5.0 就是默认采用的这个方式。集中存储在线订阅关系,这个也是常见的一种做法,如保存到 RDS、Redis 等等,但要保证数据的实时一致性也是有难度的,而且要进行匹配查找对整个消息的实时链路 RT 开销也会有一定的影响。下图模型中可以看到,在 Proxy 节点还会引入一个 Cache 模块,用来做消息队列 Cache,避免在广播场景下每个终端设备都向存储层发起读数据的情况。

图片

总结

本文分三个部分深入探讨了 RocketMQ 5.0 关于物联网消息技术的应用与优化,第一部分概述一个典型的物联网技术架构,并重点阐述消息队列在此架构中的关键作用。第二部分,探讨了物联网场景对消息技术的特殊要求,并分析这些要求与服务端应用中的消息技术之间的差异。第三部分,深入介绍了 RocketMQ 5.0 的 MQTT 子产品,阐释其如何有效应对物联网领域的技术挑战。旨在为大家提供一个全面的视角,理解消息队列在物联网中的重要性及其解决方案。

图片

我们将持续为您带来深度剖析 RocketMQ 5.0 的系列文章,欢迎点击此处进入官网了解更多详情,也欢迎填写表单进行咨询:https://survey.aliyun.com/apps/zhiliao/bzT3AfPaq

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1579016.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

整理的微信小程序日历(单选/多选/筛选)

一、日历横向多选&#xff0c;支持单日、双日、三日、工作日等选择 效果图 wxml文件 <view class"calendar"><view class"section"><view class"title flex-box"><button bindtap"past">上一页</button&…

00-JAVA基础-JVM类加载机制及自定义类加载器

JVM 类加载机制 JVM类加载机制是Java运行时环境的核心部分&#xff0c;它负责将类的.class文件加载到JVM中&#xff0c;并将其转换为可以被JVM执行的数据结构。 类加载的整体流程 类加载的整体流程可以分为五个阶段&#xff1a;加载&#xff08;Loading&#xff09;、链接&a…

java数据结构与算法刷题-----LeetCode238. 除自身以外数组的乘积

java数据结构与算法刷题目录&#xff08;剑指Offer、LeetCode、ACM&#xff09;-----主目录-----持续更新(进不去说明我没写完)&#xff1a;https://blog.csdn.net/grd_java/article/details/123063846 文章目录 1. 动态规划&#xff1a;左右乘积列表2. 滚动数组对动态规划过程…

flutter组件_AlertDialog

官方说明&#xff1a;A Material Design alert dialog. 翻译&#xff1a;一个材料设计警告对话框。 作者释义&#xff1a;显示弹窗&#xff0c;类似于element ui中的Dialog组件。 AlertDialog的定义 const AlertDialog({super.key,this.icon,this.iconPadding,this.iconColor,t…

Cortex-M4架构

第一章 嵌入式系统概论 1.1 嵌入式系统概念 用于控制、监视或者辅助操作机器和设备的装置&#xff0c;是一种专用计算机系统。 更宽泛的定义&#xff1a;是在产品内部&#xff0c;具有特定功能的计算机系统。 1.2 嵌入式系统组成 硬件 ①处理器&#xff1a;CPU ②存储器…

分布式事务 - 个人笔记 @by_TWJ

目录 1. 传统事务1.1. 事务特征1.2. 事务隔离级别1.2.1. 表格展示1.2.2. oracle和mysql可支持的事务隔离级别 2. 分布式事务2.1. CAP指标2.2. BASE理论2.3. 7种常见的分布式事务方案2.3.1. 2PC2.3.2. 3PC2.3.3. TCC2.3.3.1. TCC的注意事项&#xff1a;2.3.3.2. TCC方案的优缺点…

【Java面试题】JVM(26道)

文章目录 JVM面试题基础1.什么是JVM&#xff1f;2.JVM的组织架构&#xff1f; 内存管理3.JVM的内存区域是什么&#xff1f;3.1堆3.2方法区3.3程序计数器3.4Java虚拟机栈3.5本地方法栈 4.堆和栈的区别是什么&#xff1f;5.JDK1.6、1.7、1.8内存区域的变化&#xff1f;6.内存泄露…

mynet开源库

1.介绍 个人实现的c开源网络库&#xff0e; 2.软件架构 1.结构图 2.基于event的自动分发机制 3.多优先级分发队列&#xff0c;延迟分发队列 内部event服务于通知机制的优先级为0&#xff0c;外部event优先级为1&#xff0e; 当集中处理分发的event_callback时&#xff0c…

鸿蒙ArkUI声明式学习:【UI资源管理】

OpenHarmony 应用的资源分类和资源的访问以及应用开发使用的像素单位以及各单位之间相互转换的方法。 资源分类 移动端应用开发常用到的资源比如图片&#xff0c;音视频&#xff0c;字符串等都有固定的存放目录&#xff0c;OpenHarmony 把这些应用的资源文件统一放在 resourc…

线程的666种状态

文章目录 在Java中&#xff0c;线程有以下六种状态&#xff1a; NEW&#xff1a;新建状态&#xff0c;表示线程对象已经被创建但还未启动。RUNNABLE&#xff1a;可运行状态&#xff0c;表示线程处于就绪状态&#xff0c;等待系统分配CPU资源执行。BLOCKED&#xff1a;阻塞状态…

Centos Docker Oracle11g 密码过期修改

症状&#xff1a; Centos Oracle11g环境变量配置 如果没有配置环境变量&#xff0c;需要先配置Oracle环境变量&#xff0c;否则执行sqlplus时会提示&#xff1a;SP2-0750: You may need to set ORACLE_HOME to your Oracle software directory 配置方法&#xff1a; 第一步&a…

《系统架构设计师教程(第2版)》第8章-系统质量属性与架构评估-03-ATAM方法架构评估实践(下)

文章目录 3. 测试阶段3.1 头脑风暴和优先场景&#xff08;第7步&#xff09;3.1.1 理论部分3.1.2 示例 3.2 分析架构方法&#xff08;第8步&#xff09;3.2.1 调查架构方法1&#xff09;安全性2&#xff09;性能 3.2.2 创建分析问题3.2.3 分析问题的答案胡佛架构银行体系结构 3…

初学ELK - elk部署

一、简介 ELK是3个开源软件组合&#xff0c;分别是 Elasticsearch &#xff0c;Logstash&#xff0c;Kibana Elasticsearch &#xff1a;是个开源分布式搜索引擎&#xff0c;提供搜集、分析、存储数据三大功能。它的特点有&#xff1a;分布式&#xff0c;零配置&#xff0c;自…

开源免费的多功能PDF工具箱

它支持修改PDF、编辑PDF书签、导出PDF书签、导入书签、生成、合并、拆分、提取页面内容、提取图片、OCR 功能介绍: 修改PDF信息&#xff1a;修改文档属性、页码编号、页面链接、页面尺寸&#xff1b;删除自动打开网页等动作&#xff0c;去除复制及打印限制&#xff1b;设置阅读…

ios swift5 “Sign in with Apple“(使用苹果登录)怎样接入(第三方登录)集成AppleID登录

文章目录 截图1.在开发者网站的app id中添加Sign in with Apple功能2.在Xcode中添加Sign in with Apple功能3.代码&#xff1a;只有第一次登录的时候可以获取到用户名参考博客chatGPT答案 截图 1.在开发者网站的app id中添加Sign in with Apple功能 1.1 如果你新建app id,记得在…

OpenHarmony实战:物联网解决方案之芯海cst85芯片移植案例

本文介绍基于芯海cst85芯片的cst85_wblink开发板移植OpenHarmony LiteOS-M轻量系统的移植案例。 开发了Wi-Fi连接样例和XTS测试样例&#xff0c;同时实现了wifi_lite, lwip, startup, utils, xts, hdf等部件基于OpenHarmony LiteOS-M内核的适配。 移植架构上采用Board和Soc分…

流式密集视频字幕

流式密集视频字幕 摘要1 IntroductionRelated Work3 Streaming Dense Video Captioning Streaming Dense Video Captioning 摘要 对于一个密集视频字幕生成模型&#xff0c;预测在视频中时间上定位的字幕&#xff0c;理想情况下应该能够处理长的输入视频&#xff0c;预测丰富、…

GEE:样本点的样式设置

作者:CSDN @ _养乐多_ 本文将介绍在Google Earth Engine (GEE)平台上为样本点设置样式的方法和代码,样本点可以设置成任何颜色,以及7种形状,以便更直观了解数据的分布和特征。 文章目录 一、统一设置样式1.1 示例代码1.2 示例代码链接二、每一类一个样式2.1 示例代码2.2…

React - 你使用过高阶组件吗

难度级别:初级及以上 提问概率:55% 高阶组件并不能单纯的说它是一个函数,或是一个组件,在React中,函数也可以做为一种组件。而高阶组件就是将一个组件做为入参,被传入一个函数或者组件中,经过一定的加工处理,最终再返回一个组件的组合…

【C语言】结构体、枚举、联合(自定义类型)

文章目录 前言一、结构体1.结构体的声明2.结构体的自引用3.结构体变量的定义和初始化4.结构体成员的访问5.结构体内存对齐&#xff08;重点&#xff09;6.#pragma修改默认对齐数7.结构体传参 二、位段1.位段的声明2.位段的内存分配3.位段的跨平台问题 三、枚举四、联合 &#x…