eKuiper 源码解读:从一条 SQL 到流处理任务的旅程

news2025/1/17 3:17:04

概述

LF Edge eKuiper 是 Golang 实现的轻量级物联网边缘分析、流式处理开源软件,可以运行在各类资源受限的边缘设备上。eKuiper 的主要目标是在边缘端提供一个流媒体软件框架。其规则引擎允许用户提供基于SQL 或基于图形(类似于 Node-RED)的规则,在几分钟内创建物联网边缘分析应用。

本文中,我们将以源码为脉络,阐述一条 SQL 从被 eKuiper 接收后,是如何从一条文本变成一个可执行的处理过程。通过本文,你可以了解到以下内容:

  1. 一个 SQL 计算引擎基本的处理流程
  2. eKuiper 在每个处理流程中的具体代码节点

准备

为了更加直观地了解到 eKuiper 内部的代码运行逻辑,在讲解 eKuiper 规则引擎的处理过程中,我们会涉及到 eKuiper 中的一部分代码,并对其中的关键部分进行较为详细的讲解。

为了更好地理解之后的内容,你需要了解:

  1. eKuiper 项目:https://github.com/lf-edge/ekuiper
  2. Golang 的基础用法

框架

从 eKuiper 接收到 SQL 的文本,到最终根据这个 SQL 的语义去做出相应的读取与计算工作。eKuiper 内部的 SQL 计算引擎在其中承担了解析、构造、优化与运行这总共 4 部分工作,即我们之后将重点关注 SQL 处理过程中的以下几个环节:

  1. SQL Parser 将 SQL 文本转换为 AST 对象
  2. 基于 AST 对象生成逻辑计划
  3. 优化逻辑计划并生成执行算子
  4. 运行执行算子,开始读取数据与计算并最终将结果写入到下游

从 SQL 文本到执行算子树

从这一节开始,我们将开始根据 eKuiper 中的代码节点,来理解一条 SQL 文本是如何一步步被最终转换为一个可以被实际执行的算子树。

以下代码实际展示了 eKuiper 代码中解析文本、优化计划、构造执行算子这几个处理流程,我们将一一进行展开了解。

func PlanSQLWithSourcesAndSinks(rule *api.Rule, sources []*node.SourceNode, sinks []*node.SinkNode) (*topo.Topo, error) {
    sql := rule.Sql
    conf.Log.Infof("Init rule with options %+v", rule.Options)
    stmt, err := xsql.GetStatementFromSql(sql)
    if err != nil {
       return nil, err
    }

    ......
    // Create logical plan and optimize. Logical plans are a linked list
    lp, err := createLogicalPlan(stmt, rule.Options, store)
    if err != nil {
       return nil, err
    }
    tp, err := createTopo(rule, lp, sources, sinks, streamsFromStmt)
    if err != nil {
       return nil, err
    }
    return tp, nil
}

解析 SQL 文本

通过以下函数,我们将一个 SQL 文本解析为了 AST 对象

func GetStatementFromSql(sql string) (*ast.SelectStatement, error) {

本文中我们暂时先不涉及 SQL 解析器中的具体实现细节,相关内容将在之后的 eKuiper 源码阅读中进行讲解。感兴趣的朋友可以通过以下函数作为入口进行了解:

func (p *Parser) Parse() (*ast.SelectStatement, error) {

值得一提的是,在 SQL Parser的具体实现上,也有一些其他 well-known 的数据库实现使用了 yacc 的方案来直接生成 SQL Parser。eKuiper 之所以选择自己实现 SQL Parser,一个非常重要的原因是对于一个运行在边缘端的应用而言,binary size 是一个非常重要的指标。自己实现 SQL Parser 而非使用 yacc 这类的 Parser Generator 的技术,有助于控制和降低 eKuiper 编译后整体的 binary size 的大小。

构造与优化逻辑计划

当 SQL 文本还解析为 AST 对象后,我们需要将该 AST 对象转换为一个可以用来描述该 SQL 应当被计算引擎如何执行的逻辑计划。这一步骤被封装在了以下代码函数入口中:

func createLogicalPlan(stmt *ast.SelectStatement, opt *api.RuleOption, store kv.KeyValue) (LogicalPlan, error) {

在 createLogicalPlan 函数中,它接收一个 AST 树对象,并返还一个逻辑计划树对象,在整个函数过程中,它总共做了以下 3 件事情:

  1. 抽取 SQL 中的各类信息,并将其与实际的表达式或者是 schema 信息进行绑定。
  2. 根据 AST 对象构造最初的逻辑计划。
  3. 根据最初的逻辑计划进行逻辑优化。

在一条 SQL 中,它所带的信息里包含了一些原本注册计算引擎中的信息,比如流、表的定义,也包含了一些临时声明的信息,比如列或者表达式的 alias name。在以下代码函数入口中,eKuiper 会从 AST 树对象中抽取出以下信息,并进行响应的绑定:

func decorateStmt(s *ast.SelectStatement, store kv.KeyValue) ([]streamInfo, []*ast.Call, error) {
  1. 从 AST 树对象中抽取出流与表的 AST 对象,并从 eKuiper 的存储中取出预先设置好的流、表的定义,并将这些 schema 信息绑定到 ast 对象中。
  2. 从 AST 对象中将查询中的 filed 与各个流、表进行绑定

当我们处理好 AST 树对象中的各个节点的信息绑定后,我们就可以根据 AST 树对象来构造一个最初的逻辑计划。以下代码显示了在 eKuiper 中是如何根据自底向上的构建逻辑计划。从最底层的 DataSource 算子,一路向上 build 逻辑算子,直至整个逻辑算子树构造完毕。

func createLogicalPlan(stmt *ast.SelectStatement, opt *api.RuleOption, store kv.KeyValue) (LogicalPlan, error) {
// 1. build Datasource
// 2. build Window
// 3. Buld JoinAlign / Join
// 4. Build Filter
// 5. Build Agg
// 6. Build Having
// 7. Build Sort
// 8. Build Proj
}

当我们获得了最原始的逻辑计划树以后,我们需要对逻辑计划进行逻辑优化。逻辑优化阶段会对原本的计划进行优化。逻辑优化阶段,简单来说就是对一个逻辑算子树进行等价的变换,这个变换并不会影响最终的计算结果,但是可以让计算过程减少更多不必要的计算量。

举一个简单的例子,对于 select * from t1 join t2 on [t1.](http://t1.id)a = t2.a where t1.b > 10 这条 SQL 来说,其原本的逻辑计划如下:

原本的逻辑计划

然后在逻辑优化阶段,我们可以将 Filter 算子进行下推至 Join 算子之下,从而让参与 Join 算子的数据量被提前过滤一部分,来减少整个计算过程中所涉及到的计算量。

优化后的逻辑

以下代码展示了 eKuiper 中是如何进行逻辑优化的:

var optRuleList = []logicalOptRule{
    &columnPruner{},
    &predicatePushDown{},
}

func optimize(p LogicalPlan) (LogicalPlan, error) {
    var err error
    for _, rule := range optRuleList {
        p, err = rule.optimize(p)
        if err != nil {
            return nil, err
        }
    }
    return p, err
}

在随后的系列当中我们会比较详细地介绍目前 eKuiper 中的逻辑优化环节中的代码细节。

当我们的逻辑计划优化完毕以后,我们需要根据逻辑计划来构造具体的执行算子。在 eKuiper 中,我们通过 Topo 结构来维护整个执行算子的上下文环境。

以下代码展示了构建执行算子的函数入口:

func createTopo(rule *api.Rule, lp LogicalPlan, sources []*node.SourceNode, sinks []*node.SinkNode, streamsFromStmt []string) (*topo.Topo, error) {
type Topo struct {
   ......
   sources            []node.DataSourceNode
   sinks              []*node.SinkNode
   ops                []node.OperatorNode
   ......
}

Topo 作为执行算子 Context,会将逻辑计划中的 DataSource 算子放在 sources 中,将其他算子放在 ops 中,而最终的 SQL 结果会汇总到 sinks 中。 在这里我们重点关注算子是如何构造的:

以下代码展示了 eKuiper 中是如何根据逻辑算子构造执行算子的:

func buildOps(lp LogicalPlan, tp *topo.Topo, options *api.RuleOption, sources []*node.SourceNode, streamsFromStmt []string, index int) (api.Emitter, int, error) {
    var inputs []api.Emitter
    newIndex := index
    for _, c := range lp.Children() {
       input, ni, err := buildOps(c, tp, options, sources, streamsFromStmt, newIndex)
       .......
    }
    ......
    switch t := lp.(type) {
    case *DataSourcePlan:
       isSchemaless := t.isSchemaless
       switch t.streamStmt.StreamType {
       case ast.TypeStream:
          ......
          op = srcNode
    ......
    case *ProjectPlan:
       op = Transform(&operator.ProjectOp{ColNames: t.colNames, AliasNames: t.aliasNames, AliasFields: t.aliasFields, ExprFields: t.exprFields, IsAggregate: t.isAggregate, AllWildcard: t.allWildcard, WildcardEmitters: t.wildcardEmitters, ExprNames: t.exprNames, SendMeta: t.sendMeta}, fmt.Sprintf("%d_project", newIndex), options)
    default:
       return nil, 0, fmt.Errorf("unknown logical plan %v", t)
    }
    ......
    if onode, ok := op.(node.OperatorNode); ok {
       tp.AddOperator(inputs, onode)
    }
    return op, newIndex, nil
}

在构造算子的过程中,我们主要关注 2 个问题:

  1. buildOps 是如何遍历整个逻辑算子树,将每个逻辑算子转换为执行算子
  2. buildOps 是如何串联起整个执行算子的树形结构,将下层算子的 Ouput 结果传递给上层算子的 Input 来源。

在 buildOps 过程中,通过递归的方式,以自底向上的方式遍历整个逻辑算子树来构造执行算子。当下层算子构造完毕以后,我们在以下代码中会将下层算子的 Ouput 作为结果参数传递给上层算子的构造过程中,将下层算子的 Output 和上层算子的 Input 连接起来

if onode, ok := op.(node.OperatorNode); ok {
   tp.AddOperator(inputs, onode)
}

当执行算子树被创建完毕以后,我们会将顶层算子的 Output 和这条 SQL 的 sink 连接起来,从而使得 eKuiper 会将 SQL 计算的结果写入到下游的 sink 中。

func createTopo(rule *api.Rule, lp LogicalPlan, sources []*node.SourceNode, sinks []*node.SinkNode, streamsFromStmt []string) (*topo.Topo, error) {
    ......
    input, _, err := buildOps(lp, tp, rule.Options, sources, streamsFromStmt, 0)
    if err != nil {
        return nil, err
    }
    inputs := []api.Emitter{input}
    ......
    for _, sink := range sinks {
        tp.AddSink(inputs, sink)
    }
    ......
    return tp, nil
}

启动执行算子树

当执行算子树被构造完毕后,我们就需要启动执行算子树来真正执行这条 SQL,在以下的代码中展示了 eKuiper 启动执行算子的代码入口:

func (s *Topo) Open() <-chan error {
    ......
    for _, snk := range s.sinks {
       snk.Open(s.ctx.WithMeta(s.name, snk.GetName(), s.store), s.drain)
    }

    //apply operators, if err bail
    for _, op := range s.ops {
       op.Exec(s.ctx.WithMeta(s.name, op.GetName(), s.store), s.drain)
    }

    // open source, if err bail
    for _, source := range s.sources {
       source.Open(s.ctx.WithMeta(s.name, source.GetName(), s.store), s.drain)
    }
    .......
}

我们会以 sink / 执行算子 / source 的顺序,开始启动每个环节的算子。在这里,我们以单个算子运行为例,来了解执行算子的运行过程中的大致逻辑。

在以下的代码中展示了,对于单个算子而言,是如何读取下层算子的数据,进行计算,然后交付给上层算子进行处理。

func (o *UnaryOperator) doOp(ctx api.StreamContext, errCh chan<- error) {
    ......   
    for {
       select {
       // process incoming item
       case item := <-o.input:
          ......
          result := o.op.Apply(exeCtx, item, fv, afv)
          switch val := result.(type) {
          default:
             .......
             o.Broadcast(val)
          }
       // is cancelling
       case <-ctx.Done():
          return
       }
    }
}

每个执行算子会从自己的 input channel 中取出下层算子交付的数据,对于 UnaryOperator 而言,会通过 Apply 行为来将数据进行计算,将计算后的结果通过 Broadcast 转交给上层算子进行处理。

总结

在本篇文章中,我们以梳理关键代码节点的方式了解了 eKuiper 的 SQL 计算引擎中是如何解析、处理,并最终执行这条 SQL 得到相应的结果。对于整个计算引擎关键处理节点里,我们了解了每个环节的代码大致是如何运行的。

在后续的分享中,我们将以具体 SQL 为例,深入到各个环节、算子的内部执行的代码逻辑,从而让大家更好地理解 eKuiper 是如何在边缘端接受数据、处理计算并最终写入下游的整体流程。敬请期待。

版权声明: 本文为 EMQ 原创,转载请注明出处。

原文链接:https://www.emqx.com/zh/blog/ekuiper-source-code-interpretation

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/532870.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

权威硬核认证|数说故事携手IDEA共创学术论文获NLP国际顶会 ACL 2023收录

日前&#xff0c;数说故事携手IDEA共创的学术论文——《A Unified One-Step Solution for Aspect Sentiment Quad Prediction (一个统一的单步情感四元组识别方法) 》被国际学术顶会 ACL 2023 接收为 Findings长文。这是继上一年IDEA数说故事实验室论文获「国际AI顶会IJCAI-ECA…

加密解密软件VMProtect教程(六):主窗口之控制面板“项目”部分(1)

VMProtect 是保护应用程序代码免遭分析和破解的可靠工具&#xff0c;但只有在正确构建应用程序内保护机制并且没有可能破坏整个保护的典型错误的情况下才能最有效地使用。 接下来为大家介绍关于VMProtect主窗口中的控制面板&#xff0c;其中包括&#xff1a;“项目”部分、“功…

AD20 原理图设计流程

Altium Designer 20 的原理图设计大致可以分为9个步骤&#xff1a; &#xff08;1&#xff09;新建原理图。这是原理图设计的第一步。 &#xff08;2&#xff09;图纸设置。图纸设置就是要设置图纸的大小&#xff0c;方向等信息。图纸设置要根据电路图的内容和标准化来进行。…

教你几分钟玩转.ipynb文件

找代码的时候最不喜欢遇到.ipynb文件&#xff0c;因为要打开jupyter&#xff0c;作为懒癌患者&#xff0c;即使电脑安装了jupyter也很少去用。不知道有没有人和我一样&#xff0c;真的很不喜欢在终端开一个程序&#xff0c;不能关的那种。 今天又遇到.ipynb文件&#xff0c;这…

我是如何利用midjourney制作表情包的

起初是在看到大厂文章《【Midjourney教程】设计麻瓜也能10分钟上架一套表情包》以后&#xff0c;才想自己试试的。如果你是midjourney的老鸟了&#xff0c;那么参照着文章&#xff0c;应该也能很顺利的完成。下面我介绍下&#xff0c;我遇到的问题和解决方案 准备&#xff1a;…

Tesseract.js离线识别图片中的文字

从官网下载Tesseract.js的离线版本 https://github.com/jeromewu/tesseract.js-offline 初始化 解压下载文件使用cmd命令行进入解压的文件夹&#xff08;tesseract.js-offline-master&#xff09;&#xff0c;使用命令下载安装相关包npm install下载安装完成后&#xff0c;该…

看懂二维码识别OCR:从算法到API 接入代码

引言 二维码识别OCR&#xff08;Optical Character Recognition&#xff09;是结合了图像处理和OCR技术&#xff0c;以识别和提取二维码中的信息的技术&#xff0c;二维码识别OCR 可以实现对图像中的二维码进行自动检测和解码&#xff0c;并将其内容提取为可编辑的文本&#x…

腾讯云 Serverless Stable Diffusion 应用免费名额限量放送,试用申请开启!

近半年&#xff0c;AIGC 领域惊喜接踵而至。除了 Chatgpt&#xff0c;在AI绘图方面 Stable Diffusion 也大放异彩。网上的教程五花八门&#xff0c;有很多小伙伴根本不知如何下手&#xff0c;苦不堪言。 现在腾讯云 Serverless Stable Diffusion 应用免费名额限量放送&#xf…

阿里P6测试总监分享,这份《接口自动化测试》总结,让我成功入门接口自动化测试...

昨晚在某个测试交流群&#xff0c;听了一个测试老司机分享接口自动化测试的内容&#xff0c;对接口自动化有了更深的一些认识&#xff0c;也为接下来公司的接口自动化实施&#xff0c;提供了更多的思路。 这篇文章&#xff0c;就说说功能测试到接口自动化的进阶&#xff0c;以…

( 位运算 ) 318. 最大单词长度乘积 ——【Leetcode每日一题】

❓318. 最大单词长度乘积 难度&#xff1a;中等 给你一个字符串数组 words &#xff0c;找出并返回 length(words[i]) * length(words[j]) 的最大值&#xff0c;并且这两个单词不含有公共字母。如果不存在这样的两个单词&#xff0c;返回 0 。 示例 1&#xff1a; 输入&…

sqlmap对dvwa靶场的账号密码进行破解

1.进行靶场搭建 准备两台虚拟机 靶机&#xff1a;win7 攻击机&#xff1a;kali linux win7IP 172.26.0.130kali linuxIP 172.26.0.129 虚拟机搭建好后,相互ping能ping同就行 安装xampp XAMPP Installers and Downloads for Apache FriendsXAMPP is an easy to install…

数字化赋能,探索智慧银行建设的最佳实践

导语 | 数字经济时代&#xff0c;数字化已成为银行业转型升级的战略手段。近年来&#xff0c;商业银行纷纷加大对信息科技的投入&#xff0c;数字化在改变银行业务模式的同时&#xff0c;更是构建起了数字金融新生态。今天&#xff0c;我们特邀腾讯云 TVP 行业大使、舜源科技合…

值传递、引用传递

​​​​​辟谣时间 错误理解一&#xff1a;值传递和引用传递&#xff0c;区分的条件是传递的内容&#xff0c;如果是个值&#xff0c;就是值传递。如果是个引用&#xff0c;就是引用传递。 错误理解二&#xff1a;Java是引用传递。 错误理解三&#xff1a;传递的参数如果是普通…

国内有哪些SAAS软件?SAAS软件有哪些优点?

国内有哪些SAAS软件&#xff1f;SAAS软件有哪些优点&#xff1f;不请自来答一下&#xff0c;通过SaaS软件与传统软件的对比来详细讲下SaaS软件有哪些优点&#xff1f; 配合以下内容食用更佳&#xff1a; 关于概念——深度详解什么是SaaS&#xff08;软件即服务&#xff09;关…

项目报告:turtle画小猪佩奇

目录 项目&#xff1a;一、项目思路二、项目实战1. 导入模块2. 创建画布3. 绘制鼻子4. 绘制猪头5. 绘制耳朵6. 绘制眼睛7. 绘制脸8. 绘制嘴9. 绘制身体10.绘制手11.绘制脚12.绘制尾巴 三、项目展示 总结&#xff1a; 项目&#xff1a; ​ 我们做的项目是小猪佩奇绘画的一个项目…

农场农庄偷菜卖菜h5多端流量主小程序开发

农场农庄偷菜卖菜h5多端流量主小程序开发 种菜&#xff0c;收菜&#xff0c;偷菜&#xff0c;卖菜&#xff09;玩法。 功能&#xff1a;动态背包&#xff0c;动态排行榜&#xff0c;定时收获&#xff0c;广告组件接入&#xff0c;背景音乐&#xff0c;按钮点击声音接入&#x…

多线程概念,常用接口与多进程之间的比较

多线程概念&#xff0c;常用接口与多进程之间的比较 多线程概念与常用接口多线程概念与相对于线程的区别什么是多线程&#xff08;概念&#xff09;进程和线程的区别在Linux系统下&#xff0c;进程和线程的区别如下&#xff1a;多进程和多线程优缺点比较&#xff1a;在多任务处…

国产仪器 1612A无线信道仿真器

1612A无线信道仿真器是一款专门的无线信道仿真设备&#xff0c;可准确实时仿真复杂的无线信道特征&#xff0c;包含路径损耗、延迟、多径衰落以及噪声等&#xff0c;重现真实的信号传播环境&#xff0c;用于对比测试及反复测试&#xff0c;加快问题的发现及解决的过程。本产品突…

canvas学习笔记

其实还有react还没有学&#xff0c;但是公司技术栈里面有canvas&#xff0c;所以先系统学习一下canvas 一、canvas 简介 ​<canvas> 是 HTML5 新增的&#xff0c;一个可以使用脚本(通常为 JavaScript) 在其中绘制图像的 HTML 元素。它可以用来制作照片集或者制作简单(也…

微服务之服务间通信:关于Feign的练习demo

一、主要流程&#xff1a; 创建两个最基础的springboot项目调用方引入Feign的依赖在调用方服务项目中创建agent接口类&#xff0c;类使用FeignClient注解&#xff0c;注解重点配置url&#xff08;即被调用方服务所在的地址ip端口号&#xff09;、写接口方法等。在具体业务代码…