大数据之Flink(六)

news2024/11/17 10:53:10

17、Flink CEP

17.1、概念
17.1.1、CEP

CEP是“复杂事件处理(Complex Event Processing)”的缩写;而 Flink CEP,就是 Flink 实现的一个用于复杂事件处理的库(library)。

总结起来,复杂事件处理(CEP)的流程可以分成三个步骤:

(1) 定义一个匹配规则

(2) 将匹配规则应用到事件流上,检测满足规则的复杂事件

(3) 对检测到的复杂事件进行处理,得到结果进行输出
在这里插入图片描述
输入是不同形状的事件流,我们可以定义一个匹配规则:在圆形后面紧跟着三角形。那么将这个规则应用到输入流上,就可以检测到三组匹配的复杂事件。它们构成了一个新的“复杂事件流”,流中的数据就变成了一组一组的复杂事件,每个数据都包含了一个圆形和一个三角形。接下来,我们就可以针对检测到的复杂事件,处理之后输出一个提示或报警信息了。

CEP 是针对流处理而言的,分析的是低延迟、频繁产生的事件流。它的主要目的, 就是在无界流中检测出特定的数据组合,掌握数据中重要的高阶特征。

17.1.2、模式(Pattern)

CEP 的第一步所定义的匹配规则,我们可以把它叫作“模式”(Pattern)。模式的定义主要就是两部分内容:

  1. 每个简单事件的特征
  2. 简单事件之间的组合关系

事件发生的顺序即“近邻关系“。CEP 做的事其实就是在流上进行模式匹配。根据模式的近邻关系条件不同,可以检测连续的事件或不连续但先后发生的事件;模式还可能有时间的限制,如果在设定时间范围内没有满足匹配条件,就会导致模式匹配超时。

17.1.3、应用场景

⚫ 风险控制

设定一些行为模式,可以对用户的异常行为进行实时检测。当一个用户行为符合了异常行为模式,比如短时间内频繁登录并失败、大量下单却不支付(刷单),就可以向用户发送通知信息,或是进行报警提示、由人工进一步判定用户是否有违规操作的嫌疑。这样就可以有效地控制用户个人和平台的风险。

⚫ 用户画像

利用 CEP 可以用预先定义好的规则,对用户的行为轨迹进行实时跟踪,从而检测出具有特定行为习惯的一些用户,做出相应的用户画像。基于用户画像可以进行精准营销,即对行为匹配预定义规则的用户实时发送相应的营销推广;这与目前很多企业所做的精准推荐原理是一样的。

⚫ 运维监控

对于企业服务的运维管理,可以利用 CEP 灵活配置多指标、多依赖来实现更复杂的监控模式。

17.2、简易代码
17.2.1、引入依赖
<dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-cep_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
</dependency>

定义登录事件

package flinkCEPDemo;

/**
 * @Title: LoginEvent
 * @Author lizhe
 * @Package flinkCEPDemo
 * @Date 2024/6/23 12:46
 * @description:定义登录事件
 */
public class LoginEvent {
    public String userId;
    public String ipAddress;
    public String eventType;
    public Long timestamp;

    public LoginEvent() {
    }

    public LoginEvent(String userId, String ipAddress, String eventType, Long timestamp) {
        this.userId = userId;
        this.ipAddress = ipAddress;
        this.eventType = eventType;
        this.timestamp = timestamp;
    }

    @Override
    public String toString() {
        return "LoginEvent{" +
                "userId='" + userId + '\'' +
                ", ipAddress='" + ipAddress + '\'' +
                ", eventType='" + eventType + '\'' +
                ", timestamp=" + timestamp +
                '}';
    }
}

检测用户行为,如果连续三次登录失败,就输出报警信息。

package flinkCEPDemo;

import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternSelectFunction;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.IterativeCondition;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.time.Duration;
import java.util.List;
import java.util.Map;

/**
 * @Title: LoginFailDetectExample
 * @Author lizhe
 * @Package flinkCEPDemo
 * @Date 2024/6/23 12:45
 * @description:检测用户行为,如果连续三次登录失败,就输出报警信息。
 */
public class LoginFailDetectExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        //获取数据流
        SingleOutputStreamOperator<LoginEvent> loginEventSingleOutputStreamOperator = env.fromElements(
                new LoginEvent("user_1", "192.168.0.1", "fail", 2000L),
                new LoginEvent("user_1", "192.168.0.2", "fail", 3000L),
                new LoginEvent("user_2", "192.168.1.29", "fail", 4000L),
                new LoginEvent("user_1", "171.56.23.10", "fail", 5000L),
                new LoginEvent("user_2", "192.168.1.29", "success", 6000L),
                new LoginEvent("user_2", "192.168.1.29", "fail", 7000L),
                new LoginEvent("user_2", "192.168.1.29", "fail", 8000L)

        ).assignTimestampsAndWatermarks(WatermarkStrategy.<LoginEvent>forBoundedOutOfOrderness(Duration.ZERO)
                .withTimestampAssigner(new SerializableTimestampAssigner<LoginEvent>() {
                    @Override
                    public long extractTimestamp(LoginEvent element, long recordTimestamp) {
                        return element.timestamp;
                    }
                }));
        //定义复杂事件模式
        Pattern<LoginEvent, LoginEvent> loginEventPattern = Pattern.<LoginEvent>begin("first")//第一次登录失败事件
                .where(new IterativeCondition<LoginEvent>() {
                    @Override
                    public boolean filter(LoginEvent value, Context<LoginEvent> ctx) throws Exception {
                        return value.eventType.equals("fail");
                    }
                })
                .next("second")//紧跟着第二次登录失败事件
                .where(new IterativeCondition<LoginEvent>() {
                    @Override
                    public boolean filter(LoginEvent value, Context<LoginEvent> ctx) throws Exception {
                        return value.eventType.equals("fail");
                    }
                })
                .next("third")
                .where(new IterativeCondition<LoginEvent>() {
                    @Override
                    public boolean filter(LoginEvent value, Context<LoginEvent> ctx) throws Exception {
                        return value.eventType.equals("fail");
                    }
                });
        //将模式应用到数据流上,检测复杂事件
        PatternStream<LoginEvent> patternStream = CEP.pattern(loginEventSingleOutputStreamOperator.keyBy(event -> event.userId), loginEventPattern);

        //将检测到的事件提取,进行处理得到报警信息输出
        SingleOutputStreamOperator<String> warnStream = patternStream.select(
                new PatternSelectFunction<LoginEvent, String>() {
                    @Override
                    public String select(Map<String, List<LoginEvent>> pattern) throws Exception {
                        //提取复杂事件中的三次登录失败
                        LoginEvent firstLoginEvent = pattern.get("first").get(0);
                        LoginEvent secondLoginEvent = pattern.get("second").get(0);
                        LoginEvent thirdLoginEvent = pattern.get("third").get(0);
                        return firstLoginEvent.userId + "连续三次登录失败!登录时间分别为" + firstLoginEvent.timestamp + "," + secondLoginEvent.timestamp + "," + thirdLoginEvent.timestamp;
                    }
                }
        );
        warnStream.print();
        env.execute();
    }
}

17.3、模式API(Pattern API)
17.3.1、个体模式

一个匹配规则就可以表达成先后发生的一个个简单事件,按顺序串联组合在一起。这里的每一个简单事件并不是任意选取的,也需要有一定的条件规则;所以我们就把每个简单事件的匹配规则,叫作“个体模式”(Individual Pattern)。

每一个登录失败事件的选取规则,就都是一个个体模式

.next("second")//紧跟着第二次登录失败事件
                .where(new IterativeCondition<LoginEvent>() {
                    @Override
                    public boolean filter(LoginEvent value, Context<LoginEvent> ctx) throws Exception {
                        return value.eventType.equals("fail");
                    }
                })

体模式需要一个“过滤条件”,用来指定具体的匹配规则。这个条件一般是通过调用.where()方法来实现的。

个体模式增加一个“量词”(quantifier),就能够让它进行循环匹配,接收多个事件

17.3.1.1、量词

个体模式后面可以跟一个“量词”,用来指定循环的次数。在循环模式中,对同样特征的事件可以匹配多次。比如我们定义个体模式为“匹配形状为三角形的事件”,再让它循环多次,就变成了“匹配连续多个三角形的事件”。注意这里的“连续”,只要保证前后顺序即可,中间可以有其他事件,所以是“宽松近邻”关系。

  • .oneOrMore()

匹配事件出现一次或多次,假设 a 是一个个体模式,a.oneOrMore()表示可以匹配 1 个或多个 a 的事件组合。我们有时会用 a+来简单表示。

  • .times(times)

匹配事件发生特定次数(times),例如 a.times(3)表示 aaa;

  • .times(fromTimes,toTimes)

指定匹配事件出现的次数范围,最小次数为fromTimes,最大次数为toTimes。例如a.times(2, 4)可以匹配 aa,aaa 和 aaaa。

  • .greedy()

**只能用在循环模式后,**使当前循环模式变得“贪心”(greedy),也就是总是尽可能多地去匹配。例如 a.times(2, 4).greedy(),如果出现了连续 4 个 a,那么会直接把 aaaa 检测出来进行处理,其他任意 2 个 a 是不算匹配事件的。

  • .optional()

使当前模式成为可选的,也就是说可以满足这个匹配条件,也可以不满足。

// 匹配事件出现 4 次
pattern.times(4);

// 匹配事件出现 4 次,或者不出现
pattern.times(4).optional();

// 匹配事件出现 2, 3 或者 4 次
pattern.times(2, 4);

// 匹配事件出现 2, 3 或者 4 次,并且尽可能多地匹配
pattern.times(2, 4).greedy();

// 匹配事件出现 2, 3, 4 次,或者不出现
 pattern.times(2, 4).optional().greedy();
//匹配事件出现 1 次或多次
pattern.oneOrMore();

// 匹配事件出现 1 次或多次,并且尽可能多地匹配
pattern.oneOrMore().greedy();

// 匹配事件出现 1 次或多次,或者不出现
pattern.oneOrMore().optional();

// 匹配事件出现 1 次或多次,或者不出现;并且尽可能多地匹配
pattern.oneOrMore().optional().greedy();

// 匹配事件出现 2 次或多次
pattern.timesOrMore(2);

// 匹配事件出现 2 次或多次,并且尽可能多地匹配
pattern.timesOrMore(2).greedy();

// 匹配事件出现 2 次或多次,或者不出现
pattern.timesOrMore(2).optional()

// 匹配事件出现 2 次或多次,或者不出现;并且尽可能多地匹配
pattern.timesOrMore(2).optional().greedy();

17.3.1.2、条件

对于条件的定义,主要是通过调用 Pattern 对象的.where()方法来实现的,主要可以分为简单条件、迭代条件、组合条件、终止条件几种类型。

  • 简单条件

匹配事件的user 属性以“A”开头

pattern.where(new IterativeCondition<Event>() { @Override
public boolean filter(LoginEvent value, Context<LoginEvent> ctx) throws Exception{
return value.user.startsWith("A");
}
});

返回为true为符合匹配规则,false为不符合匹配规则。

  • 迭代条件

上下文 Context。调用这个上下文的.getEventsForPattern()方法,传入一个模式名称,就可以拿到这个模式中之前已匹配到的所有数据了。

middle.oneOrMore()
.where(new IterativeCondition<Event>() { @Override
public boolean filter(Event value, Context<Event> ctx) throws Exception {
// 事件中的 user 必须以 A 开头
if (!value.user.startsWith("A")) { return false;
}

int sum = value.amount;
// 获取当前模式之前已经匹配的事件,求所有事件 amount 之和
for (Event event : ctx.getEventsForPattern("middle")) { sum += event.amount;
}
// 在总数量小于 100 时,当前事件满足匹配规则,可以匹配成功
return sum < 100;
}
});

迭代条件能够获取已经匹配的事件,如果自身又是循环模式(比如量词oneOrMore),那么两者结合就可以捕获自身之前接收的数据,据此来判断是否接受当前事件。这个功能非常强大,我们可以由此实现更加复杂的需求,比如可以要求“只有大于之前数据的平均值,才接受当前事件”。

另外迭代条件中的上下文Context 也可以获取到时间相关的信息,比如事件的时间戳和当前的处理时(processing time)。

  • 组合条件

    .where()后面再接一个.where()相当于就是多个条件的“逻辑与”(AND)

    .where()后加一个.or()相当于多个条件的逻辑或(OR)

  • 终止条件

    指定一个“终止条件”(Stop Condition),表示遇到某个特定事件时当前模式就不再继续循环匹配了。终止条件的定义是通过调用模式对象的.until() 方法来实现的, 同样传入一个IterativeCondition 作为参数。需要注意的是,终止条件只与 oneOrMore() 或者oneOrMore().optional()结合使用。因为在这种循环模式下,我们不知道后面还有没有事件可以匹配,只好把之前匹配的事件作为状态缓存起来继续等待,这等待无穷无尽;如果一直等下去, 缓存的状态越来越多,最终会耗尽内存。所以这种循环模式必须有个终点,当.until()指定的条件满足时,循环终止,这样就可以清空状态释放内存了。

17.3.2、组合模式

按一定的顺序把个体模式组合起来——模式序列。

Pattern<Event, ?> pattern = Pattern
.<Event>begin("start").where(...)
.next("next").where(...)
.followedBy("follow").where(...)
...

1、初始模式

Pattern 的静态方法.begin()来创建,注意begin前要有泛型。如下所示:

Pattern<Event, ?> start = Pattern.<Event>begin("start");

2、近邻条件

  • 严格近邻

    匹配的事件严格地按顺序一个接一个出现,中间不会有任何其他事件。.next()

  • 宽松近邻

    两个匹配的事件之间可以有其他不匹配的事件出现。.followedBy()
    在这里插入图片描述

  • 非确定性宽松近邻

    所谓“非确定性”是指可以重复使用之前已经匹配过的事件;这种近邻条件下匹配到的不同复杂事件,可以以同一个事件作为开始,即图中圆圈可以跟所有的三角进行匹配
    在这里插入图片描述
    3、其他限制条件

  • .notNext()

表示前一个模式匹配到的事件后面,不能紧跟着某种事件。

  • .notFollowedBy()

表示前一个模式匹配到的事件后面, 不会出现某种事件。这里需要注意, 由于notFollowedBy()是没有严格限定的;流数据不停地到来,我们永远不能保证之后“不会出现某种事件”。所以一个模式序列不能以 notFollowedBy()结尾,这个限定条件主要用来表示“两个事件中间不会出现某种事件”。

  • .within()

    传入一个时间参数,这是模式序列中第一个事件到最后一个事件之间的最大时间间隔,只有在这期间成功匹配的复杂事件才是有效的。一个模式序列中只能有一个时间限制,调用.within()的位置不限;如果多次调用则会以最小的那个时间间隔为准

// 严格近邻条件
Pattern<Event, ?> strict = start.next("middle").where(...);

// 宽松近邻条件
Pattern<Event, ?> relaxed = start.followedBy("middle").where(...);

// 非确定性宽松近邻条件
Pattern<Event, ?> nonDetermin = start.followedByAny("middle").where(...);
// 不能严格近邻条件
Pattern<Event, ?> strictNot = start.notNext("not").where(...);

// 不能宽松近邻条件
Pattern<Event, ?> relaxedNot = start.notFollowedBy("not").where(...);

// 时间限制条件
middle.within(Time.seconds(10));

对于定义了量词(如 oneOrMore()、times())的循环模式,默认内部采用的是宽松近邻,即当循环匹配多个事件时,它们中间是可以有其他不匹配事件的;相当于用单例模式分别定义、再用 followedBy()连接起来。.consecutive()为循环模式中的匹配事件增加严格的近邻条件,保证所有匹配事件是严格连续的。优化17.2的代码

//  定义 Pattern,登录失败事件,循环检测 3 次
Pattern<LoginEvent, LoginEvent> pattern = Pattern
.<LoginEvent>begin("fails")
.where(new SimpleCondition<LoginEvent>() { @Override
public boolean filter(LoginEvent loginEvent) throws Exception { return loginEvent.eventType.equals("fail");
}
}).times(3).consecutive();

除严格近邻外,也可以为循环模式中的事件指定非确定性宽松近邻条件,表示可以重复使用 已 经 匹 配 的 事 件 。 这 需 要 调 用 .allowCombinations() 方 法 来 实 现 , 实 现 的 效 果与.followedByAny()相同。

17.3.3、模式组

非常复杂的场景中,可能需要划分多个“阶段”,每个“阶段”又有一连串的匹配规则。为了应对这样的需求,Flink CEP 允许我们以“嵌套”的方式来定义模式。

我们用 begin()、next()、followedBy()、followedByAny()这样的“连接词”来组合个体模式,这些方法的参数就是一个个体模式的名称;而现在它们可以直接以一个模式序列作为参数,就将模式序列又一次连接组合起来了。这样得到的就是一个“模式组”(Groups of Patterns)

// 以模式序列作为初始模式
Pattern<Event, ?> start = Pattern.begin( Pattern.<Event>begin("start_start").where(...)
.followedBy("start_middle").where(...)
);

// 在 start 后定义严格近邻的模式序列,并重复匹配两次
Pattern<Event, ?> strict = start.next( Pattern.<Event>begin("next_start").where(...)
.followedBy("next_middle").where(...)
).times(2);

// 在 start 后定义宽松近邻的模式序列,并重复匹配一次或多次
Pattern<Event, ?> relaxed = start.followedBy( Pattern.<Event>begin("followedby_start").where(...)
.followedBy("followedby_middle").where(...)
).oneOrMore();

//在 start 后定义非确定性宽松近邻的模式序列,可以匹配一次,也可以不匹配
Pattern<Event, ?> nonDeterminRelaxed = start.followedByAny( Pattern.<Event>begin("followedbyany_start").where(...)
.followedBy("followedbyany_middle").where(...)
).optional();

17.3.4、匹配后跳过策略

如果对循环模式增加了.greedy()的限制,那么就会“尽可能多地”匹配事件,这样就可以砍掉那些子集上的匹配了。不过这种方式还是略显简单粗暴,如果我们想要精确控制事件的匹配应该跳过哪些情况,那就需要制定另外的策略了。

Pattern.begin("start", AfterMatchSkipStrategy.noSkip())
.where(...)
...

匹配后跳过策略 AfterMatchSkipStrategy 是一个抽象类,它有多个具体的实现,可以通过调用对应的静态方法来返回对应的策略实例。

我们如果输入事件序列“a a a b”——这里为了区分前后不同的 a 事件,可以记作“a1 a2 a3 b”——那么应该检测到 6 个匹配结果:(a1 a2 a3 b),(a1 a2 b),(a1 b),(a2 a3 b),(a2 b),(a3 b)。如果在初始模式的量词.oneOrMore()后加上.greedy()定义为贪心匹配,那么结果就是:

(a1 a2 a3 b),(a2 a3 b),(a3 b),每个事件作为开头只会出现一次。接下来我们讨论不同跳过策略对匹配结果的影响:

  • 不跳过(NO_SKIP)

代码调用AfterMatchSkipStrategy.noSkip()。这是默认策略,所有可能的匹配都会输出。所以这里会输出完整的 6 个匹配。

  • 跳至下一个(SKIP_TO_NEXT)

代码调用 AfterMatchSkipStrategy.skipToNext()。找到一个 a1 开始的最大匹配之后,跳过a1 开始的所有其他匹配,直接从下一个 a2 开始匹配起。当然 a2 也是如此跳过其他匹配。最终得到(a1 a2 a3 b),(a2 a3 b),(a3 b)。可以看到,这种跳过策略跟使用.greedy()效果是相同的。

  • 跳过所有子匹配(SKIP_PAST_LAST_EVENT)

代码调用AfterMatchSkipStrategy.skipPastLastEvent()。找到 a1 开始的匹配(a1 a2 a3 b)之后,直接跳过所有 a1 直到 a3 开头的匹配,相当于把这些子匹配都跳过了。最终得到(a1 a2 a3 b),这是最为精简的跳过策略。

  • 跳至第一个(SKIP_TO_FIRST[a])

代码调用AfterMatchSkipStrategy.skipToFirst(“a”),这里传入一个参数,指明跳至哪个模式的第一个匹配事件。找到 a1 开始的匹配(a1 a2 a3 b)后,跳到以最开始一个 a(也就是 a1) 为开始的匹配,相当于只留下 a1 开始的匹配。最终得到(a1 a2 a3 b),(a1 a2 b),(a1 b)。

  • 跳至最后一个(SKIP_TO_LAST[a])

代码调用AfterMatchSkipStrategy.skipToLast(“a”),同样传入一个参数,指明跳至哪个模式的最后一个匹配事件。找到 a1 开始的匹配(a1 a2 a3 b)后,跳过所有 a1、a2 开始的匹配,跳到以最后一个 a(也就是 a3)为开始的匹配。最终得到(a1 a2 a3 b),(a3 b)。

17.4、模式的检测处理
17.4.1、模式应用到流上

利用 Pattern API 定义好模式还只是整个复杂事件处理的第一步,接下来还需要将模式应用到事件流上、检测提取匹配的复杂事件并定义处理转换的方法,最终得到想要的输出信息。

PatternStream<LoginEvent> patternStream = CEP.pattern(loginEventSingleOutputStreamOperator.keyBy(event -> event.userId), loginEventPattern);

模式中定义的复杂事件,发生是有先后顺序的,这里“先后”的判断标准取决于具体的时间语义。

// 可选的事件比较器
EventComparator<Event> comparator = ...
PatternStream<Event> patternStream = CEP.pattern(input, pattern, comparator);

17.4.2、处理匹配事件

PatternStream 的转换操作主要可以分成两种:简单便捷的选择提取(select)操作,和更加通用、更加强大的处理(process)操作。

  1. 匹配事件的选择提取

    处理匹配事件最简单的方式,就是从 PatternStream 中直接把匹配的复杂事件提取出来, 包装成想要的信息输出,这个操作就是“选择”(select)。

    SingleOutputStreamOperator<String> warnStream = patternStream.select(
                    new PatternSelectFunction<LoginEvent, String>() {
                        @Override
                        public String select(Map<String, List<LoginEvent>> pattern) throws Exception {
                            //提取复杂事件中的三次登录失败
                            LoginEvent firstLoginEvent = pattern.get("first").get(0);
                            LoginEvent secondLoginEvent = pattern.get("second").get(0);
                            LoginEvent thirdLoginEvent = pattern.get("third").get(0);
                            return firstLoginEvent.userId + "连续三次登录失败!登录时间分别为" + firstLoginEvent.timestamp + "," + secondLoginEvent.timestamp + "," + thirdLoginEvent.timestamp;
                        }
                    }
            );
    

    PatternSelectFunction 里需要实现一个 select()方法,这个方法每当检测到一组匹配的复杂事件时都会调用一次。它以保存了匹配复杂事件的 Map 作为输入,经自定义转换后得到输出信息返回。这里我们假设之前定义的模式序列中,有名为“start”和“middle”的两个个体模式, 于是可以通过这个名称从 Map 中选择提取出对应的事件。注意调用 Map 的.get(key)方法后得到的是一个事件的List;如果个体模式是单例的,那么List 中只有一个元素,直接调用.get(0) 就可以把它取出。

    对连续登录失败检测的改进

    patternStream
    .select(new PatternSelectFunction<LoginEvent, String>() { 
        @Override
    public String select(Map<String, List<LoginEvent>> map) throws Exception {
    // 只有一个模式,匹配到了 3 个事件,放在 List 中
    LoginEvent first = map.get("fails").get(0); 
    LoginEvent second = map.get("fails").get(1);
    LoginEvent third = map.get("fails").get(2);
    return first.userId + " 连续三次登录失败!登录时间:" + first.timestamp
    + ", " + second.timestamp + ", " + third.timestamp;
    }
    })
    .print("warning");
    
    

    PatternStream 还有一个类似的方法是.flatSelect() , 传入的参数是一个PatternFlatSelectFunction。与之前 select()的不同就在于没有返回值,而是多了一个收集器(Collector)参数out,通过调用 out.collet()方法就可以实现多次发送输出数据了。

    代码示例

    patternStream.flatSelect(new PatternFlatSelectFunction<LoginEvent, String>() { @Override
    public void flatSelect(Map<String, List<LoginEvent>> map, Collector<String> out) throws Exception {
    LoginEvent first = map.get("fails").get(0); LoginEvent second = map.get("fails").get(1); LoginEvent third = map.get("fails").get(2);
    out.collect(first.userId + " 连续三次登录失败!登录时间:" + first.timestamp +
    ", " + second.timestamp + ", " + third.timestamp);
    }
    }).print("warning");
    
    

    PatternFlatSelectFunction 使用更加灵活,完全能够覆盖 PatternSelectFunction 的功能。

  2. 匹配事件的通用处理

    直接调用PatternStream 的.process()方法,传入一个 PatternProcessFunction。PatternProcessFunction 功能更加丰富、调用更加灵活,可以完全覆盖其他接口。

    代码示例

    patternStream.process(new PatternProcessFunction<LoginEvent, String>() { @Override
    public void processMatch(Map<String, List<LoginEvent>> map, Context ctx, Collector<String> out) throws Exception {
    LoginEvent first = map.get("fails").get(0); LoginEvent second = map.get("fails").get(1); LoginEvent third = map.get("fails").get(2);
    out.collect(first.userId + " 连续三次登录失败!登录时间:" + first.timestamp +
    ", " + second.timestamp + ", " + third.timestamp);
    }
    }).print("warning");
    
    
17.4.3、处理超时事件

当一个模式上通过within加上窗口长度后,部分匹配的事件序列就可能因为超过窗口长度而被丢弃。可以使用TimedOutPartialMatchHandler接口 来处理超时的部分匹配。这个接口可以和其它的混合使用。所以往往不应该直接丢弃,而是要输出一个提示或报警信息。这就要求我们有能力捕获并处理超时事件。

1、使用 PatternProcessFunction 的侧输出流

Flink CEP 提供了一个专门捕捉超时的部分匹配事件的接口TimedOutPartialMatchHandler。这个接口需要实现一个 processTimedOutMatch()方法,可以将超时的、已检测到的部分匹配事件放在一个 Map 中,作为方法的第一个参数;方法的第二个参数是 PatternProcessFunction 的上下文Context。所以这个接口必须与 PatternProcessFunction 结合使用,对处理结果的输出则需要利用侧输出流来进行。

class MyPatternProcessFunction extends PatternProcessFunction<Event, String>
implements TimedOutPartialMatchHandler<Event> {
// 正常匹配事件的处理
@Override
public void processMatch(Map<String, List<Event>> match, Context ctx, Collector<String> out) throws Exception{
...
}

// 超时部分匹配事件的处理
@Override
public void processTimedOutMatch(Map<String, List<Event>> match, Context ctx) throws Exception{
Event startEvent = match.get("start").get(0);
OutputTag<Event> outputTag = new OutputTag<Event>("time-out"){}; 
ctx.output(outputTag, startEvent);
}
}

2、使用 PatternTimeoutFunction

简化版的PatternSelectFunction , 它无法直接处理超时事件, 不过我们可以通过调用 PatternStream 的.select()方法时多传入一个 PatternTimeoutFunction 参数来实现这一点。

// 定义一个侧输出流标签,用于标识超时侧输出流
OutputTag<String> timeoutTag = new OutputTag<String>("timeout"){};

// 将匹配到的,和超时部分匹配的复杂事件提取出来,然后包装成提示信息输出
SingleOutputStreamOperator<String> resultStream = patternStream.select(timeoutTag,
// 超时部分匹配事件的处理
new PatternTimeoutFunction<Event, String>() { 
    @Override
public String timeout(Map<String, List<Event>> pattern, long timeoutTimestamp) throws Exception {
Event event = pattern.get("start").get(0); return "超时:" + event.toString();
}
},
// 正常匹配事件的处理
new PatternSelectFunction<Event, String>() { 
    @Override
public String select(Map<String, List<Event>> pattern) throws Exception
{
...
}
}
);

// 将正常匹配和超时部分匹配的处理结果流打印输出
resultStream.print("matched");
resultStream.getSideOutput(timeoutTag).print("timeout");

3、应用示例

在电商平台中,最终创造收入和利润的是用户下单购买的环节。用户下单的行为可以表明用户对商品的需求,但在现实中,并不是每次下单都会被用户立刻支付。当拖延一段时间后, 用户支付的意愿会降低。所以为了让用户更有紧迫感从而提高支付转化率,同时也为了防范订单支付环节的安全风险,电商网站往往会对订单状态进行监控,设置一个失效时间(比如 15 分钟),如果下单后一段时间仍未支付,订单就会被取消。

创建订单实体类

package flinkCEPDemo;

/**
 * @Title: OrderEvent
 * @Author lizhe
 * @Package flinkCEPDemo
 * @Date 2024/6/23 19:57
 * @description:定义订单实体
 */
public class OrderEvent {
    public String userId;
    public String orderId;
    public  String eventType;
    public long timestamp;

    public OrderEvent() {
    }

    public OrderEvent(String userId, String orderId, String eventType, long timestamp) {
        this.userId = userId;
        this.orderId = orderId;
        this.eventType = eventType;
        this.timestamp = timestamp;
    }

    @Override
    public String toString() {
        return "OrderEvent{" +
                "userId='" + userId + '\'' +
                ", orderId='" + orderId + '\'' +
                ", eventType='" + eventType + '\'' +
                ", timestamp=" + timestamp +
                '}';
    }
}

代码示例

package flinkCEPDemo;

import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.functions.PatternProcessFunction;
import org.apache.flink.cep.functions.TimedOutPartialMatchHandler;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.IterativeCondition;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

import java.time.Duration;
import java.util.List;
import java.util.Map;

/**
 * @Title: OrderTimeOutDetectExample
 * @Author lizhe
 * @Package flinkCEPDemo
 * @Date 2024/6/23 19:59
 * @description: 订单超时检测
 */
public class OrderTimeOutDetectExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        //获取数据流
        SingleOutputStreamOperator<OrderEvent> orderEventDataStream = env.fromElements(
                new OrderEvent("user_1", "order_1", "create", 1000L),
                new OrderEvent("user_2", "order_2", "create", 2000L),
                new OrderEvent("user_1", "order_1", "modify", 10 * 1000L),
                new OrderEvent("user_1", "order_1", "pay", 60 * 1000L),
                new OrderEvent("user_2", "order_3", "create", 10 * 60 * 1000L),
                new OrderEvent("user_2", "order_3", "pay", 20 * 60 * 1000L)

        ).assignTimestampsAndWatermarks(WatermarkStrategy.<OrderEvent>forBoundedOutOfOrderness(Duration.ZERO)
        .withTimestampAssigner(new SerializableTimestampAssigner<OrderEvent>() {
            @Override
            public long extractTimestamp(OrderEvent element, long recordTimestamp) {
                return element.timestamp;
            }
        }));
        //定义模式
        Pattern<OrderEvent, OrderEvent> orderEventOrderEventPattern = Pattern.<OrderEvent>begin("create")
                .where(new IterativeCondition<OrderEvent>() {
                    @Override
                    public boolean filter(OrderEvent value, Context<OrderEvent> ctx) throws Exception {
                        return value.eventType.equals("create");
                    }
                })
                .followedBy("pay")
                .where(new IterativeCondition<OrderEvent>() {
                    @Override
                    public boolean filter(OrderEvent value, Context<OrderEvent> ctx) throws Exception {
                        return value.eventType.equals("pay");
                    }
                })
                .within(Time.minutes(15));
        //模式应用到事件流上
        PatternStream<OrderEvent> orderEventPatternStream = CEP.pattern(orderEventDataStream.keyBy(value -> value.orderId), orderEventOrderEventPattern);
        //定义侧输出流
        OutputTag<String> timeoutTag = new OutputTag<String>("timeout") {
        };
        //将完全匹配和部分超时匹配的复杂事件提取出来进行处理
        SingleOutputStreamOperator<String> reslut = orderEventPatternStream.process(new OrderPayMatch());
        reslut.print("payed");
        reslut.getSideOutput(timeoutTag).print("timeout");
        env.execute();
    }
    public static class OrderPayMatch extends PatternProcessFunction<OrderEvent,String> implements TimedOutPartialMatchHandler<OrderEvent>{

        @Override
        public void processMatch(Map<String, List<OrderEvent>> match, Context ctx, Collector<String> out) throws Exception {
            //获取当前的支付事件
            OrderEvent payEvent = match.get("pay").get(0);
            out.collect("用户"+payEvent.userId+"订单"+payEvent.orderId+"已支付");
        }

        @Override
        public void processTimedOutMatch(Map<String, List<OrderEvent>> match, Context ctx) throws Exception {
            OrderEvent createEvent = match.get("create").get(0);
            OutputTag<String> timeoutTag = new OutputTag<String>("timeout") {
            };
            ctx.output(timeoutTag,"用户"+createEvent.userId+"订单"+createEvent.orderId + "超时未支付");
        }
    }
}

17.4.4、处理迟到数据

在 Flink CEP 中沿用了通过设置水位线(watermark)延迟来处理乱序数据的做法。当一个事件到来时,并不会立即做检测匹配处理,而是先放入一个缓冲区(buffer)。缓冲区内的数据,会按照时间戳由小到大排序;当一个水位线到来时,就会将缓冲区中所有时间戳小于水位线的事件依次取出,进行检测匹配。这样就保证了匹配事件的顺序和事件时间的进展一致,处理的顺序就一定是正确的。这里水位线的延迟时间,也就是事件在缓冲区等待的最大时间。

Flink CEP 同样提供了将迟到事件输出到侧输出流的方式: 我们可以基于 PatternStream 直接调用.sideOutputLateData()方法,传入一个 OutputTag,将迟到数据放入侧输出流另行处理。代码中调用方式如下:

PatternStream<Event> patternStream = CEP.pattern(input, pattern);

// 定义一个侧输出流的标签
OutputTag<String> lateDataOutputTag = new OutputTag<String>("late-data"){};

SingleOutputStreamOperator<ComplexEvent> result = patternStream
.sideOutputLateData(lateDataOutputTag)	// 将迟到数据输出到侧输出流
.select(
// 处理正常匹配数据
new PatternSelectFunction<Event, ComplexEvent>() {...}
);

// 从结果中提取侧输出流
DataStream<String> lateData = result.getSideOutput(lateDataOutputTag);
17.5、CEP 的状态机实现

Flink CEP 的底层工作原理其实与正则表达式是一致的,是一个“非确定有限状态自动机”(Nondeterministic Finite Automaton,NFA)。

如检测用户连续三次登录失败的复杂事件。用 Flink CEP 中的 Pattern API 可以很方便地把它定义出来;如果我们现在不用 CEP,而是用 DataStream API 和处理函数来实现,应该怎么做呢?

这需要设置状态,并根据输入的事件不断更新状态。当然因为这个需求不是很复杂,我们也可以用嵌套的 if-else 条件判断将它实现,不过这样做的代码可读性和扩展性都会很差。更好的方式,就是实现一个状态机。
在这里插入图片描述
从初始状态(INITIAL)出发,遇到一个类型为fail 的登录失败事件,就开始进入部分匹配的状态;目前只有一个 fail 事件,我们把当前状态记作 S1。基于 S1 状态,如果继续遇到 fail 事件,那么就有两个 fail 事件,记作 S2。基于 S2 状态如果再次遇到 fail 事件,那么就找到了一组匹配的复杂事件,把当前状态记作 Matched, 就可以输出报警信息了。需要注意的是,报警完毕,需要立即重置状态回 S2;因为如果接下来再遇到 fail 事件,就又满足了新的连续三次登录失败,需要再次报警。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2116666.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

IM项目运行说明

注册登录以及消息列表界面&#xff1a; 联系人界面&#xff1a;新的好友/群聊列表/好友列表 界面&#xff1a; 群聊界面&#xff1a;群聊不想支持发视频&#xff0c;因为非技术上的麻烦原因。。。 图片可以下载&#xff1a; 私聊可以发视频&#xff1a; 私聊支持服务器消…

android kotlin 数据类 data class

1、Kotlin中的数据类主要用于保存数据。对于每个数据类&#xff0c;编译器会自动生成其他成员函数&#xff0c;允许您将实例打印到可读输出、比较实例、复制实例等。 中文文档&#xff1a; https://book.kotlincn.net/text/data-classes.html 2、新建文件dataClassTest.kt 3…

vue解决“用户代理样式表“边距问题

一、问题 当我们制作页面的时候&#xff0c;会发现页面会多出边距 打开控制台发现&#xff0c;是使用了“用户代理样式表” 这个无法去除&#xff0c;但是又觉得页面有间隙不好看&#xff0c;那怎么去除间隙&#xff1f;&#xff1f; 二、解决 将要写的东西放在div里&#x…

大模型笔记02--基于fastgpt和oneapi构建大模型应用平台

大模型笔记02--基于fastgpt和oneapi构建大模型应用平台 介绍部署&测试部署fastgptoneapi服务部署向量模型m3e和nomic-embed-text测试大模型 注意事项说明 介绍 随着大模型的快速发展&#xff0c;众多IT科技厂商都开发训练了各自的大模型&#xff0c;并提供了各具特色的AI产…

专题二_滑动窗口_算法专题详细总结

目录 滑动窗口&#xff0c;引入&#xff1a; 滑动窗口&#xff0c;本质&#xff1a;就是同向双指针&#xff1b; 1.⻓度最⼩的⼦数组&#xff08;medium&#xff09; 1.解析&#xff1a;给我们一个数组nums&#xff0c;要我们找出最小子数组的和target&#xff0c;首先想到的…

arduino ide安装详细步骤

​ 大家好&#xff0c;我是程序员小羊&#xff01; 前言&#xff1a; Arduino IDE 是一个专为编程 Arduino 微控制器设计的集成开发环境&#xff0c;使用起来非常方便。下面将介绍如何在不同平台上安装 Arduino IDE 的详细步骤&#xff0c;包括 Windows、Mac 和 Linux 系统。 …

计算机毕业设计选题推荐-乐器推荐系统-乐器商城-Java/Python项目实战

✨作者主页&#xff1a;IT毕设梦工厂✨ 个人简介&#xff1a;曾从事计算机专业培训教学&#xff0c;擅长Java、Python、微信小程序、Golang、安卓Android等项目实战。接项目定制开发、代码讲解、答辩教学、文档编写、降重等。 ☑文末获取源码☑ 精彩专栏推荐⬇⬇⬇ Java项目 Py…

《Cloud Native Data Center Networking》(云原生数据中心网络设计)读书笔记 -- 11部署BGP

本章应有助于回答以下问题: 核心的配置概念是什么?如何为 Clos 网络配置 BGP ?无编号的 BGP 如何工作?如何配置 BGP 与主机上的 BGP 发言者 (例如 Kube-router) 建立对等关系?如何配置 BGP 以对网络进行计划维护? 核心的 BGP 配置概念 全局BGP 配置&#xff0c;包含: r…

了解开源消息代理RabbitMQ

1.RabbitMQ 是什么&#xff1f; RabbitMQ是一个消息代理:它接受并转发消息。你可以把它想象成邮局:当你把要寄的邮件放进邮箱时&#xff0c;你可以确定邮递员最终会把邮件送到收件人那里。在这个比喻中&#xff0c;RabbitMQ是一个邮筒、一个邮局和一个邮递员。RabbitMQ和邮局之…

JavaScript 循环分支语句-for循环

先n1,判断n是否<10,满足条件&#xff0c;n1&#xff0c;输出n&#xff0c;再次判断n是否<10&#xff0c;循环.......... <!DOCTYPE html> <html lang"en"> <head><meta charset"UTF-8"><meta http-equiv"X-UA-Comp…

SQL进阶技巧:截止当前批次前的批次量与订单量 | 移动窗口问题

目录 0 场景描述 1 数据准备 2 问题分析 3 小结 0 场景描述 表A有如下字段,user id(用户ID),batch id(批次ID),order id(订单ID),create time(创建时间),同一个用户ID下有多个批次,同一个批次下有多个订单ID,相同批次ID的创建时间是相同的,创建时间精确到了秒。 统计,截…

如何通过ip命令修改网卡名?(ip link set en1p0f0 name eth0)

快速提取 ip link set enp1s0f0 down ip link set enp1s0f0 name eth0 ip link set enp1s0f0 up ifconfig eth0背景 ifconfig命令不支持修改网卡名字。可以使用ip link进行修改。 从Linux 4.9版本开始&#xff0c;ip link 命令支持一个 set 子命令&#xff0c;可以用来修改接…

iPhone16全系采用A18处理器,不再区别对待,市场压力所致

iPhone16即将在3天后发布&#xff0c;外媒基本确定今年的iPhone16将全系采用A18处理器&#xff0c;与此前的基本款采用上一代处理器有很大的差异&#xff0c;这对于苹果用户来说无疑是一大利好&#xff0c;也将有助于推动iPhone16的销售。 此前的iPhone14和iPhone15&#xff0c…

[Redis] Redis基本命令与数据类型+单线程模型

&#x1f338;个人主页:https://blog.csdn.net/2301_80050796?spm1000.2115.3001.5343 &#x1f3f5;️热门专栏: &#x1f9ca; Java基本语法(97平均质量分)https://blog.csdn.net/2301_80050796/category_12615970.html?spm1001.2014.3001.5482 &#x1f355; Collection与…

【springboot】使用AOP

目录 1. 添加依赖2. 创建切面类1. 创建切面类2. 切点表达式3. 增强方法 3. 开启AOP4. 创建控制类5. 测试 1. 添加依赖 <!-- AOP依赖--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-aop<…

【iOS】MVC入门

【iOS】MVC模式的学习 文章目录 【iOS】MVC模式的学习前言MVC模式概念MVC的交流模式MVC的一个简单实践Model层View层Controller层 MVC的优点与缺点总结 前言 笔者在暑假的学习中完成了一些小项目&#xff0c;这些小项目中间有时候出现了一个小bug都要寻找很久&#xff0c;而且会…

前端学习笔记-Web APls篇-05

Bom操作 1.Window对象 1.1BOM(浏览器对象模型&#xff09; BOM(Browser Object Model ) 是浏览器对象模型 window对象是一个全局对象&#xff0c;也可以说是JavaScript中的顶级对象像document、alert()、console.log()这些都是window的属性&#xff0c;基本BOM的属性和方法都…

SAM2POINT:以zero-shot且快速的方式将任何 3D 视频分割为视频

摘要 我们介绍 SAM2POINT&#xff0c;这是一种采用 Segment Anything Model 2 (SAM 2) 进行零样本和快速 3D 分割的初步探索。 SAM2POINT 将任何 3D 数据解释为一系列多向视频&#xff0c;并利用 SAM 2 进行 3D 空间分割&#xff0c;无需进一步训练或 2D-3D 投影。 我们的框架…

ML19_GMM高斯混合模型详解

1. 中心极限定理 中心极限定理&#xff08;Central Limit Theorem, CLT&#xff09;是概率论中的一个重要定理&#xff0c;它描述了在一定条件下&#xff0c;独立同分布的随机变量序列的标准化和的分布趋向于正态分布的性质。这个定理在统计学中有着广泛的应用&#xff0c;尤其…

算法篇_C语言实现霍夫曼编码算法

一、前言 霍夫曼编码&#xff08;Huffman Coding&#xff09;是一种广泛使用的数据压缩算法&#xff0c;特别适用于无损数据压缩。它是由David A. Huffman在1952年提出的&#xff0c;并且通常用于文件压缩和传输中减少数据量。霍夫曼编码的核心思想是使用变长编码表对源数据进…