Flink CEP 新特性进展与在实时风控场景的落地

news2024/11/22 19:30:38

摘要:本文整理自阿里云开发工程师耿飙&阿里云开发工程师胡俊涛,在 FFA 实时风控专场的分享。本篇内容主要分为四个部分:

  1. Flink CEP 介绍&新功能解读

  2. 动态多规则支持与 Demo

  3. Flink CEP SQL 语法增强

  4. 未来规划

■ 分享中的动态 CEP 和 CEP SQL 新功能目前已在阿里云实时计算 Flink 版上线支持。

Tips:点击「阅读原文」查看原文视频&演讲 ppt

01

Flink CEP 介绍&新功能解读

1.1 什么是 Flink CEP

0b1be5cb066787fa73826ac5e0d6a650.jpeg

CEP 是复杂事件处理 Complex Event Processing 的缩写,而 Flink CEP 则是基于 Flink 实现的复杂事件处理库,它可以识别出数据流中符合特定模式(Pattern)的事件序列,并允许用户作出针对性处理。

下面我们举个例子,如上图所示,假设我们对模式 A、B、B、C 感兴趣,它代表我们想要找到这样的事件序列:A 类事件发生后,发生了两次 B 类事件,又发生一次 C 类事件。注意,这里我们并不要求事件之间是严格连续的。

当我们使用 Flink CEP 开发了相关代码并跑起作业后,遇到 d1、a1、b1、b2、d2、c1 的事件流,Flink CEP 就能找到其中的 a1、b1、b2、c1 这一次匹配,之后用户就可以在作业中针对这次匹配做出处理,比如发送报警到下游系统等。

1.2 Flink CEP 应用场景

148ae895c21fc8d7cbb3a096d9ba38f9.jpeg

在实际场景中,Flink CEP 基于 Flink 的分布式特性、毫秒级处理延迟以及自身丰富的规则表达能力有非常多的应用。我们这里展示三个典型场景:

  • 第一个场景,实时风控。Flink CEP 可以运用到风险用户识别中,例如读取并分析客户行为日志,将 5 分钟内转账次数超过 10 次且金额大于 10000 的客户识别为异常用户。

  • 第二个场景,实时营销。Flink CEP 可以用来做营销策略的优化,比如检测用户行为日志,从而在电商大促时,找到“10 分钟内,在购物车中添加超过 3 次的商品,但最终没有付款”的用户,针对性的调整营销策略。同样,在实时营销的反作弊场景中,我们也可以使用 Flink CEP。

  • 第三个场景,物联网。Flink CEP 可以用于检测异常状态并发出告警,比如共享单车被骑出指定区域,且 15 分钟内没有回到指定区域时发出风险提示。如果和物联网传感器结合,还可以用于检测工业生产中的流水线异常。比如检测到三个时间周期内,温度传感器都反馈温度超过设置阈值,就发布报警等等。

1.3 Flink CEP 在 1.16 的改进

48a84da6ef362b48928b1a505fc128d4.jpeg

在 1.16 版本中,Flink CEP 主要包含四个改进。

  • FLINK-27392:支持在 Pattern 内的相邻事件之间定义时间窗口。之前 Flink CEP 的时间窗口只能定义到整个复合 Pattern 中,这个改进则允许在两个相邻的子 Pattern 之间也定义时间窗口,提高了灵活性,之后会有个例子详细介绍这个改进。

  • FLINK-26941:支持在带有窗口的 Patten 中以 notFollowedBy 结尾。1.16 之前 notFollowedBy 语法只能出现在 Pattern 中间,现在我们允许在定义时间窗口的前提下,把 notFollowedBy 放到 Pattern 的结尾。

  • FLINK-24865:支持批模式下使用 MATCH_RECOGNIZE。1.16 支持在批模式下使用 Flink SQL 的 MATCH_RECOGNIZE 语法,进而调用 Flink CEP 的能力。

  • FLINK-23890:优化 Timer 创建策略。它改进了 Flink CEP 实际运行中定时器的创建策略,降低了 CPU 的消耗。

接下来举一个简单的例子来演示 1.16 的新特性给用户带来的好处。

046ec810568ed38940db254670406360.jpeg

在营销场景中,我们希望用户在领取品类优惠券,并添加对应品类商品后,马上下单付款。如果没有付款,我们会采取一些针对性的措施。把刚才的描述细化成一个具体的营销场景,也就是寻找大促当天在领取优惠券后的五分钟内,向购物车中添加了商品,但最终没有结账付款的用户。找到这些用户就可以让下游业务方进行用户分析,或者采取营销措施(例如实时发送提醒消息等)。

针对该场景,1.16 后我们就可以写出上图中的 Pattern。首先起始判断的条件是领取了优惠券,具体判断优惠券领取的逻辑,我们写在 StartCondition 对应的代码中。中间的 Pattern 是 addItem,它对应着添加商品到购物车,具体的判断逻辑我们写在 MiddleCondition 代码中。

注意,这里我们在相邻的子 Pattern 之间定义了 Within 时间窗口,类型为 REVIOUS_AND_CURRENT,它表示只有在领取优惠券事件发生后的五分钟内,发生的添加商品事件,才会被纳入这次模式匹配的考虑中。

最后以 notFollowedBy 结尾,后面是付款 Pattern,并且定义整个付款 Pattern 的时间窗口是一天。可以看到整个 Flink CEP 的 Pattern 写起来更轻松,表达能力也更强。

02

动态多规则的设计与云上实践

2.1 动态规则支持:背景

b7681a80492d60e499bfb5b34b3ff7a9.jpeg

在介绍我们为什么需要动态规则更新前,先看一下右边的图,明确一下规则究竟包含哪些要素。我们认为 Flink CEP 中的规则(即 Pattern)是由阈值、条件、事实三部分组成的。下面我们以“五分钟内通过广告链接访问某商品超过五次,但最终没有购买”为例来介绍这三个要素。

阈值指的是超过五次中的“五”;事实指的是规则所针对的动作,比如通过广告链接访问某商品等;而条件则是用来描述如何过滤符合要求的动作。比如超过五次中的“超过”。

明晰规则的组成要素后,我们也更容易理解为什么需要支持规则动态更新。在实际生产中,频繁变化的现实场景要求我们对规则的内容,进行修改或者添加。

比如有一个 CEP 作业会在某个用户在一分钟内连续进行某操作超过 10 次后将其认为是风险用户。但在流量暴增或者举行某些活动的时候,这个阈值被改为 20 或者 30 次才更合适。现有的条件下想要更新规则,我们只能重新编写 Java 代码,再重启作业来使最新规则生效。

这样做时间成本高、延迟敏感的作业很难接受,除此之外,如果规则的时间窗口较长,状态又比较大的话,重启作业的代价会更高,因此我们需要支持动态规则更新。

要做到这一点,我们有两个关键问题需要解决。

第一,如何让 Flink 作业不停机加载新规则。第二,如何解决规则(Pattern)的序列化与反序列化。第二个问题本质上是由第一个问题衍生而来的。如果想让作业不停机加载,作业就必须从某个地方拿到我们传给它的 Pattern,并生成对应的 Pattern 对象在作业中使用。

针对上述两个问题,有一些现有的解决方案,比如通过修改 CepOperator 添加注入规则的接口,来实现不停机加载,以及基于 Groovy 引擎动态生成 Pattern 对象,解决序列化问题。但我们也发现,这两个方案其实都有一些不足。

比如第一个方案,通常情况下,规则都会存储在数据库中,而典型的对 CepOperator 的修改,则是让 CepOperator 直接和数据库交互,拉取最新规则。这样当 CEP 作业并发较多的时候,每个 sub_task 都要去连接数据库,这会给数据库带来额外的压力,并且更大的问题是,不同的 sub_task 拉取到的规则一致性难以保证。除此之外,这种修改通常只支持修改单条规则,不容易拓展到多规则的场景。

对于第二个方案,使用 Groovy 引擎动态生成 Pattern 对象也有自己的缺点。比如它的表达能力有限,一般只能结合 Aviator 动态修改阈值,很难改变规则整体的逻辑。并且 Groovy 是一个较重的引擎,它生成规则的时间也相对较长。

2.2 动态规则支持:设计

a3e24c4af0510291498cfe6b977c52e7.jpeg

基于以上提到的背景和问题,阿里的同学在社区内提出了 FLIP-200,并在阿里云实时计算 Flink 云产品中按照 FLIP-200 实现了一版 CEP 中动态多规则的支持。下面我将详细介绍我们是如何实现的,以及如何解决刚才提到的那些问题。

首先我们新增了 PatternProcessor 接口,用于完整的定义 CEP 中的一条规则。PatternProcessor 包含 getId,getVersion 等用于获取该 Pattern 唯一标识符的方法;getTimestamp 等用于获得时间戳,进行调度的方法;getPattern 对象用于拿到 PatternProcessor 所内嵌的规则;PatternProcessorFunction 用于描述如何处理找到的匹配。除此之外,为了功能的完整性,我们还添加了 PatternProcessorDiscoverer 和 PatternProcessorManager,用于描述如何发现和管理 Processor。

c6a9614fa87a52f0935a8172839ea5c9.jpeg

下面介绍一下动态规则支持的整体架构。首先要提一下 Flink 的 OperatorCoordinator 机制,顾名思义它负责协调 Flink 作业中的各个 operator。OperatorCoordinator 自身运行在 JobManager 中,可以给 TM 的 Operator 发送事件,之前主要在 Source 和 Sink 中使用,用于发现和分配读写的 Workload。

在 CEP 中我们也复用了这一机制,实现了 DynamicCEPOperatorCoordinator,它是 JobManager 中运行的线程,负责调用 PatternProcessorDiscoverer 接口拿到最新的 Pattern。

上图左侧展示的是从数据库中读取序列化后的 PatternProcessor 的过程。可以看到 Operator 拿到 PatternProcessor 后,会发送给和它关联的 DynamicCEPOperator。DynamicCEPOperator 接收到发送的事件并进行解析与反序列化,最终生成要使用的 PatternProcessor 并构造对应的 NFA,用于处理上游发送的事件并输出到下游。

另外注意一下,这里允许一个 CepOperator 里有多个 NFA 对应多个 PatternProcessor,这样可以比较好的支持多规则。

基于这样的方式,我们就可以做到不停机的更新规则内容,且只有 OperatorCoordinator 会和外部规则数据库交互,可以有效减少对数据库的访问,并保证了各个下游 sub_task 中使用规则的一致性。

2.3 动态规则支持:(de)serialization

692705a803f8d165162efd4879dc0273.jpeg

接下来我们来思考下 Pattern 的抽象。Pattern 本质上是描述了规则匹配时用到的 NFA 的状态转换图,即根据输入事件如何从一个状态转移到另一个状态,直到终态为止。

有了这样的观察后,我们就可以稍微做一些简化。比如将一个复合 Pattern 看成一个图,节点是每个子 Pattern,边则对应事件选择策略,即如何从一个子 Pattern 的匹配转移到另一个子 Pattern 的匹配。而每个图也可以看作是一个更大的图的子节点,这样我们就允许了模式的嵌套。

那么我们该如何描述这个图呢?我们定义的格式有如下几个设计原则。

  • 有完整的表达能力,能对标完整的 Java API。

  • 方便序列化和反序列化,最好能依赖于一个常见的格式。

  • 易于拓展,方便集成。当用户或者平台方新增一个字段或者一个类型的时候要足够方便,方便被更上层平台修改和使用。

  • 可读可编辑,方便策略人员在可视化页面理解与编辑规则内容。

根据这些原则,最终我们选择了基于 JSON 定义一套描述 Pattern 的规范。下面用一个简单的例子来展示我们定义的 JSON 格式。

93aad94819408f2513b80e751147c2f5.jpeg

上图左侧是我们用 Java API 定义的示例 Pattern,当满足 StartCondition 的事件出现大于等于三次之后,如果跟着一个满足 EndtCondition 的事件,那么我们就认为这是一个匹配。

可以注意到这里的 Pattern 包含两个子 Pattern,第一个 Pattern 对应 Star Pattern,第二个 Pattern 对应 End Pattern,逻辑上也存在两条边。由于 StartCondition 包含 timesOrMore 的声明,所以它有一条指向自己的边,另外也有一条从 StartCondition 指向 EndCondition 的边。

上图右侧就是用我们定义的 JSON 格式来描述 Java Pattern 得到的结果。我们注意这里有几个关键字段。

第一个是 node 字段,它是一个数组,包含每个子 Pattern 的完整描述,比如这里我们用 times 字段表示这个子 Pattern 对应的 Condition,要被满足大于等于三次。第二个是 edges 字段,它用于记录边的信息。

整个 JSON 格式完整的定义,可以参考阿里云的官方文档。

2.4 动态规则支持:拓展 Condition

b743f9ddc5a82059b3bdba6ef074f2c4.jpeg

接下来我们介绍一下我们是如何支持动态修改 Condition 中使用的阈值。和业界典型的实践一样,我们基于 Aviator 表达式引擎定义了 AviatorCondition。在 AviatorCondition 的构造函数中,根据输入的表达式字符串生成 AviatorExpression,然后在 filter 方法中通过反射来解析传入的事件字段和阈值,执行 AviatorExpression,最后返回 true or false 作为 filter 这个方法的返回结果。

举一个简单的例子,假设有一个叫 Event 的类,它有两个字段 price 和 action。那么我们就可以构造一个这样的 AviatorCondition,它的参数是一个表达式字符串,这个字符串里描述了对 Event 中事件字段的取值要求。比如我们要求 action==1&&price>20。如果我们想要更新阈值,就直接修改表达式,变成 action==0&&price>50。

注意这个字符串是传入的参数,它也可以在我们刚才介绍的 JSON 格式中定义和描述,所以我们也可以直接编辑数据库中的字段进行阈值的动态更新。

2.5 多规则支持

f12e932d76d75125d3e7248078867565.jpeg

多规则是指在同一输入流上运用多条规则。按照开源 Flink CEP 的方案,我们要想在一个 Flink 作业中做到这点,就需要定义多个 Pattern Stream,对应也会生成多个 CepOperator 和 NFA,这也意味着上游数据要复制多次,这显然带来了很多额外的开销。

所以我们进行了优化,允许一个 DynamicCEPOperator 在它里面构建多个 NFA,这样上游的数据只需要传递一次,避免了额外的数据拷贝。

2.6 动态 CEP Demo

接下来我们以广告投放中的实时反作弊场景来演示动态 CEP Demo。

首先为大家介绍一下 demo 所针对的场景,我们知道在电商平台投放广告时,广告主通常有预算限制。例如对于按点击次数计算费用的广告而言,如果有黑灰产构造虎假流量,攻击广告主,就会很快消耗掉正常广告主的预算,使得广告内容被提前下架。

在这种情况下,广告主的利益受到了损害,容易导致后续的投诉与纠纷。为了应对上述作弊场景,我们需要快速辨识出恶意流量,采取针对性措施。例如限制恶意用户、向广告主发送告警等来保护用户权益。同时考虑到可能有意外因素,例如达人推荐、热点事件引流等导致某一商品的流量骤变,我们也需要动态调整用于识别恶意流量的规则,避免损害正常用户的利益。

本 Demo 将为大家演示如何使用 Fink 动态 CEP 解决上述问题。

1c2632c505d5e572486cc7f98e17c8d1.jpeg

我们假设客户的行为日志会被存放入消息队列 Kafka 中,Fink CEP 作业会消费 Kafka 数据,同时会去轮询 RDS 数据库中的规则表,拉取策略人员添加到数据库的最新规则,并用最新规则去做事件匹配。针对匹配到的事件,Flink CEP 作业会发出告警或将相关信息写入其他数据存储中,整体数据链路如上图所示。

在一会儿的演示中,我们对用户的日志做了一些简化,日志中的 action 字段,它的值如果为 0,就代表点击事件,为 1 代表购买事件,为 2 代表分享事件。接下来为大家演示具体的操作步骤。

首先需要创建好 Flink 全托管实例、RDS MySQL 实例、消息队列 Kafka 实例。然后准备好数据库相关内容,在 RDS 控制台创建 database。

8821b9d18362e3dd3b6dd076d9fa2d46.jpeg

注意在配置好之后,我们要在数据库连接中设置白名单,来保证我们的 Flink 全托管实例能访问 RDS 数据库。

然后打开 RDS 的 SQL 编辑页面创建一张数据表,命名为 RDS demo,四个字段 id、version、pattern、function。id 和 version 用于标名唯一的版本和 id 信息,pattern 代表了序列化后的 Pattern Stream,function 用于指代要用的 PatternProcessor 的函数名。然后编写 Flink DataStream 作业并打包提交到 Flink 全托管实例中运行。

21d5ef231479ddff9e075d78fa7fcf8c.jpeg

接下来为大家介绍 main 函数的大致流程以及部分关键实现。首先读取一些必要的参数用于构造 KafkaSource 以及 RDS 数据库的一些连接信息。然后对 Source 基于用户和商品的 ID 做 keyBy,方便后续进行 CEP 的匹配。

49b30d72c8b367144dbe0e23319c6d70.jpeg

接下来介绍一下在动态 CEP 中引入的新接口 DynamicPatterns。它有四个参数,第一个用来指定输入事件流,第二个参数 PatternProcessorDiscovererFoctory 用来构造 PatternProcessorDiscoverer;第三个参数 TimeBehaviour 用来指定是按照 even time 处理事件还是按照 processing time 处理事件;第四个用来描述输出流的类型信息。

另外注意这里用的是 JDBCPeriodPatternProcessorDiscovererFactory,它会周期性地扫描指定的数据库,检测到更新后,会对应地更新 Flink CEP 作业中使用的 PatternProcessor。

b82955d71072921bcf5dd9fb58a9c205.jpeg

完成作业的打包后,我们接下来把作业上传到 Flink 全托管中,然后指定了一些必要的参数,比如 KafkaBrockers 以及 RDS 的一些连接信息,然后点击上线,进入运维,启动作业。

d0a0b11695499fe98d4e1dcd917b71af.jpeg

接下来我们在 RDS 数据库中插入插入规则 1:“连续 3 条 action 为 0 的事件发生后,下一条事件的 action 仍非 1”,其业务含义为连续 3 次访问该产品但最后没有购买。

399603942722c2d36986ef498eaeb6ff.jpeg

它的 JSON 序列化表现如上图。

8769097bfe3b367933a37e50f88b146d.jpeg

然后将该条 JSON 数据插入到数据库中。

6cac93771e683665a575ff753c38e630.jpeg

接下来我们去作业中查看一下 TaskManager 日志,可以看到已经插入了最新规则。

214cf387d571afd12aa8a99abe3ac8b2.jpeg

接下来我们尝试往 Kafka 中发送几条消息来验证 CEP 的匹配逻辑,这里直接发四条一样的消息。

57c06d082ca6f997e78d6a8790e9a9ca.jpeg

接下来检测一下,TaskManager 中是否有相应的输出。可以看到(id=1, version=1)的规则的最新匹配,匹配的事件序列就是刚才发送的那四条事件。

fded9f1afff83e9052dd310d1a3f4edf.jpeg

然后我们来验证动态修改这个规则并插入新的规则。这里将出现次数改为大于等于五次,StartCondition 的判断条件也改为 action==0 或者 action==2,然后执行插入。同时我们插入第二条规则,它的 ID 为 2,版本为 1,内容和规则 1 的第 1 个版本完全一致,主要用来辅助展示对多规则的支持。

3254d141ff948b88449688ccf412fe9d.jpeg

接下来可以在作业日志中查看到我们刚刚插的两个规则,然后用 Kafka 发送三条 action 为 0 的消息,一条 action 为 2 的消息,并将之前的四条消息再发一遍。

081b277b12191e7cc4e182405ae0fb6f.jpeg

接着我们查看作业的匹配结果,可以看到针对(id=1, version=2)的规则,作业匹配到 1 次“5 个 action 为 0 或 1 的事件+1 个 action 非 1 的事件”的序列后输出了匹配结果,代表动态修改的规则成功生效;而对于(id=1, version=2)的规则,CEP 作业匹配到 2 次“3 个 action 为 0 的事件+1 个 action 非 1 的事件”的序列后输出结果,代表新增的规则也在作业中被采用。

03

Flink CEP SQL 语法增强

3.1 Flink CEP SQL 简介

233801cf662f15a480b7c2d2dd83ae5f.jpeg

Flink CEP SQL 主要基于 SQL2016 标准中的行模式识别语句,将 Flink 流表,例如上图中的 csv_source 作为 MATCH_RECOGNIZE 语句的输入,使用非确定有穷状态机对流表中的时序数据进行匹配,最终对识别出特定模式的数据序列进行计算后重新输出为 Flink 流表,从而无缝对接 Flink SQL 生态。

其 MATCH_RECOGNIZE 语句主要包含以下几个部分:

  • PARTITION BY 定义表的逻辑分区,类似 GROUP BY 操作,用于在并行执行时确定数据的分区。

  • ORDER BY 指定数据的排序方式,由于 CEP 需要在时序数据中识别特定模式,排序是必须的,并且要求 ORDER BY 的第一个字段必须是升序的时间属性。

  • MEASURES 类似 SELECT 操作,对识别出的序列执行映射、聚合等操作计算输出结果。例如,上图中使用了 FIRST、LAST、COUNT 函数对循环模式 A 执行了聚合计算,而对普通模式 B 则执行了简单的映射操作。

  • PATTERN 是 MATCH_RECOGNIZE 语句的核心,使用类似正则表达式的语法来定义匹配的序列模式。例如,上图中的 A + B 表示序列中先连续出现一个或多个 A 事件,紧接再出现一个 B 事件。

  • DEFINE 定义 PATTERN 中所使用变量对应的事件匹配条件。例如,PATTERN 中 A 和 B 对应数据中 event_type 字段的不同取值。若 PATTERN 中使用的变量在 DEFINE 中未定义,则表示任意匹配一条数据。

d8ee2592ba22d44182bdf320c52e1314.jpeg

对于上图中的源表,示例中的 CEP SQL 语句会输出四条结果,其中 Alice 用户识别到序列为 AAAB,如红色箭头所指。由于默认使用的 AFTER MATCH 策略为 SKIP TO NEXT ROW,结果表中会包含 AAAB 序列的两个子序列 AAB、AB 对应的输出。对于 Bob 用户则匹配到 AB 序列,如蓝色箭头所指。

3.2 Flink CEP SQL 语法增强

1037da17bcd1ff539c3706e25665efaf.jpeg

目前 Flink CEP 的主要工作集中在 Java API 上,但基于 Flink SQL 和其他 SQL 类 ETL 软件庞大的用户群和成熟的生态考虑,我们也尽可能在保持对 SQL 标准兼容的同时,持续完善和改进 Flink CEP SQL 的功能和使用体验。

在最近的工作中,Flink CEP SQL 主要在语法层面对以下三个功能进行了支持:

  • 输出带时间约束模式的匹配超时序列。

  • 定义事件之间的连续性。

  • 定义循环模式中的连续性和贪婪性。

01 输出带时间约束模式的匹配超时序列

f0eee573308c8625fb0ec9967f036824.jpeg

在目前版本的 Flink CEP SQL 中可以通过 WITHIN 语句对模式的整体匹配时间进行约束。例如一个常见的应用场景是用户行为模式识别,从流量入口到最终完成用户价值转化的一系列流程中,我们希望整体流程周期在十分钟之内的高潜用户,则可以像上图中在 PATTERN 后使用 WITHIN INTERVAL 加时间参数来进行约束。

9ec465356903933ccf8f1f3b0503a959.jpeg

例如对于上图中的源表,前面的 MATCH_RECOGNIZE 语句示例将会匹配到 Alice 用户在十分钟之内完成了 ABC 操作。而 Bob 用户由于 C 操作距离 A 操作已经过去了 13 分钟,将会匹配失败。

fd3c0a237a74d278e25d8d812bab0914.jpeg

但可能也存在这样的需求,对于这些流程周期超过十分钟或流程中断的用户,我们也希望能够识别出来,进一步去分析其超时或中断的原因。那么我们可以如上图中示例,在 ONE ROW PER MATCH 之后使用 SHOW TIMEOUT MATCHES 来声明输出匹配超时的序列。

在 Java API 中,我们使用 Output Tag 来将超时序列输出到侧流处理,而在 SQL 中,匹配超时序列和匹配成功序列会在同一张流表中,但对超时序列未匹配到的事件,在 MEASURES 中计算将会得到空值。上图结果表中 Bob 用户的 C 操作超时,因此得到 C 的映射操作结果也为空值。通过这些空值,我们可以将这些匹配超时序列从流表中分离出来,并且判断是在哪个步骤超时的。

■ 02 定义事件之间的连续性

265a7d8e85d16fa3acae69c2a14ca987.jpeg

在使用 Flink CEP Java API 的时候,我们可以通过函数很方便地定义事件之间的连续性,例如用 next()指定严格连续,模式中相邻的事件在数据流中必须紧接着出现,使用 followedBy()则可以指定松散连续,模式中相邻事件匹配时可以忽略一些不匹配的事件。

之前的 Flink CEP SQL 中只支持声明严格连续,即表中第一行的语法,现在每一个 Java API 中的连续性函数在 SQL 中都有了对应的表达方式。

例如 followedBy()对应的 SQL 语法,在 A 和 B 之间使用 SQL EXCLUDE 的语法,即{- X*?-},其中使用了一个未在 DEFINE 中定义的变量 X 来表示任意匹配,并使用 X*?表示非贪婪地匹配 0 至任意多个任意事件,其效果是 exclude 部分会连续匹配任何非 B 的事件,等效于 followedBy()的语义。在 followedBy() SQL 语法的基础上去掉 X 上的非贪婪修饰即为 followedByAny()的语义。对于 notNext(),则使用[^B] 的表达形式,表示 A 事件之后紧接着不能出现 B 事件。对于 notFollowedBy(),只需要将 followedBy()和 notNext()的 SQL 语法结合使用即可。

■ 03 定义循环模式中的连续性和贪婪性

6e7c2d59df9ce0b956c0434247f65cd9.jpeg

对于一个循环模式,例如上表中的 A+,在之前的 Flink CEP SQL 中已经支持了贪婪性的声明,不使用任何符号为贪婪匹配,使用一个问号则为非贪婪。两者的区别是,例如上图示例中当 a3 可以同时匹配 A 条件或 C 条件,贪婪匹配会选择更长的序列,而非贪婪则会选择更短的。

现在我们在原有贪婪性的声明上新增了对连续性的声明,使用??表示松散连续且贪婪,???表示松散连续非贪婪。循环模式的松散连续可以认为是在循环模式中的事件之间使用 followedyBy 关系,例如 a1、a2 之间有非匹配的 b1 事件,在严格连续的情况下,a1 会无法匹配到循环模式 A 中,如表中(A+ C)得到的 a2 a3 c1 序列,而松散连续的情况下则可以跳过 b1 事件而形成更长的匹配序列,例如(A+?? C)得到的 a1 a2 a3 c1 序列。

04

未来规划

d5e9e702ded33228957cac8b158545ec.jpeg

Flink CEP 未来工作的重点还是在动态 CEP 和 CEP SQL 上:

  • 扩展动态 CEP 多规则能力到静态场景。在目前版本的 Flink CEP 中,如果要在静态场景下使用多规则的话,只能通过创建多个 CepOperator,而这会带来数据的额外拷贝。在动态 CEP 中我们已经支持了在一个 Operator 中处理多条规则的能力,后续会将这个能力扩展到静态场景中。

  • 动态 CEP 的 JSON 格式规则描述支持定义参数化的 Condition。目前只有示例中的 AviatorCondition 支持在 JSON 中传入表达式作为构造的参数,其他 Condition 只能传入类名。因此之后我们考虑通过 Condition 的参数化来提高自定义 Condition 的扩展性,避免需要动态添加新的 Condition 类实现。

  • CEP SQL 表达能力增强。一方面 CEP SQL 相比 Java API 缺失的能力是首要进行对齐的,另一方面我们可以看到 PATTERN 定义的语法和正则语法非常相似,未来我们也会去做更多对正则语法的对齐,让用户在定义 PATTERN 的时候更加自然。

  • 动态 CEP 作为一个备受关注的新功能,我们计划让 Flink CEP SQL 也支持动态 CEP,能够在保持 schema 不变的情况下动态更新事件匹配条件和模式的定义。

往期精选

ed3044025b4cc34e6354e81e067e921c.png

3d6a4f2ffb8b20869d544a19ccb08355.jpeg

da3564aaa1275ed41da66fb6e211721e.jpeg

863fc0efc9413bf19d52091250ebb984.jpeg

15e599c4beff6f938ff26e2e59ee8462.jpeg

▼ 关注「Apache Flink」,获取更多技术干货 ▼

772edc2c9148a8fda90df5b42d306c23.png

 2a1eb36eb2c27659e3ee0c9a221fe37f.gif  点击「阅读原文」,查看原文视频&演讲 PPT

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/337536.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

mysql面试题(集合)

mysql如何实现索引机制 mysql中索引分三类:B树索引、Hash索引、全文索引 InnoDB索引与MYISAM索引实现的区别是什么 1.InnoDB的辅助索引data域存储相应记录主键的值而不是地址 2.InnoDB的数据文件本身就是主索引文件 3.MyISAM的索引和数据是分开存储的 一个表中如…

ChatGPT 来了,你准备好了吗?

周三的晚上,我做了一次直播,题目叫做《ChatGPT 来了,老师和同学们准备好了吗?》。如果你还没看,欢迎看看回放视频。做这次直播,是因为受了三重刺激。第一重,来自于我的一位好友,也是…

自上而下的传输协议-TCP/IP 的演化

动机来自昨天下班路上快到家发的一则朋友圈: 作为因果的历史是不存在的。因为有无数种对等的解释。这个可以用拓扑学证明的,模型非常简单,事件作为点,事件之间的关系作为连接两点的有向边。 最近思考一个问题,传输协…

MongoDB 更新文档

MongoDB 使用 update() 和 save() 方法来更新集合中的文档。 update() 方法 update() 方法用于更新已存在的文档。语法格式如下&#xff1a; db.collection.update(<query>,<update>,{upsert: <boolean>,multi: <boolean>,writeConcern: <document…

移除元素-力扣27-java

一、题目描述给你一个数组 nums 和一个值 val&#xff0c;你需要 原地 移除所有数值等于 val 的元素&#xff0c;并返回移除后数组的新长度。不要使用额外的数组空间&#xff0c;你必须仅使用 O(1) 额外空间并 原地 修改输入数组。元素的顺序可以改变。你不需要考虑数组中超出新…

数据与C(整数存储的各种类型)

目录 一.int&#xff08;%d&#xff09; 1.sizeof应用 2.声明int变量以及初始化 3.打印int值 二.short&#xff08;%hd&#xff09;和long(%ld) 1.short大小 2.long大小 3.short和long的分支 三.综合讲解 四.整数类型总结 五.可移植类型&#xff1a; stdint.h 和 in…

年轻人越来越反感“专家”,问题出在哪儿?

专家的建议&#xff0c;又惹怒了年轻人。 最近&#xff0c;某学者表示&#xff0c;在鼓励年轻人生育、婚恋的问题上&#xff0c;可以将法定结婚年龄下调至18岁&#xff0c;以此降低成婚门槛促进单身男女建立家庭。 此话一出&#xff0c;立刻引来无数板砖。不少人质疑&#xf…

CrossOver2023mac跨系统互通切换win虚拟机

CrossOver2023版是在Mac上运行Win软件的最简单方法&#xff0c;有了它&#xff0c;你无须 Win许可、重新启动或使用虚拟机即可在mac上使用Win软件。CrossOver23可以轻松地从Dock本地启动Win程序。CrossOver版还集成了macOS 功能&#xff0c;例如跨平台复制和粘贴&#xff0c;以…

数据与C(ASCII码,char)

目录 一.ASCII码讲解 二.非打印字符&#xff08;转义字符&#xff09; 三.扩展小知识 一.ASCII码讲解 char类型用于存储字符&#xff0c;从技术层面看&#xff0c;char时整数类型&#xff0c;因为char类型实际上存储的是整数而不是字符。计算机使用数字编码来处理字符&…

javaEE 初阶 — 超时重传机制

文章目录超时重传机制1. 数据重复传输问题2. 如何解决数据重复传输问题3. 重传次数问题TCP 的工作机制&#xff1a;确认应答机制 超时重传机制 如果传输数据的时候丢包了该怎么办&#xff1f; 利用 超时重传&#xff0c;也就是超过了一定的时间&#xff0c;如果还没响应就重新…

回顾 | 开学季(一)- 在 Windows / macOS 上配置你的开发环境

点击蓝字关注我们编辑&#xff1a;Alan Wang排版&#xff1a;Rani Sun微软 Reactor 为帮助广开发者&#xff0c;技术爱好者&#xff0c;更好的学习 .NET Core, C#, Python&#xff0c;数据科学&#xff0c;机器学习&#xff0c;AI&#xff0c;区块链, IoT 等技术&#xff0c;将…

社交登陆OAuth2.0

QQ、微博、github 等网站的用户量非常大&#xff0c;别的网站为了 简化自我网站的登陆与注册逻辑&#xff0c;引入社交登陆功能&#xff1b; 步骤&#xff1a; 1&#xff09;、用户点击 QQ 按钮 2&#xff09;、引导跳转到 QQ 授权页 3&#xff09;、用户主动点击授权&#xff…

Linux 服务器(centos7)搭建 Web 版 VSCode

首先到code-server的git地址找到下载链接 打开https://github.com/cdr/code-server/releases&#xff0c;选择一个版本下载&#xff0c;我选择的是code-server-4.9.1-linux-amd64.tar.gz&#xff0c;这是 64 位 Linux 的通用版。 下载压缩包到服务器并解压 cd /home mkdir v…

Python-第四天 Python循环语句

Python-第四天 Python循环语句一、while循环1.while循环的基础语法2.while循环的基础案例3.while循环的嵌套应用4.while循环的嵌套案例二、for循环1.for循环的基础语法1.1基础语法1.2 range语句2.for循环的嵌套应用三、循环中断 : break和continue1.continue2.break四、 综合案…

nginx相关反爬策略总结笔记

引言 互联网站点的流量一部分由人类正常访问行为产生&#xff0c;而高达30%-60%的流量则是由网络爬虫产生的&#xff0c;其中一部分包含友好网络爬虫&#xff0c;如搜索引擎的爬虫、广告程序、第三方合作伙伴程序、Robots协议友好程序等;而并非所有的网络爬虫都是友好的&#x…

【C#项目】图书馆管理系统-WinForm+MySQL

文章目录前言一、业务梳理与需求分析1.功能描述2.实现步骤3.功能逻辑图二、数据库设计1.实体-关系&#xff08;E-R图&#xff09;概念模型设计2.数据表设计三、WinForm界面交互设计1、界面交互逻辑2、项目树3、主界面登录界面4、 图书查询界面5、图书借阅界面6、图书插入界面7、…

pixhawk2.4.8-地面站配置-APM固件

目录一、硬件准备二、APM固件、MP地面站下载三、地面站配置1 刷固件2 机架选择3 加速度计校准4 指南针校准5 遥控器校准6 飞行模式7 紧急断电&无头模式8 基础参数设置9 电流计校准10 电调校准11 起飞前检查&#xff08;每一项都非常重要&#xff09;12 飞行经验四、遇到的问…

10.数据库恢复技术

其他章节here 梳理 名词解释 事务&#xff1a;事务是用户定义的 一个数据操作序列&#xff0c;这些操作要么全做&#xff0c;要么全不做&#xff0c;是一个不可分割的工作单位 事务≠程序 事务是恢复和并发控制的基本单位 &#xff08;事务时数据库应用程序的基本逻辑单元&am…

制造业升级转型:制造业上市公司-智能制造词频统计数据集

发展智能制造&#xff0c;关乎中国制造业转型升级的成效。基于中国制造业上市公司年报&#xff0c;通过文本数据挖掘&#xff0c;提取关键词反映企业对智能制造的关切焦点&#xff0c;进而运用词频及共词网络分析&#xff0c;洞察中国智能制造的发展态势。 研究发现&#xff0…

OpenAI最重要的模型【CLIP】

最近的 AI 突破 DALLE和 Stable Diffusion有什么共同点&#xff1f; 它们都使用 CLIP 架构的组件。 因此&#xff0c;如果你想掌握这些模型是如何工作的&#xff0c;了解 CLIP 是先决条件。 此外&#xff0c;CLIP 已被用于在 Unsplash 上索引照片。 但是 CLIP 做了什么&…