使用eKuiper进行实时流计算

news2024/9/28 5:23:11

运行环境 centos7

官网下载rpm包 并安装
https://ekuiper.org/zh/downloads-and-install?version=1.13.6&os=Linux&oslable=Linux

ekuiper流计算的方式类似flink table api那一套 熟悉的比较快能上手

首先配置mqtt source源
通过内置的yaml配置

vim /etc/kuiper/mqtt_source.yaml
#Global MQTT configurations
default:
  qos: 0
  server: "tcp://127.0.0.1:1883"
  protocolVersion: "3.1.1"
  insecureSkipVerify: false
  #decompression: zlib
  username: admin
  password: public
  #certificationPath: /var/kuiper/xyz-certificate.pem
  #privateKeyPath: /var/kuiper/xyz-private.pem.key
  #rootCaPath: /var/kuiper/xyz-rootca.pem
  #insecureSkipVerify: false
  #connectionSelector: mqtt.mqtt_conf1
  #kubeedgeVersion: 
  #kubeedgeModelFile: ""

# demo_conf: #Conf_key
#   qos: 0
#   server: "tcp://10.211.55.6:1883"

创建一个流(类似flink创建一个table)

/usr/bin/kuiper create stream demo '(temperature float, humidity bigint) WITH (FORMAT="JSON", DATASOURCE="demo")'

DATASOURCE配置的是要读取的topic 其实还有个type属性 用来指定源类型 不配置默认是mqtt 所以此处source为mqtt

流创建以后 是惰性的 在没有执行sql查询 或者启动一个规则之前 流是不会运作的 此时demo流还没有开始读取demo topic

执行一个sql 看看效果

/usr/bin/kuiper query
kuiper > select count(*), avg(humidity) as avg_hum, max(humidity) as max_hum from demo where temperature > 30 group by TUMBLINGWINDOW(ss, 30);
Query was submit successfully.

这个sql的意思是 通过每30s的滚动窗口 统计30度以上的平均湿度和最高湿度

往mqtt demo topic推送消息

{ "temperature": 35,"humidity":60 }
{ "temperature": 35,"humidity":70 }

可以看到控制台打印出流计算的结果

kuiper > [{}]
[{"avg_hum":65,"count":2,"max_hum":70}]

此时已经完成了一个流计算 ctrl+c退出query

配置一个规则 把流计算结果输出到sink
首先配置一个规则文件 rule1.txt

vim /usr/ekuiper/rule1.txt
{
    "sql": "SELECT avg(humidity) as avg_hum,format_time(tstamp(),'YYYY-MM-dd HH:mm:ss') as current_time from demo where temperature > 30 group by TUMBLINGWINDOW(ss, 30)",
    "actions": [
        {
            "log": {}
        },
        {
            "mqtt": {
                "server": "tcp://127.0.0.1:1883",
                "qos": 0,
                "username": "admin",
                "password": "public",
                "topic": "demoSink"
            }
        }
    ]
}

action里面配置的就是sink 该规则就是把流计算结果发送到demoSink topic

创建规则

/usr/bin/kuiper create rule rule1 -f /usr/ekuiper/rule1.txt

规则创建出来以后 默认就是运行状态

查看规则的状态

/usr/bin/kuiper getstatus rule rule1
{
  "status": "running",
  "lastStartTimestamp": "1720686733949",
  "lastStopTimestamp": "0",
  "nextStopTimestamp": "0",
  "source_demo_0_records_in_total": 5,
  "source_demo_0_records_out_total": 5,
  "source_demo_0_messages_processed_total": 5,
  "source_demo_0_process_latency_us": 9,
  "source_demo_0_buffer_length": 0,
  "source_demo_0_last_invocation": "2024-07-11T17:49:41.150715",
  "source_demo_0_exceptions_total": 0,
  "source_demo_0_last_exception": "",
  "source_demo_0_last_exception_time": 0,
...

往demo topic里推送数据 再消费demoSink topic
可以看到demoSink里面已经有数据了

数据推送

{ "temperature": 35,"humidity":50 }
{ "temperature": 35,"humidity":60 }
{ "temperature": 35,"humidity":70 }

消费demoSink

[{"avg_hum":60,"current_time":"2024-07-11 18:11:30"}]

至此实现了一个完整的流计算

eKuiper是物联网领域的流计算框架
文档地址 https://ekuiper.org/docs/zh/latest/

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2172450.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

检查一个CentOS服务器的配置的常用命令

在CentOS系统中,查看服务器配置的常用命令非常丰富,这些命令可以帮助用户快速了解服务器的硬件信息、系统状态以及网络配置等。以下是一些常用的命令及其简要说明: 1. 查看CPU信息 (1) cat /proc/cpuinfo:显示CPU的详细信息&…

【YashanDB知识库】如何dump数据文件,转换rowid, 查询对应内容

本文来自YashanDB官网,具体内容可见https://www.yashandb.com/newsinfo/7459464.html?templateId1718516 问题现象 客户环境有时候会遇到文件损坏的情况,需要dump文件,根据rowid查询数据情况。 问题的风险及影响 熟练掌握崖山数据文件du…

ROS理论与实践学习笔记——2 ROS通信机制之通信机制实践

5.1 话题发布 需求描述:编码实现乌龟运动控制,让小乌龟做圆周运动。 实现分析: ①乌龟运动控制实现,关键节点有两个,一个是乌龟运动显示节点 turtlesim_node,另一个是控制节点,二者是订阅发布模…

公交换乘C++

题目: 样例解释: 样例#1: 第一条记录,在第 3 分钟花费 10 元乘坐地铁。 第二条记录,在第 46 分钟乘坐公交车,可以使用第一条记录中乘坐地铁获得的优惠票,因此没有花费。 第三条记录,…

基于微信小程序的智慧社区的设计与实现

博主介绍: ✌我是阿龙,一名专注于Java技术领域的程序员,全网拥有10W粉丝。作为CSDN特邀作者、博客专家、新星计划导师,我在计算机毕业设计开发方面积累了丰富的经验。同时,我也是掘金、华为云、阿里云、InfoQ等平台…

docker拉取镜像失败

docker拉取镜像失败 错误提示检查linux服务器是否开通防火墙开放端口重启防火墙查看已开放的端口 修改配置文件 错误提示 检查linux服务器是否开通防火墙 firewall-cmd --staterunning表示防火墙正在运行,显示not running表示未运行,使用以下命令开启防…

vite 底层解析

vite 目前大多数框架的前端构建工具都已经被vite取代,相信你已经使用过vite了。可是在使用过程中,vite对我来说一直是模糊的,现在就来一探究竟,为啥它更好? 接下来我将为从以下几点出发,究其原理 一、原生…

基于大数据技术的智慧居家养老服务平台

作者:计算机学姐 开发技术:SpringBoot、SSM、Vue、MySQL、JSP、ElementUI、Python、小程序等,“文末源码”。 专栏推荐:前后端分离项目源码、SpringBoot项目源码、Vue项目源码、SSM项目源码 精品专栏:Java精选实战项目…

Redis实战篇-短信登入

Redis实战篇-短信登入 该笔记是来源于黑马程序员的Redis项目课程,为了后续方便复习。将笔记记录在博客之中 实战篇我们要学习一些什么样的内容 1.本期任务 短信登录 使用redis共享session来实现 商户查询缓存 理解缓存击穿,缓存穿透,缓存雪崩等问题 …

基于冲突动态监测算法的健身房预约管理系统

系统展示 用户前台界面 管理员后台界面 系统背景 随着健身热潮的兴起,健身房管理面临着日益增长的会员需求与资源分配的挑战。传统的人工预约方式不仅效率低下,且容易出现时间冲突和资源浪费的情况。为了解决这一问题,基于冲突动态监测算法的…

【CSS/HTML】CSS实现两列布局,一列固定宽度,一列宽度自适应方法

文章目录 1.固定宽度区浮动,自适应区不设宽度而设置 margin2.float与margin配合使用3.固定宽度区使用绝对定位,自适应区设置margin4.使用display:table实现 不管是左是右,反正就是一边宽度固定,一边宽度自适应。 博客的很多主题也…

Python学习(3):画散点图和箱线图

1. 散点图(matplotlib库) 1.1 代码示例 import matplotlib.pyplot as plt# 准备数据 x [1, 2, 3, 4, 5] y [2, 4, 6, 8, 10]# 绘制散点图 plt.scatter(x, y)# 添加标题和标签 plt.title("散点图示例") plt.xlabel("X 轴") plt.y…

Android PopupWindow.showAsDropDown报错:BadTokenException: Unable to add window

Android PopupWindow.showAsDropDown报错:BadTokenException: Unable to add window Android PopupWindow.showAsDropDown报错: android.view.WindowManager$BadTokenException: Unable to add window -- token null is not valid; is your activity ru…

【华为HCIP实战课程一】OSPF相关基础介绍及基础配置,网络工程师必修

一、OSPF介绍 开放式最短路径优先协议OSPF(Open Shortest Path First),IPv4使用的OSPFv2,针对IPv6使用OSPFv3协议。 二、为什么需要OSPF OSPF出现之前,网络广泛使用RIP路由协议,RIP由于最大16跳数限制无法适应大型网络,RIP是基于距离矢量算法的路由协议,应用在大型网…

MySQL使用FROM_UNIXTIME转换时间戳timestamp无效问题原因

问题点在于timestamp的长度,检查下存储在数据库里的时间戳的数据格式及长度。 MySQL的FROM_UNIXTIME函数默认处理的是10位的时间戳,不是10位就会出现无效的情况,但是数据库并不会进行异常提示。 一般情况下,普遍遇到的是10位或者…

VSCode调试Electron

使用vscode来调试electron主进程,实现断点调试、监视变量,跟踪代码执行,极大地提高开发效率。 在vscode代码编辑器中左侧找到运行或调试 上方下拉框添加配置 点击添加配置后,会在根目录的.vscode目录下存在launch.json文件&am…

阿里云部署1Panel(失败版)

官网脚本部署不成功 这个不怪1panel,这个是阿里Linux 拉不到docker的下载源,懒得思考 正常部署直接打开官网 https://1panel.cn/docs/installation/online_installation/ 但是我使用的阿里云os(Alibaba Cloud Linux 3.2104 LTS 64位) 我执行不管用啊装不上docker 很烦 curl -s…

Android中使用RecyclerView制作横向轮播列表及索引点

在Android开发中,RecyclerView是一个非常强大的组件,用于展示列表数据。它不仅支持垂直滚动,还能通过配置不同的LayoutManager实现横向滚动,非常适合用于制作轮播图或横向列表。本文将详细介绍如何使用RecyclerView在Android应用中…

【中间件——基于消息中间件的分布式系统的架构】

1. 基于消息中间件的分布式系统的架构 从上图中可以看出来,消息中间件的是 1:利用可靠的消息传递机制进行系统和系统直接的通讯 2:通过提供消息传递和消息的排队机制,它可以在分布式系统环境下扩展进程间的通讯。 1.1 消息中间件…

PaddleOCR 表格识别,docker部署,cpu版本

前置环境 centeros7 docker 拉取镜像 docker pull registry.baidubce.com/paddlepaddle/paddle:2.6.1 参考:开始使用_飞桨-源于产业实践的开源深度学习平台 这里拉取的镜像并不能立马用,只是内置好运行环境 随便找个目录下载paddleocr的代码 git…