使用 EMQX 和 eKuiper 进行 MQTT 流处理:快速教程

news2025/1/18 11:54:10

引言

MQTT 协议是一种专为物联网应用而设计的轻量级消息传输协议。它具有简单、开放、易于实现的特点,是物联网应用的理想选择。MQTT 数据以连续实时的方式进行传输,非常适合由流处理引擎进行处理。

EMQX 是一款大规模分布式物联网 MQTT Broker,能够高效、可靠地连接海量的物联网设备,并实时处理和分发消息和事件流数据。eKuiper 是一个开源的流处理引擎,可以对流数据进行过滤、转换和聚合等操作。

本文将向您展示如何使用 eKuiper 实时流处理引擎来处理来自 EMQX 的 MQTT 数据。

MQTT Stream Processing with EMQX and eKuiper

场景描述

假设我们有个 MQTT 主题 demo/sensor,用于在 EMQX 中接收温度和湿度数据。我们希望使用 eKuiper 订阅该主题,并用流处理技术对数据进行处理和分析。然后,我们可以根据分析结果,触发用户的 HTTP 服务,或者将结果保存到外部存储中。

EMQX

由于 EMQX 支持标准的 MQTT 协议,所以 eKuiper 可以连接到任何版本的 EMQX。在这里,我们使用 EMQX Cloud 提供的免费公共 MQTT Broker 进行测试:

集群集群地址监听端口
emqx1broker.emqx.io1883

eKuiper

eKuiper 可以部署在边缘或云端。我们可以使用 Docker 进行快速安装。

docker run -p 9081:9081 -d --name kuiper -e MQTT_SOURCE__DEFAULT__SERVER=tcp://broker.emqx.io:1883 lfedge/ekuiper:1.10.0

我们可以用这个命令拉取并运行 eKuiper 1.10.0 版本的 docker 镜像。我们将 REST API 端口设置为 9081,在本教程中,我们将使用 REST API 来管理 eKuiper。我们还通过环境变量把默认的 MQTT Broker 地址指向了 EMQX Cloud 集群。

如果您想使用其他方法安装 eKuiper,请查看安装指南。

EMQX ECP (EMQX Edge-to-Cloud Platform) 是专为云边协同而打造的高级 MQTT 平台。它提供了专业的 Web UI 让您可以方便地管理 eKuiper。在本教程中,您也可以使用 ECP 来管理 eKuiper。更多细节,请参考 ECP 文档。

配置 eKuiper 订阅 MQTT 数据流

MQTT 数据是一种无界的、连续的流式数据。在 eKuiper 中,我们使用流的概念来映射这种类型的数据。要处理 MQTT 数据,我们首先要创建一个流来描述数据。

我们用 eKuiper REST API 来创建一个流:

POST http://127.0.0.1:9081/streams
Content-Type: application/json

{
  "sql": "CREATE STREAM demoMqttStream (temperature FLOAT, humidity FLOAT) WITH (TYPE=\"mqtt\", DATASOURCE=\"demo/sensor\", FORMAT=\"json\", SHARED=\"true\")"
}

用 Postman 等 HTTP 客户端发送上面的请求,将创建一个名为 demoMqttStream 的流,它是 MQTT 类型的数据源。datasource 属性的值是 demo/sensor,表示订阅 MQTT 的 demo/sensor 主题。数据格式是 JSON。SHARED 选项表示这个流可以被所有规则共享。

注意

我们运行 eKuiper docker 容器时,MQTT Broker 地址默认是 tcp://broker.emqx.io:1883。如果您用的是别的 MQTT Broker,请在安装时换成您的 Broker 地址。

如果您想改变 MQTT Broker 地址或其他 MQTT 连接参数,如认证相关配置,可以修改 data/mqtt_souce.yaml 文件里的设置。

您可以用 +# 通配符订阅多个主题,在 datasource 属性里使用这些通配符。比如,demo/+ 是订阅所有以 demo/ 开头的主题。demo/# 是订阅所有以 demo/ 开头的主题和 demo/ 下的所有子主题。

流处理 MQTT 数据

在 eKuiper 中,我们用规则来定义流处理的工作流程。规则是 SQL 语句,它规定了数据处理的方式和处理后执行的动作。除了连续的数据处理,像 eKuiper 这样的流处理引擎还支持有状态处理。我们将演示两个流处理和有状态处理的例子。

有状态的报警规则

第一个流处理例子是监测温度和湿度数据,温度上升超过 0.5 或湿度上升超过 1 就触发报警。这要求处理引擎能够记住前一条数据的状态,并和当前数据比较。

假设我们有个 URL 为 http://yourhost/alert 的 HTTP webhook,用来接收报警数据。我们首先用下面的 HTTP 请求创建一个规则。

###
POST http://{{host}}/rules
Content-Type: application/json

{
  "id": "rule1",
  "sql": "SELECT temperature, humidity FROM demoMqttStream WHERE temperature - LAG(temperature) > 0.5 OR humidity - LAG(humidity) > 1",
  "actions": [{
    "rest": {
      "url": "http://yourhost/alert",
      "method": "post",
      "sendSingle": true
    }
  }]
}

上述请求创建了一个名为 rule1 的规则,该规则对应的 SQL 语句如下:

SELECT temperature, humidity 
FROM demoMqttStream 
WHERE 
  temperature - LAG(temperature) > 0.5 
  OR humidity - LAG(humidity) > 1

这个 SQL 从 demoMqttStream 里选出变化达到我们条件的温度和湿度数据。LAG 函数用来获取前一条数据。

actions 属性规定了规则触发后的动作。这里,我们用 rest 动作把数据发送到 http://yourhost/alert 。发送的是 SQL 筛选出的数据,以 JSON 格式发送。所以,发送的数据是这样的:

{
  "temperature": 25.5,
  "humidity": 60.5
}

测试规则

我们可以用 MQTTX 或者其他 MQTT 客户端来发布 MQTT 数据到 demo/sensor 主题。规则会处理这些数据。比如,我们发送以下数据到主题:

{"temperature": 25.5, "humidity": 60.5}
{"temperature": 26.1, "humidity": 62}
{"temperature": 25.9, "humidity": 62.1}
{"temperature": 26.5, "humidity": 62.3}

我们将在 HTTP 警报服务中收到下列数据:

{"temperature": 26.1, "humidity": 62}
{"temperature": 26.5, "humidity": 62.3}

这是因为只有第二和第四条消息,温度上升超 0.5 或湿度上升超 1。

时间窗口聚合规则

第二个例子是计算每分钟的平均温度和湿度,并把它发送回 EMQX。这涉及到一个经典的流处理概念,叫做时间窗口。我们可以用以下 HTTP 请求来创建一个规则。

###
POST http://{{host}}/rules
Content-Type: application/json

{
  "id": "rule2",
  "sql": "SELECT 
  trunc(avg(temperature), 2) as avg_temperature, trunc(avg(humidity), 2) as avg_humidity, window_end() as ts FROM demoMqttStream GROUP BY TumblingWindow(mi, 1)",
  "actions": [{
    "mqtt": {
      "server": "tcp://broker.emqx.io:1883",
      "topic": "result/aggregation",
      "sendSingle": true
    }
  }]
}

上述请求创建了一个名为 rule2 的规则,该规则对应的 SQL 语句如下:

SELECT 
  trunc(avg(temperature), 2) as avg_temperature, 
  trunc(avg(humidity), 2) as avg_humidity,
  window_end() as ts
FROM demoMqttStream
GROUP BY TumblingWindow(mi, 1)

这个 SQL 会选出每分钟的温度和湿度平均值。时间窗口在 GROUP BY 子句中用 TumblingWindow 定义。这种窗口类型把 MQTT 数据分成固定长度的窗口。在 SELECT 子句中,我们用聚合函数 avg 来计算时间窗口内温度和湿度的平均值。window_end() 函数用来获取时间窗口的结束时间,这样我们就能知道这些平均值对应的时间段。trunc 函数用来把平均值四舍五入到两位小数。

actions 属性规定了规则触发后的动作。这里,我们用 mqtt 动作发送数据到 EMQX 的 result/aggregation 主题。发送的是 SQL 筛选出的数据,以 JSON 格式发送。所以,发送到主题的数据是这样的:

{
  "avg_temperature": 25.5,
  "avg_humidity": 60.5,
  "ts": 1621419600000
}

测试规则

同样,我们可以用 MQTTX 或者其他 MQTT 客户端来发布 MQTT 数据到 demo/sensor 主题。规则会处理这些数据。比如,我们每 30 秒发送一条数据到主题,两分钟的数据如下所示:

{"temperature": 25.5, "humidity": 60.5}
{"temperature": 26.1, "humidity": 62}
{"temperature": 25.9, "humidity": 62.1}
{"temperature": 26.5, "humidity": 62.3}

我们将在 HTTP 警报服务中收到下列数据:

{"avg_temperature": 25.8, "avg_humidity": 61.25, "ts": 1621419600000}
{"avg_temperature": 26.2, "avg_humidity": 62.2, "ts": 1621419660000}

我们发送了两分钟的数据,所以得到了两个每分钟的平均值。

结语

在本教程中,我们学习了如何使用 eKuiper 处理 MQTT 数据。通过本教程,您能够:

  • 通过订阅 EMQX MQTT Broker 主题接收 MQTT 数据
  • 制定规则来处理 MQTT 数据
  • 将处理后的数据反馈给 EMQX Broker

我们用两个示例展示了 eKuiper 对 MQTT 数据的流处理能力。eKuiper 强大的流处理能力可以应用于多种流式数据源。欢迎您探索 eKuiper 的各种功能,构建实时高效的 MQTT 数据处理通道。

版权声明: 本文为 EMQ 原创,转载请注明出处。
原文链接:https://www.emqx.com/zh/blog/mqtt-stream-processing-with-emqx-and-ekuiper

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/755158.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

队列--C语言实现数据结构

本期带大家一起用C语言实现队列🌈🌈🌈 文章目录 1、队列的概念2、队列的操作流程3 、队列的结构4、队列的实现4.1 队列的结构设计4.2 队列的初始化4.3 入队4.4 判断队列是否为空4.5 出队4.6 获取队头数据4.7 获取队尾数据4.8 获取队列当中数据…

HTTP1和HTTP2和HTTP3的区别

超文本传输协议是一个简单的请求-响应协议,它通常运行在TCP之上。 目录 HTTP1.1: HTTP2 HTTP3 参考文献 HTTP1.1: 特点: 1.一条链接只能一次请求一次返回这样子来回。一般的我们浏览器会帮我们一次次请求和收到。…

安卓UI:SearchView

目录 一、SearchView介绍 二、常用方法 (一)、监听器: (二)、常用方法: (三)、其他常用方法 三、例子: MainActivity2 : ChatListAdapter : item_people_view: activity_main2: 运行结果…

043、TiDB特性_缓存表和分区表

针对于优化器在索引存在时依然使⽤全表扫描的情况下,使⽤缓存表和分区表是提升查询性能的有效⼿段。 缓存表 缓存表是将表的内容完全缓存到 TiDB Server 的内存中表的数据量不⼤,⼏乎不更改读取很频繁缓存控制: ALTER TABLE table_name CACHE|NOCACHE; # 使用tr…

【Ubuntu】安装docker-compose

要在Ubuntu上安装Docker Compose,可以按照以下步骤进行操作: 下载 Docker Compose 二进制文件: sudo curl -L "https://github.com/docker/compose/releases/latest/download/docker-compose-$(uname -s)-$(uname -m)" -o /usr/loc…

产业大模型刚开卷,京东跑进“最后半公里”

点击关注 文|姚 悦 编|王一粟 “京东一直在探索哪些产品、技术、场景可以真正把大模型用起来,在我们内部的场景中反复验证后,才决定在7月份对外发布,现在我们在零售、健康、物流、金融等业务场景里已经积累了一些经…

架构训练营:3-3设计备选方案与架构细化

3架构中期 什么是备选架构? 备选架构定义了系统可行的架构模式和技术选型 备选方案筛选过程 头脑风暴 :对可选技术进行排列组合,得到可能的方案 红线筛选:根据系统明确的约束和限定,一票否决某些方案(主要…

Java分布式项目常用技术栈简介

Spring-Cloud-Gateway : 微服务之前架设的网关服务,实现服务注册中的API请求路由,以及控制流速控制和熔断处理都是常用的架构手段,而这些功能Gateway天然支持 运用Spring Boot快速开发框架,构建项目工程;并结合Spring…

java错误:不支持发行版本5或java: 不再支持源选项 5。请使用 6 或更高版本的解决方案

大家好,我是爱编程的喵喵。双985硕士毕业,现担任全栈工程师一职,热衷于将数据思维应用到工作与生活中。从事机器学习以及相关的前后端开发工作。曾在阿里云、科大讯飞、CCF等比赛获得多次Top名次。现为CSDN博客专家、人工智能领域优质创作者。喜欢通过博客创作的方式对所学的…

Docker本地镜像发布到私有库

Docker Registry(Docker镜像仓库) 使用Docker Registry,可以创建私有或公共的镜像仓库,以存储Docker镜像。私有仓库可以用于存储公司内部的镜像,或者用于个人项目的镜像。公共仓库则会将发布的镜像分享到全世界。 1 …

【C++】string模拟实现

个人主页🍖:在肯德基吃麻辣烫 文章目录 前言一、string的成员变量二、string默认成员函数1.构造函数1.1 无参构造(默认构造)1.2 普通构造1.3无参构造和全缺省构造可以合并 浅拷贝和深拷贝2.拷贝构造3.赋值运算符重载4.析构函数 三、[]的下标访问和iterat…

P5 第二章 电阻电路的等效变换

1、电阻的Y形联结和△形联结的等效变换 可以发现电阻的Y形联结和△形联结可以刻画成下图模型: 如果Y形联结和△形联结i1,i2,i3都相等,则可以列公式解出R1,R2,R3之间的大小关系。 电路普遍存在对偶关系,可以将上图的电阻换成电导&#xff0c…

干货 | 一个漏洞利用工具仓库

0x00 Awesome-Exploit 一个漏洞证明/漏洞利用工具仓库 不定期更新 部分漏洞对应POC/EXP详情可参见以下仓库: https://github.com/Threekiii/Awesome-POC https://github.com/Threekiii/Vulhub-Reproduce 0x01 项目导航 ActiveMQ CVE-2015-5254 Apisix CVE-2…

el-upload实现上传文件夹(批量上传文件)

el-upload实现上传文件夹(批量上传文件)&#xff1a;关键代码在于 this.$refs.uploadFolder.$children[0].$refs.input.webkitdirectory true;//让el-upload支持上传文件夹 <template><div class"sg-body"><el-upload ref"uploadFolder" :…

【智能时代的颠覆】AI让物联网不再是物联网

自我介绍⛵ &#x1f4e3;我是秋说&#xff0c;研究人工智能、大数据等前沿技术&#xff0c;传递Java、Python等语言知识。 &#x1f649;主页链接&#xff1a;秋说的博客 &#x1f4c6; 学习专栏推荐&#xff1a;MySQL进阶之路、C刷题集、网络安全攻防姿势总结 欢迎点赞 &…

HTML5 WebSocket介绍与基本使用(解析服务端返回的二进制数据)

WebSocket基本介绍 WebSocket 是 HTML5 开始提供的一种在单个 TCP 连接上进行全双工通讯的协议。 WebSocket 使得客户端和服务器之间的数据交换变得更加简单&#xff0c;允许服务端主动向客户端推送数据。在 WebSocket API 中&#xff0c;浏览器和服务器只需要完成一次握手&a…

数据库系统 - 家庭教育平台设计开发

目录 1.绪论 1.1项目背景 1.2家庭教育平台的发展现状与优势 1.2.1国内外发展现状 1.2.2家庭教育平台的优势 2.需求分析 2.1可行性分析 2.1.1经济可行性 2.1.2 技术可行性 2.1.3操作可行性 2.2系统功能 2.2.1 家庭教育资源 2.2.2 家庭教育指导师 2.2.3家庭教育咨询…

H3C-Cloud Lab实验-静态路由配置实验

实验拓扑图&#xff1a; 接口IP地址规划&#xff1a; 实验需求&#xff1a; 1、理解静态路由的运行原理 2、掌握静态路由的配置 实验步骤&#xff1a; PC1和PC2的IP地址、子网掩码、网关 1、连接所有设备并重命名 2、R1&#xff0c;配置R1的接口IP地址&#xff0c;配置3.0、…

042、TiDB特性_系统表的使用

系统表存储位置 MySQL 存储TiDB 系统表 mysql.user 等 information_schmea提供了一种查看系统元数据的方法 与mysql兼容的表&#xff1a;tables、processlist、columns等自定义的表&#xff1a; cluster_config、cluster_hardware、tiflash_replica等等 metrics_schema: 基于P…

Java的数据结构-Map集合

文章目录 Map概述Map常用方法Map遍历元素的方法1.方法一&#xff1a;keySet()2.方法二&#xff1a;entrySet() Map概述 1、Map和collection没有继承关系2、Map集合以key和value的方式存储数据&#xff1a;键值对key和value都是引用数据类型。key和value都是存储对象的内存地址…