使用 LF Edge eKuiper 将物联网流处理数据写入 Databend

news2024/11/18 9:40:23

作者:韩山杰

Databend Cloud 研发工程师

https://github.com/hantmac

LF Edge eKuiper

LF Edge eKuiper 是 Golang 实现的轻量级物联网边缘分析、流式处理开源软件,可以运行在各类资源受限的边缘设备上。eKuiper 的主要目标是在边缘端提供一个流媒体软件框架(类似于 Apache Flink (opens new window))。eKuiper 的规则引擎允许用户提供基于 SQL 或基于图形(类似于 Node-RED)的规则,在几分钟内创建物联网边缘分析应用。具体介绍可以参考 [LF Edge eKuiper - 超轻量物联网边缘流处理软件(https://ekuiper.org/docs/zh/latest/)。 

Databend Sql Sink

eKuiper 支持通过 Golang 或者 Python 在源 (Source)SQL 函数目标 (Sink) 三个方面的扩展,通过支持不同的 Sink,允许用户将分析结果发送到不同的扩展系统中。Databend 作为 Sink 也被集成到了 eKuiper plugin 当中,下面通过一个案例来展示如何使用 eKuiper 将物联网流处理数据写入 Databend。

编译 eKuiper 和 Databend Sql Plugin

eKuiper

git clone https://github.com/lf-edge/ekuiper & cd ekuiper
make

Databend Sql Plugin

go build -trimpath --buildmode=plugin -tags databend -o plugins/sinks/Sql.so extensions/sinks/sql/sql.go

编译后的 sink plugin 拷贝到 build 目录:

cp plugins/sinks/Sql.so _build/kuiper-1.11.1-18-g42d9147f-darwin-arm64/plugins/sinks

Databend 建表

在 Databend 中先创建目标表 ekuiper_test:

create table ekuiper_test (name string,size bigint,id bigint);

启动 eKuiperd

cd _build/kuiper-1.11.1-18-g42d9147f-darwin-arm64 
./bin/kuiperd

服务正常启动:

创建流(stream) 和 规则 (rule)

eKuiper 提供了两种管理各种流、规则,目标端的方式,一种是通过 ekuiper-manager 的 [docker image](https://hub.docker.com/r/lfedge/ekuiper) 启动可视化管理界面,一种是通过 CLI 工具来管理。这里我们使用 CLI。

创建 stream

流是 eKuiper 中数据源连接器的运行形式。它必须指定一个源类型来定义如何连接到外部资源。这里我们创建一个流,从 json 文件数据源中获取数据,并发送到 eKuiper 中。

首先配置文件数据源,连接器的配置文件位于 /etc/sources/file.yaml

default:
  # 文件的类型,支持 json, csv 和 lines
  fileType: json
  # 文件以 eKuiper 为根目录的目录或文件的绝对路径。
  # 请勿在此处包含文件名。文件名应在流数据源中定义
  path: data
  # 读取文件的时间间隔,单位为ms。如果只读取一次,则将其设置为 0
  interval: 0
  # 读取后,两条数据发送的间隔时间
  sendInterval: 0
  # 是否并行读取目录中的文件
  parallel: false
  # 文件读取后的操作
  # 0: 文件保持不变
  # 1: 删除文件
  # 2: 移动文件到 moveTo 定义的位置
  actionAfterRead: 0
  # 移动文件的位置, 仅用于 actionAfterRead 为 2 的情况
  moveTo: /tmp/kuiper/moved
  # 是否包含文件头,多用于 csv。若为 true,则第一行解析为文件头。
  hasHeader: false
  # 定义文件的列。如果定义了文件头,该选项将被覆盖。
  # columns: [id, name]
  # 忽略开头多少行的内容。
  ignoreStartLines: 0
  # 忽略结尾多少行的内容。最后的空行不计算在内。
  ignoreEndLines: 0
  # 使用指定的压缩方法解压缩文件。现在支持`gzip`、`zstd` 方法。
  decompression: ""

使用 CLI 创建 steam 名为 stream1:

./bin/kuiper create stream stream1 '(id BIGINT, name STRING,size BIGINT) WITH (DATASOURCE="test.json", FORMAT="json", TYPE="file");'

Json 文件的内容为:

[
  {"id": 1,"size":100, "name": "John Doe"},
  {"id": 2,"size":200, "name": "Jane Smith"},
  {"id": 3,"size":300, "name": "Kobe Brant"},
  {"id": 4,"size":400, "name": "Alen Iverson"}
]

创建 Databend Sink Rule

一个规则代表了一个流处理流程,定义了从将数据输入流的数据源到各种处理逻辑,再到将数据输入到外部系统的动作。eKuiper 有两种方法来定义规则的业务逻辑。要么使用 SQL / 动作组合,要么使用新增加的图 API。

这里我们通过指定 sql 和 actions 属性,以声明的方式定义规则的业务逻辑。其中,sql 定义了针对预定义流运行的 SQL 查询,这将转换数据。然后,输出的数据可以通过 action 路由到多个位置。

规则由 JSON 定义,下面是准备创建的规则 myRule.json:

{
  "id": "myRule",
  "sql": "SELECT id, name from stream1",
  "actions": [
    {
      "log": {
      },
      "sql": {
        "url": "databend://databend:databend@localhost:8000/default?sslmode=disable",
        "table": "ekuiper_test",
        "fields": ["id","name"]
      }
    }
  ]
}

执行 CLI 创建规则:

./bin/kuiper create rule myRule -f myRule.json

可以查看所创建规则的运行状态:

./bin/kuiper getstatus rule myRule

规则创建后,会立即将符合规则条件的数据发送到目标端,此时我们查看 Databend 的 ekuiper_test 表,可以看到文件数据源中的数据已经被写入到 Databend:

可以看到由于我们的规则 SQL 中只指定了 idname 字段,所以这里只有这两个字段被写入。

结论

eKuiper 是 EMQ 旗下的一款流处理软件,其体积小、功能强大,在工业物联网、车辆网、公共数据分析等很多场景中得到广泛使用。本文介绍如何使用 eKuiper 将物联网流处理数据写入 Databend。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1106081.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

怎样正确做 Web 应用的压力测试?

Web应用,通俗来讲就是一个网站,主要依托于浏览器来访问其功能。 那怎么正确做网站的压力测试呢? 提到压力测试,我们想到的是服务端压力测试,其实这是片面的,完整的压力测试包含服务端压力测试和前端压力测…

hue实现对hiveserver2 的负载均衡

如果你使用的是CDH集群那就很是方便的 在Cloudera Manager中,进入HDFS Service 进入Instances标签页面,点击Add Role Instances按钮,如下图所示 点击Continue按钮,如下图所示 返回Instances页面,选择HttpFS角色…

Jmeter测试添加凭证和导出压测结果

选中测试计划中的HTTP请求,右键-->添加配置元件-->HTTP信息头管理器,在窗口中添加 如果是post请求,还需在信息头管理器中添加Content-Type:application/json 导出聚合报告

数学建模——最大流问题(配合例子说明)

目录 一、最大流有关的概念 例1 1、容量网络的定义 2、符号设置 3、建立模型 3.1 每条边的容量限制 3.2 平衡条件 3.3 网络的总流量 4、网络最大流数学模型 5、计算 二、最小费用流 例2 【符号说明】 【建立模型】 (1)各条边的流量限制 &a…

(Python)使用Matplotlib将x轴移动到绘图顶部

移动前: 我们有两种方法可以实现这个目标: import warnings warnings.filterwarnings(ignore)import numpy as np import matplotlib.pyplot as pltcolumn_labels list(ABCD) row_labels list(WXYZ)data np.random.rand(4, 4)fig, ax plt.subplots(…

手写商用Java虚拟机HotSpot,疯狂磨砺技术中

在当前Java行业激烈竞争的形式下,唯有掌握技术,心中才不能慌。在多年前,我就开始苦练底层技术,但是眼看百遍也不如手过一遍,所以我打算把虚拟机的精华实现部分用手敲出来,这个过程注定不会轻松,…

基于springboot的学生宿舍管理系统(源码+LW+调试)

项目描述 临近学期结束,还是毕业设计,你还在做java程序网络编程,期末作业,老师的作业要求觉得大了吗?不知道毕业设计该怎么办?网页功能的数量是否太多?没有合适的类型或系统?等等。这里根据疫情当下,你想解决的问…

副业兼职做什么好呢?七个线上兼职线下副业可供选择

生活节奏的加快,人们的生活压力也与日俱增。为了缓解压力、增加收入,许多人都开始寻找副业兼职的机会。副业兼职不仅可以帮助我们应对经济困境,更可以为我们的生活注入新的乐趣和意义。但是在众多的副业兼职选择中,该如何找到适合…

(1)攻防世界web-Training-WWW-Robots

1.开启环境,查看网页 翻译一下 2.前往robots.txt 命令:http://61.147.171.105:57663/robots.txt 3.前往fl0g.php 命令:http://61.147.171.105:57663/fl0g.php 4.得到flag cyberpeace{92ec1ef9b6d900100399093b9ae9e386}

python烟花代码

下面是一个用Python编写的简单烟花特效代码,使用了Pygame库来实现图形显示。请确保你已经安装了Pygame库,如果没有安装,可以使用pip install pygame来安装。 import pygame import random# 初始化Pygame pygame.init()# 屏幕大小 width, hei…

热点报告 | 解压经济成为新风向,素人改造踩中用户痛点

您是否曾有以下困惑?打开小红书首页推荐,似乎已经被算法教育成了成熟的信息茧房,想要找到下一个热点,又忧虑一叶以障目;看着搜索框热词,又担心无法掌握热词背后的话题命脉,难以在浮光掠影中寻找…

【学位论文】GB/T 7714-2015引用的快捷操作方法

GB/T 7714-2015《信息与文献参考文献著录规则》于2015年12月1日开始实施,成为了目前国内主流的学位论文引用格式之一。本文介绍一种比较方便简单的引用方法。 7714示例: [1] He K, Gkioxari G, Dollr P, et al. Mask r-cnn[C]//Proceedings of the IEEE …

web安全之XSS攻击

什么是XSS攻击 XSS(Cross-Site Scripting)又称跨站脚本,XSS的重点不在于跨站点,而是在于脚本的执行。XSS是一种经常出现在 Web 应用程序中的计算机安全漏洞,是由于 Web 应用程序对用户的输入过滤不足而产生的。 常见…

基于react18+arco+zustand通用后台管理系统React18Admin

React-Arco-Admin轻量级后台管理系统解决方案 基于vite4构建react18后台项目ReactAdmin。使用了reactarco-designzustandbizcharts等技术架构非凡后台管理框架。支持 dark/light主题、i18n国际化、动态路由鉴权、3种经典布局、tabs路由标签 等功能。 技术框架 编辑器&#xff…

pip install huggingface_hub时报错

pip install huggingface_hub时报错: 可以尝试:pip install --upgrade huggingface_hub 进行安装 方法参考了:https://blog.csdn.net/m0_72295867/article/details/132060750

vue.js - 断开发送的请求,解决接口重复请求数据错误问题(vue中axios多次相同请求中断上一个)

描述 进入页面时第一个接口还在请求,立即切换tab请求第二个接口。但是第二个接口响应比第一个接口响应快,页面展示的时第一个接口的数据,如图: 解决方法 判断如果是相同的接

谷歌浏览器跨域及--disable-web-security无效解决办法

谷歌浏览器跨域设置 (1)创建一个目录,例如我在C盘创建MyChromeDevUserData文件夹 (2) 在桌面选择谷歌浏览器右键 -> 属性 -> 快捷方式 -> 目标,添加--disable-web-security --user-data-dirC:\M…

第六届物业管理创新发展论坛在深召开,鹏业受邀参加并发表主题演讲

10月12日至14日,由中国物业管理协会(以下简称“中物协”)主办,物业管理行业唯一报备商务部的国际性展会——2023中国国际物业管理产业博览会(以下简称“物博会”)在深圳会展中心隆重举行。 本届物博会同期还…

Docker是一个流行的容器化平台,用于构建、部署和运行应用程序。

文章目录 Web应用程序数据库服务器微服务应用开发环境持续集成和持续部署 (CI/CD)应用程序依赖项云原生应用程序研究和教育 🎈个人主页:程序员 小侯 🎐CSDN新晋作者 🎉欢迎 👍点赞✍评论⭐收藏 ✨收录专栏:…

基于PHP的创意设计分享系统设计与实现(源码+lw+部署文档+讲解等)

文章目录 前言具体实现截图论文参考详细视频演示为什么选择我自己的网站自己的小程序(小蔡coding) 代码参考数据库参考源码获取 前言 💗博主介绍:✌全网粉丝10W,CSDN特邀作者、博客专家、CSDN新星计划导师、全栈领域优质创作者&am…