Flink CEP(二) 运行源码解析

news2025/1/18 17:01:19

通过DemoApp学习一下,CEP的源码执行逻辑。为下一篇实现CEP动态Pattern奠定理论基础。

1. Pattern的定义

Pattern<Tuple3<String, Long, String>,?> pattern = Pattern
                .<Tuple3<String, Long, String>>begin("begin")
                .where(new IterativeCondition<Tuple3<String, Long, String>>() {
                    @Override
                    public boolean filter(Tuple3<String, Long, String> value, Context<Tuple3<String, Long, String>> ctx)
                            throws Exception {
                        return value.f2.equals("success");
                    }
                })
                .followedByAny("middle")
                .where(new IterativeCondition<Tuple3<String, Long, String>>() {
                    @Override
                    public boolean filter(Tuple3<String, Long, String> value, Context<Tuple3<String, Long, String>> ctx)
                            throws Exception {
                        return value.f2.equals("fail");
                    }
                })
                .followedBy("end")
                .where(new IterativeCondition<Tuple3<String, Long, String>>() {
                    @Override
                    public boolean filter(Tuple3<String, Long, String> value, Context<Tuple3<String, Long, String>> ctx)
                            throws Exception {
                        return value.f2.equals("end");
                    }
                });

 在执行中,我们可以看到pattern的几个属性,进入Pattern类中查看。

public class Pattern<T, F extends T> {

    /** Name of the pattern. */
    private final String name;

    /** Previous pattern. */
    private final Pattern<T, ? extends T> previous;

    /** The condition an event has to satisfy to be considered a matched. */
    private IterativeCondition<F> condition;

    /** Window length in which the pattern match has to occur. */
    private final Map<WithinType, Time> windowTimes = new HashMap<>();

    /**
     * A quantifier for the pattern. By default set to {@link Quantifier#one(ConsumingStrategy)}.
     */
    private Quantifier quantifier = Quantifier.one(ConsumingStrategy.STRICT);

    /** The condition an event has to satisfy to stop collecting events into looping state. */
    private IterativeCondition<F> untilCondition;

    /** Applicable to a {@code times} pattern, and holds the number of times it has to appear. */
    private Times times;

    private final AfterMatchSkipStrategy afterMatchSkipStrategy;
}

可以看到每一个Pattern都会存在以下属性:

  • Name:Pattern的Name
  • previous:之前的Pattern
  • condition:Pattern的匹配逻辑
  • windowTimes:限制窗口的时长
  • Quantifier:Pattern的属性,包括配置Pattern的模式可以发生的循环次数,或者这个模式是贪婪的还是可选的。
    • /**
       * A quantifier describing the Pattern. There are three main groups of {@link Quantifier}.
       *
       * <ol>
       *   <li>Single
       *   <li>Looping
       *   <li>Times
       * </ol>
       *
       * <p>Each {@link Pattern} can be optional and have a {@link ConsumingStrategy}. Looping and Times
       * also hava an additional inner consuming strategy that is applied between accepted events in the
       * pattern.
       */
      public class Quantifier {
      
          private final EnumSet<QuantifierProperty> properties;
      
          private final ConsumingStrategy consumingStrategy;
      
          private ConsumingStrategy innerConsumingStrategy = ConsumingStrategy.SKIP_TILL_NEXT;
      }
  • untilCondition:Pattern的循环匹配的结束条件
  • times:连续匹配次数
  • afterMatchSkipStrategy:匹配后的跳过策略

2.PatternStream的构建

        对Pattern定义完成,会通过PatternStreamBuilder,将1中定义好的Pattern应用到输入流中,返回对应的PatternStream。

    static <IN> PatternStreamBuilder<IN> forStreamAndPattern(
            final DataStream<IN> inputStream, final Pattern<IN, ?> pattern) {
        return new PatternStreamBuilder<>(
                inputStream, pattern, TimeBehaviour.EventTime, null, null);
    }

    PatternStream(final DataStream<T> inputStream, final Pattern<T, ?> pattern) {
        this(PatternStreamBuilder.forStreamAndPattern(inputStream, pattern));
    }

继续执行代码,进入Select()。

    public <R> SingleOutputStreamOperator<R> select(
            final PatternSelectFunction<T, R> patternSelectFunction,
            final TypeInformation<R> outTypeInfo) {

        final PatternProcessFunction<T, R> processFunction =
                fromSelect(builder.clean(patternSelectFunction)).build();

        return process(processFunction, outTypeInfo);
    }

进入process可以看到PatternStream.select会调用builder.build函数。

    public <R> SingleOutputStreamOperator<R> process(
            final PatternProcessFunction<T, R> patternProcessFunction,
            final TypeInformation<R> outTypeInfo) {

        return builder.build(outTypeInfo, builder.clean(patternProcessFunction));
    }

在build函数中会完成NFAFactory的定义,随后构建CepOperator。inputstream随之运行CepOperator即pattern定义的处理逻辑,并返回结果流PatternStream。

    <OUT, K> SingleOutputStreamOperator<OUT> build(
            final TypeInformation<OUT> outTypeInfo,
            final PatternProcessFunction<IN, OUT> processFunction) {

        checkNotNull(outTypeInfo);
        checkNotNull(processFunction);

        final TypeSerializer<IN> inputSerializer =
                inputStream.getType().createSerializer(inputStream.getExecutionConfig());
        final boolean isProcessingTime = timeBehaviour == TimeBehaviour.ProcessingTime;

        final boolean timeoutHandling = processFunction instanceof TimedOutPartialMatchHandler;
        final NFACompiler.NFAFactory<IN> nfaFactory =
                NFACompiler.compileFactory(pattern, timeoutHandling);

        CepOperator<IN, K, OUT> operator = new CepOperator<>(
                    inputSerializer,
                    isProcessingTime,
                    nfaFactory,
                    comparator,
                    pattern.getAfterMatchSkipStrategy(),
                    processFunction,
                    lateDataOutputTag);
  

        final SingleOutputStreamOperator<OUT> patternStream;
        if (inputStream instanceof KeyedStream) {
            KeyedStream<IN, K> keyedStream = (KeyedStream<IN, K>) inputStream;

            patternStream = keyedStream.transform("CepOperator", outTypeInfo, operator);
        } else {
            KeySelector<IN, Byte> keySelector = new NullByteKeySelector<>();

            patternStream =
                    inputStream
                            .keyBy(keySelector)
                            .transform("GlobalCepOperator", outTypeInfo, operator)
                            .forceNonParallel();
        }

        return patternStream;
    }

3.CepOperator的执行

        初始化。

    @Override
    public void open() throws Exception {
        super.open();
        timerService =
                getInternalTimerService(
                        "watermark-callbacks", VoidNamespaceSerializer.INSTANCE, this);


        nfa = nfaFactory.createNFA();
        nfa.open(cepRuntimeContext, new Configuration());

        context = new ContextFunctionImpl();
        collector = new TimestampedCollector<>(output);
        cepTimerService = new TimerServiceImpl();

        // metrics
        this.numLateRecordsDropped = metrics.counter(LATE_ELEMENTS_DROPPED_METRIC_NAME);
    }

 可以看到,nfaFactory.createNFA();会解析pattern组合,并为每一个pattern创建一个state。

CepOperator会在processElement中处理流中的每条数据。

    @Override
    public void processElement(StreamRecord<IN> element) throws Exception {


        if (isProcessingTime) {
            if (comparator == null) {
                // there can be no out of order elements in processing time
                NFAState nfaState = getNFAState();
                long timestamp = getProcessingTimeService().getCurrentProcessingTime();
                advanceTime(nfaState, timestamp);
                processEvent(nfaState, element.getValue(), timestamp);
                updateNFA(nfaState);
            } else {
                long currentTime = timerService.currentProcessingTime();
                bufferEvent(element.getValue(), currentTime);
            }

        } else {

            long timestamp = element.getTimestamp();
            IN value = element.getValue();

            // In event-time processing we assume correctness of the watermark.
            // Events with timestamp smaller than or equal with the last seen watermark are
            // considered late.
            // Late events are put in a dedicated side output, if the user has specified one.

            if (timestamp > timerService.currentWatermark()) {

                // we have an event with a valid timestamp, so
                // we buffer it until we receive the proper watermark.

                bufferEvent(value, timestamp);

            } else if (lateDataOutputTag != null) {
                output.collect(lateDataOutputTag, element);
            } else {
                numLateRecordsDropped.inc();
            }
        }
    }

        可以看到,如果使用的是处理时间,需要先对数据根据当前处理时间将乱序的数据做一次处理,保证数据的有序。

        如果使用的事件时间,如果事件时间戳小于等于watermark会被认为是迟到数据。

        正常数据会先被缓存起来,等待处理。

    private void bufferEvent(IN event, long currentTime) throws Exception {
        List<IN> elementsForTimestamp = elementQueueState.get(currentTime);
        if (elementsForTimestamp == null) {
            elementsForTimestamp = new ArrayList<>();
            registerTimer(currentTime);
        }

        elementsForTimestamp.add(event);
        elementQueueState.put(currentTime, elementsForTimestamp);
    }

       elementQueueState 会以时间戳为key保存对应的数据。在onEventTime()函数中通过processEvent中处理缓存的匹配数据。

    @Override
    public void onEventTime(InternalTimer<KEY, VoidNamespace> timer) throws Exception {

        // 1) get the queue of pending elements for the key and the corresponding NFA,
        // 2) process the pending elements in event time order and custom comparator if exists
        //		by feeding them in the NFA
        // 3) advance the time to the current watermark, so that expired patterns are discarded.
        // 4) update the stored state for the key, by only storing the new NFA and MapState iff they
        //		have state to be used later.
        // 5) update the last seen watermark.

        // STEP 1
        PriorityQueue<Long> sortedTimestamps = getSortedTimestamps();
        NFAState nfaState = getNFAState();

        // STEP 2
        while (!sortedTimestamps.isEmpty()
                && sortedTimestamps.peek() <= timerService.currentWatermark()) {
            long timestamp = sortedTimestamps.poll();
            advanceTime(nfaState, timestamp);
            // 对事件按时间进行排序
            try (Stream<IN> elements = sort(elementQueueState.get(timestamp))) {
                elements.forEachOrdered(
                        event -> {
                            try {
                                processEvent(nfaState, event, timestamp);
                            } catch (Exception e) {
                                throw new RuntimeException(e);
                            }
                        });
            }
            elementQueueState.remove(timestamp);
        }

        // STEP 3
        advanceTime(nfaState, timerService.currentWatermark());

        // STEP 4
        updateNFA(nfaState);
    }
   private void processEvent(NFAState nfaState, IN event, long timestamp) throws Exception {
        try (SharedBufferAccessor<IN> sharedBufferAccessor = partialMatches.getAccessor()) {
            Collection<Map<String, List<IN>>> patterns =
                    nfa.process(
                            sharedBufferAccessor,
                            nfaState,
                            event,
                            timestamp,
                            afterMatchSkipStrategy,
                            cepTimerService);
            if (nfa.getWindowTime() > 0 && nfaState.isNewStartPartialMatch()) {
                registerTimer(timestamp + nfa.getWindowTime());
            }
            processMatchedSequences(patterns, timestamp);
        }
    }


    private void processMatchedSequences(
            Iterable<Map<String, List<IN>>> matchingSequences, long timestamp) throws Exception {
        PatternProcessFunction<IN, OUT> function = getUserFunction();
        setTimestamp(timestamp);
        for (Map<String, List<IN>> matchingSequence : matchingSequences) {
            function.processMatch(matchingSequence, context, collector);
        }
    }

        nfa.process()最后会调用doProcess进行处理。

        computer

         可以看到每来一个新的Event,就会从上一个数据停留的状态开始遍历。判断新事件Event匹配之前已经匹配过的哪个状态,并为其版本号+1

前5条数据是success->fail->fail->success->fail,我们可以观察到partialMatches的变化如下:

success事件到达,因为之前没有事件,所以当前停留的状态是 begin。success匹配,预期会停留在middle状态

 fail事件到达,可以看到上面的success事件停留在了middle状态,并且begin的版本+1.

判断这个fail事件可以匹配后续的patern,状态从middle转移到end。存在newComputationStates中。最终更新到partialMatch中。

 第二个fail事件到达,只能匹配之前的middle状态,所以partialMatch中会新增一个end状态,并且middle的版本+1;

 

 最后如果状态到达终态,输出到potentialMatches中存储。

打印结果,可以看到每个事件都会试图去匹配所有的历史状态,nfa会存储所有匹配上的历史状态,直到到达终态。 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/809367.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

MySQL检索数据和排序数据

目录 一、select语句 1.检索单个列&#xff08;SELECT 列名 FROM 表名;&#xff09; 2.检索多个列&#xff08;SELECT 列名1&#xff0c;列名2&#xff0c;列名3 FROM 表名;&#xff09; 3.检索所有的列&#xff08;SELECT * FROM 表名;&#xff09; 4.检索不同的行&#x…

【小白必看】Python图片合成示例之使用PIL库实现多张图片按行列合成

文章目录 前言效果图1. 导入必要的库2. 打开文件并获取大小3. 设置生成图片的行数和列数4. 获取所有图片的名称列表5. 创建新的画布6. 遍历每个位置并粘贴图片7. 保存合成的图片完整代码图片来源结束语 前言 本文介绍了一个用于图片合成的 Python 代码示例。该代码使用了PIL库来…

一篇文章搞定Java泛型

目录 介绍 优点 泛型类 语法定义 代码示例 泛型类注意事项 抽奖示例 泛型类派生子类 定义 代码示例 子类是泛型 子类不是泛型 泛型接口 定义 泛型方法 定义 代码示例 泛型方法与可变参数 泛型方法总结 ​编辑类型通配符 定义 代码示例 通配符的上限 定义 …

leetcode743. 网络延迟时间 floyd

https://leetcode.cn/problems/network-delay-time/ 有 n 个网络节点&#xff0c;标记为 1 到 n。 给你一个列表 times&#xff0c;表示信号经过 有向 边的传递时间。 times[i] (ui, vi, wi)&#xff0c;其中 ui 是源节点&#xff0c;vi 是目标节点&#xff0c; wi 是一个信…

详细介绍 React 中如何使用 redux

在使用之前要先了解它的配套插件&#xff1a; 在React中使用redux&#xff0c;官方要求安装其他插件 Redux Toolkit 和 react-redux Redux Toolkit&#xff1a;它是一个官方推荐的工具集&#xff0c;旨在简化 Redux 的使用和管理。Redux Toolkit 提供了一些提高开发效率的工具…

F5 LTM 知识点和实验 5-健康检测

第五章:健康检测 监控的分类: 地址监控(3层)服务监控(4层)内容监控(7层)应用监控(7层)性能监控(7层)路径监控(3、4、7层)三层监控: 三层监控可以帮助bipip系统通过检查网络是否可达监视资源。比如使用icmp echo,向监控节点发送icmp_echo报文,如果接收到响应…

求分享如何批量压缩视频的容量的方法

视频内存过大&#xff0c;不但特别占内存&#xff0c;而且还会使手机电脑出现卡顿的现象&#xff0c;除此之外&#xff0c;如果我们想发送这些视频文件可能还会因为内存太大无法发送。因此&#xff0c;我们可以批量地压缩视频文件的内存大小&#xff0c;今天小编要来分享一招&a…

车载软件架构 —— 信息安全与基础软件

车载软件架构 —— 信息安全与基础软件 我是穿拖鞋的汉子,魔都中坚持长期主义的汽车电子工程师。 老规矩,分享一段喜欢的文字,避免自己成为高知识低文化的工程师: 没有人关注你。也无需有人关注你。你必须承认自己的价值,你不能站在他人的角度来反对自己。人生在世,最怕…

郑州多域名https证书

多域名https证书是https证书中比较特殊的一款&#xff0c;它保护的域名记录是众多https证书中最灵活的。不管是DV基础型的多域名https证书还是OV企业型和EV增强型的多域名https证书既可以保护多个主域名或者子域名&#xff0c;还可以主域名子域名随意组合&#xff0c;只要申请者…

matlab--solve函数的用法

目录 1.用法结构 2.解单变量方程 3.解多变量方程 4.解带参方程 5.解不等式 6.总结 1.用法结构 solve函数是MATLAB中的一个符号计算函数&#xff0c;用于求解方程组或方程的符号解。 它的用法如下&#xff1a; 定义符号变量&#xff1a;使用syms函数定义符号变量&#…

CSDN博客置顶操作

目录 背景: 过程: 第一步: 第二步&#xff1a; 总结&#xff1a; 背景: 对于文章博主都想把优质的好文放在第一位与大家分享&#xff0c;让更多的人看到自己的文章以便更好的展现出来&#xff0c;那么就是置顶。 过程: 第一步: 打开CSDN主页文章&#xff0c;将鼠标放在…

css定义超级链接a标签里面的title的样式

效果: 代码: 总结:此css 使用于任何元素,不仅仅是a标签!

找不到mfc140u.dll怎么解决

第一&#xff1a;mfc140u.dll有什么用途&#xff1f; mfc140u.dll是Windows操作系统中的一个动态链接库文件&#xff0c;它是Microsoft Foundation Class (MFC)库的一部分。MFC是 C中的一个框架&#xff0c;用于构建Windows应用程序的用户界面和功能。mfc140u.dll包含了MFC库中…

12. Mybatis 多表查询 动态 SQL

目录 1. 数据库字段和 Java 对象不一致 2. 多表查询 3. 动态 SQL 使用 4. 标签 5. 标签 6. 标签 7. 标签 8. 标签 9. 通过注解实现 9.1 查找所有数据 9.2 通过 id 查找 1. 数据库字段和 Java 对象不一致 我们先来看一下数据库中的数据&#xff1a; 接下来&#…

冠达管理:股指预计维持震荡格局 关注汽车、酿酒等板块

冠达管理指出&#xff0c;周四A股商场冲高遇阻、小幅震动整理&#xff0c;早盘股指高开后震动上行&#xff0c;沪指盘中在3245点邻近遭遇阻力&#xff0c;午后股指逐级回落&#xff0c;轿车、金融、酿酒以及军工等职业轮番领涨&#xff0c;互联网、软件、半导体以及证券等职业震…

Git克隆文件不显示绿色勾、红色感叹号等图标

1、问题 Git和TorToiseGit安装后&#xff0c;Git克隆的文件不会显示绿色勾、红色感叹号等图标。 2、检查注册表 2.1、打开注册表 (1)WinR打开运行窗口&#xff0c;输入regedit&#xff0c;点击确定&#xff0c;打开注册表编辑器。 2.2、找如下路径 (1)找到路径 计算机\HKEY_…

Unity 性能优化四:UI耗时函数、资源加载、卸载API

UI耗时函数 1.1 Canvas.SendWillRenderCanvases 这个函数是由于自身UI的更新&#xff0c;产生的耗时 1. 这里更新的是vertex 属性&#xff0c;比如 color、tangent、position、uv&#xff0c;修改recttransform的position、scale&#xff0c;rotation并不会导致顶点属性改变…

想测试入门就必须要懂的软件开发流程

从事软件测试行业&#xff0c;每天面对的被测对象都是软件。如果想要更好的去完成测试工作&#xff0c;首先需要对被测对象&#xff0c;也就是对软件要有基本的了解。 软件 与计算机系统操作有关的计算机程序、可能有的文件、文档及数据。 程序好理解&#xff0c;就是可以操…

JS正则表达式:常用正则手册/RegExp/正则积累

一、正则基础语法 JavaScript 正则表达式 | 菜鸟教程 JS正则表达式语法大全&#xff08;非常详细&#xff09; 二、使用场景 2.1、校验中国大陆手机号的正则表达式 正则 /^1[3456789]\d{9}$/解释 序号正则解释1^1以数字 1 开头2[3456789]第二位可以是 3、4、5、6、7、8、…

MybatisPlus拓展篇

文章目录 逻辑删除通用枚举字段类型处理器自动填充功能防全表更新与删除插件MybatisX快速开发插件插件安装逆向工程常见需求代码生成 乐观锁问题引入乐观锁的使用效果测试 代码生成器执行SQL分析打印多数据源 逻辑删除 逻辑删除的操作就是增加一个字段表示这个数据的状态&…