Flink CEP(三)pattern动态更新

news2025/1/11 10:05:56

        线上运行的CEP中肯定经常遇到规则变更的情况,如果每次变更时都将任务重启、重新发布是非常不优雅的。尤其在营销或者风控这种对实时性要求比较高的场景,如果规则窗口过长(一两个星期),状态过大,就会导致重启时间延长,期间就会造成一些想要处理的异常行为不能及时发现。

1.实现分析

  • 外部加载:通常规则引擎会有专门的规则管理模块,提供用户去创建自己的规则,对于Flink任务来说需要到外部去加载规则
  • 动态更新:需要提供定时去检测规则是否变更
  • 历史状态清理:在模式匹配中是一系列NFAState 的不断变更,如果规则发生变更,需要清理历史状态
  • API:需要对外提供易用的API

2.代码实现

       首先实现一个用户API。

package cep.functions;

import java.io.Serializable;

import org.apache.flink.api.common.functions.Function;

import cep.pattern.Pattern;

/**
 * @author StephenYou
 * Created on 2023-07-23
 * Description: 动态Pattern接口(用户调用API)不区分key
 */
public interface DynamicPatternFunction<T> extends Function, Serializable {

    /***
     * 初始化
     * @throws Exception
     */
    public void init() throws Exception;

    /**
     * 注入新的pattern
     * @return
     */
    public Pattern<T,T> inject() throws Exception;

    /**
     * 一个扫描周期:ms
     * @return
     */
    public long getPeriod() throws Exception;

    /**
     * 规则是否发生变更
     * @return
     */
    public boolean isChanged() throws Exception;
}

        希望上述API的调用方式如下。

//正常调用

CEP.pattern(dataStream,pattern);

//动态Pattern

CEP.injectionPattern(dataStream, new UserDynamicPatternFunction())

        所以需要修改CEP-Lib源码

        b.增加injectionPattern函数。

public class CEP {
    /***
     * Dynamic injection pattern function 
     * @param input
     * @param dynamicPatternFunction
     * @return
     * @param <T>
     */
    public static <T> PatternStream<T> injectionPattern throws Exception (
            DataStream<T> input,
            DynamicPatternFunction<T> dynamicPatternFunction){
        return new PatternStream<>(input, dynamicPatternFunction); 
    }
}

        增加PatternStream构造函数,因为需要动态更新,所以有必要传进去整个函数。

public class PatternStream<T> {
    PatternStream(final DataStream<T> inputStream, DynamicPatternFunction<T> dynamicPatternFunction) throws Exception {
        this(PatternStreamBuilder.forStreamAndPatternFunction(inputStream, dynamicPatternFunction));
    }
}

        修改PatternStreamBuilder.build, 增加调用函数的过程。

        final CepOperator<IN, K, OUT> operator = null;
        if (patternFunction == null ) {
            operator = new CepOperator<>(
                    inputSerializer,
                    isProcessingTime,
                    nfaFactory,
                    comparator,
                    pattern.getAfterMatchSkipStrategy(),
                    processFunction,
                    lateDataOutputTag);
        } else {
            operator = new CepOperator<>(
                    inputSerializer,
                    isProcessingTime,
                    patternFunction,
                    comparator,
                    null,
                    processFunction,
                    lateDataOutputTag);
        }

        增加对应的CepOperator构造函数。

    public CepOperator(
            final TypeSerializer<IN> inputSerializer,
            final boolean isProcessingTime,
            final DynamicPatternFunction patternFunction,
            @Nullable final EventComparator<IN> comparator,
            @Nullable final AfterMatchSkipStrategy afterMatchSkipStrategy,
            final PatternProcessFunction<IN, OUT> function,
            @Nullable final OutputTag<IN> lateDataOutputTag) {
        super(function);

        this.inputSerializer = Preconditions.checkNotNull(inputSerializer);
        this.patternFunction = patternFunction;
        this.isProcessingTime = isProcessingTime;
        this.comparator = comparator;
        this.lateDataOutputTag = lateDataOutputTag;

        if (afterMatchSkipStrategy == null) {
            this.afterMatchSkipStrategy = AfterMatchSkipStrategy.noSkip();
        } else {
            this.afterMatchSkipStrategy = afterMatchSkipStrategy;
        }
        this.nfaFactory = null;
    }

        加载Pattern,构造NFA

    @Override
    public void open() throws Exception {
        super.open();
        timerService =
                getInternalTimerService(
                        "watermark-callbacks", VoidNamespaceSerializer.INSTANCE, this);

        //初始化
        if (patternFunction != null) {
            patternFunction.init();
            Pattern pattern = patternFunction.inject();
            afterMatchSkipStrategy = pattern.getAfterMatchSkipStrategy();
            boolean timeoutHandling = getUserFunction() instanceof TimedOutPartialMatchHandler;
            nfaFactory = NFACompiler.compileFactory(pattern, timeoutHandling);
            long period = patternFunction.getPeriod();
            // 注册定时器检测规则是否变更
            if (period > 0) {
                getProcessingTimeService().registerTimer(timerService.currentProcessingTime() + period, this::onProcessingTime);
            }
        }


        nfa = nfaFactory.createNFA();
        nfa.open(cepRuntimeContext, new Configuration());

        context = new ContextFunctionImpl();
        collector = new TimestampedCollector<>(output);
        cepTimerService = new TimerServiceImpl();

        // metrics
        this.numLateRecordsDropped = metrics.counter(LATE_ELEMENTS_DROPPED_METRIC_NAME);
    }

        状态清理一共分为两块: 匹配状态数据清理、定时器清理;

        进行状态清理:

    @Override
    public void processElement(StreamRecord<IN> element) throws Exception {

        if (patternFunction != null) {
            // 规则版本更新
            if (needRefresh.value() < refreshVersion.get()) {
                //清除状态
                computationStates.clear();
                elementQueueState.clear();
                partialMatches.releaseCacheStatisticsTimer();
                //清除定时器
                Iterable<Long> registerTime = registerTimeState.get();
                if (registerTime != null) {
                    Iterator<Long> iterator = registerTime.iterator();
                    while (iterator.hasNext()) {
                        Long l = iterator.next();
                        //删除定时器
                        timerService.deleteEventTimeTimer(VoidNamespace.INSTANCE, l);
                        timerService.deleteProcessingTimeTimer(VoidNamespace.INSTANCE, l);
                        //状态清理
                        iterator.remove();
                    }
                }
                //更新当前的版本
                needRefresh.update(refreshVersion.get());
            }
        }
}

        上面是在处理每条数据时,清除状态和版本。接下来要进行状态和版本的初始化。

    @Override
    public void initializeState(StateInitializationContext context) throws Exception {
        super.initializeState(context);

        //初始化状态
        if (patternFunction != null) {
            /**
             * 两个标识位状态
             */
            refreshFlagState = context.getOperatorStateStore()
                    .getUnionListState(new ListStateDescriptor<Integer>("refreshFlagState", Integer.class));
            if (context.isRestored()) {
                if (refreshFlagState.get().iterator().hasNext()) {
                    refreshVersion = new AtomicInteger(refreshFlagState.get().iterator().next());
                }
            } else {
                refreshVersion = new AtomicInteger(0);
            }
            needRefresh = context.getKeyedStateStore()
                    .getState(new ValueStateDescriptor<Integer>("needRefreshState", Integer.class, 0));
        }
}

3.测试验证

        设置每10s变更一次Pattern。

 PatternStream patternStream = CEP.injectionPattern(source, new TestDynamicPatternFunction());
        patternStream.select(new PatternSelectFunction<Tuple3<String, Long, String>, Map>() {
            @Override
            public Map select(Map map) throws Exception {
                map.put("processingTime", System.currentTimeMillis());
                return map;
            }
        }).print();
        env.execute("SyCep");

    }

    public static class TestDynamicPatternFunction implements DynamicPatternFunction<Tuple3<String, Long, String>> {

        public TestDynamicPatternFunction() {
            this.flag = true;
        }

        boolean flag;
        int time = 0;
        @Override
        public void init() throws Exception {
            flag = true;
        }

        @Override
        public Pattern<Tuple3<String, Long, String>, Tuple3<String, Long, String>> inject()
                            throws Exception {

            // 2种pattern
            if (flag) {
                Pattern pattern = Pattern
                        .<Tuple3<String, Long, String>>begin("start")
                        .where(new IterativeCondition<Tuple3<String, Long, String>>() {
                            @Override
                            public boolean filter(Tuple3<String, Long, String> value,
                                    Context<Tuple3<String, Long, String>> ctx) throws Exception {
                                return value.f2.equals("success");
                            }
                        })
                        .times(1)
                        .followedBy("middle")
                        .where(new IterativeCondition<Tuple3<String, Long, String>>() {
                            @Override
                            public boolean filter(Tuple3<String, Long, String> value,
                                    Context<Tuple3<String, Long, String>> ctx) throws Exception {
                                return value.f2.equals("fail");
                            }
                        })
                        .times(1)
                        .next("end");
                return pattern;
            } else {

                Pattern pattern = Pattern
                        .<Tuple3<String, Long, String>>begin("start2")
                        .where(new IterativeCondition<Tuple3<String, Long, String>>() {
                            @Override
                            public boolean filter(Tuple3<String, Long, String> value,
                                    Context<Tuple3<String, Long, String>> ctx) throws Exception {
                                return value.f2.equals("success2");
                            }
                        })
                        .times(2)
                        .next("middle2")
                        .where(new IterativeCondition<Tuple3<String, Long, String>>() {
                            @Override
                            public boolean filter(Tuple3<String, Long, String> value,
                                    Context<Tuple3<String, Long, String>> ctx) throws Exception {
                                return value.f2.equals("fail2");
                            }
                        })
                        .times(2)
                        .next("end2");
                return pattern;
            }
        }

        @Override
        public long getPeriod() throws Exception {
            return 10000;
        }

        @Override
        public boolean isChanged() throws Exception {
            flag = !flag ;
            time += getPeriod();
            System.out.println("change pattern : " + time);
            return true;
        }
    }

打印结果:符合预期

4.源码地址

感觉有用的话,帮忙点个小星星。^_^

 GitHub - StephenYou520/SyCep: CEP 动态Pattern

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/839398.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

策略模式(C++)

定义 定义一系列算法&#xff0c;把它们一个个封装起来&#xff0c;并且使它们可互相替换((变化)。该模式使得算法可独立手使用它的客户程序稳定)而变化(扩展&#xff0c;子类化)。 ——《设计模式》GoF 使用场景 在软件构建过程中&#xff0c;某些对象使用的算法可能多种多…

24考研数据结构-并查集

目录 5.5.2 并查集&#xff08;双亲表示法&#xff09;1. 并查集的存储结构2. 并查集的代码实现初始化并查时间复杂度union操作的优化&#xff08;不要瘦高的树&#xff09;并查集的进一步优化&#xff08;find的优化&#xff0c;压缩路径&#xff09;优化总结 数据结构&#x…

大模型使用——超算上部署LLAMA-2-70B-Chat

大模型使用——超算上部署LLAMA-2-70B-Chat 前言 1、本机为Inspiron 5005&#xff0c;为64位&#xff0c;所用操作系统为Windos 10。超算的操作系统为基于Centos的linux&#xff0c;GPU配置为A100&#xff0c;所使用开发环境为Anaconda。 2、本教程主要实现了在超算上部署LLAM…

SpringBoot整合redis集群实现读写分离(哨兵模式)

1 首先要在Linux虚拟机上安装redis # 安装包存放目录 cd /usr/local/redis # 下载最新稳定版 wget https://download.redis.io/releases/redis-6.2.6.tar.gz # 解压 tar -zxvf redis-6.2.6.tar.gz # 进入解压后的目录 cd /usr/local/redis/redis-6.2.6/ # 编译 make # 执行 &q…

系列二、IOC基本概念和底层原理

一、IOC基本概念 控制反转&#xff1a;把对象创建和对象之间的调用过程&#xff0c;交给Spring进行管理&#xff1b;使用IOC的目的&#xff1a;减低耦合度&#xff1b; 二、IOC底层原理 xml解析 工厂模式 反射 2.1、图解IOC底层原理 # 第一步&#xff1a;xml配置文件&am…

【小沐学NLP】在线AI绘画网站(网易云课堂:AI绘画工坊)

文章目录 1、简介1.1 参与方式1.2 模型简介 2、使用费用3、操作步骤3.1 选择模型3.2 输入提示词3.3 调整参数3.4 图片生成 4、测试例子4.1 小狗4.2 蜘蛛侠4.3 人物4.4 龙猫 结语 1、简介 Stable Diffusion是一种强大的图像生成AI&#xff0c;它可以根据输入的文字描述词&#…

AI驱动的靶点发现综述

疾病建模和靶点识别是药物发现中最关键的初始步骤。传统的靶点识别是一个耗时的过程&#xff0c;需要数年至数十年的时间&#xff0c;并且通常从学术报告开始。鉴于其分析大型数据集和复杂生物网络的优势&#xff0c;人工智能在现代药物靶点识别中发挥着越来越重要的作用。该综…

telnet检验网络能不能通

telnet检测网络能不能通&#xff08;ip地址端口号&#xff09;

wait,notify/notifyAll都要放在synchronized锁里面

wait&#xff0c;notify/notifyAll都要放在synchronized锁里面 如果没放在synchronized锁里面&#xff0c;就要报这样的错误 public class Test5 {public static void main(String[] args) throws InterruptedException {Thread t1 new Thread(()->{syn();},"t1&quo…

【C高级】Day 3 shell

1. 整理思维导图 3. 输入一个文件名&#xff0c;判断是否为shell脚本文件&#xff0c;如果是脚本文件&#xff0c;判断是否有可执行权限&#xff0c;如果有可执行权限&#xff0c;运行文件&#xff0c;如果没有可执行权限&#xff0c;给文件添加可执行权限。 #!/bin/bash rea…

python制作小程序制作流程,用python编写一个小程序

这篇文章主要介绍了python制作小程序代码宠物运输&#xff0c;具有一定借鉴价值&#xff0c;需要的朋友可以参考下。希望大家阅读完这篇文章后大有收获&#xff0c;下面让小编带着大家一起了解一下。 1 importtkinter2 importtkinter.messagebox3 importmath4 classJSQ:5 6 7 d…

CAM,PradCAM,layer CAM(可解释性分析方法)

目录 1.CAM 1.1作用 1.2应用实例 1.3CAM的特点 1.4CAM的思路 1.5CAM的缺点 2.GradCAM 2.1和GAM的区别及思路 2.2应用面 2.3一个延深&#xff08;解决模型偏见&#xff09; 2.4缺点 3.Grad CAM 4.Score CAM 5.layer CAM 参考文献 1.CAM 1.1作用 CAM不不光可以分类还…

前端JS实用操作符,一些骚操作✨

目录 0、!! 双重逻辑非操作符 &#x1f4da; 1、?? 操作符 空值合并/空判断 ✅ 2、?. 可选链运算符&#x1f50d; 3、?? 操作符 逻辑空值赋值运算符 &#x1f49a; 4、三元运算符 &#x1f4d7; 5、~~ 操作符 双位运算符 &#x1f528; 6、&&与 ||或 短…

在java中操作redis_Data

1.引入依赖 <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId></dependency> 2.配置Redis数据源 redis:host: ${sky.redis.host}port: ${sky.redis.port}password: ${sk…

揭秘程序员最喜欢的5个高薪工作

大家好&#xff0c;这里是程序员晚枫。想了解更多精彩内容&#xff0c;快来关注程序员晚枫 今天给大家推荐5个适合程序员的高薪岗位。 01 推荐岗位 以下是5个工资最高的程序员工作&#xff1a; 数据科学家&#xff1a;数据科学家是负责数据收集、处理、分析和报告的专业人员。…

Markdown学习简记

目录 一、写Markdown的第0步 二、语法须知 标题 粗体强调 斜体 斜体同时粗体 删除线 高亮 代码 代码块 引用块 无序列表 有序列表 链接 表格 图片 分割线 目录生成 内联HTML代码 Typora常用快捷键 Typora的主题样式与检查元素 一、写Markdown的第…

MySQL事务篇:ACID原则、事务隔离级别及事务机制原理剖析

引言 众所周知&#xff0c;MySQL数据库的核心功能就是存储数据&#xff0c;通常是整个业务系统中最重要的一层&#xff0c;可谓是整个系统的“大本营”&#xff0c;因此只要MySQL存在些许隐患问题&#xff0c;对于整个系统而言都是致命的。那此刻不妨思考一个问题&#xff1a; …

录像编辑软件—— Camtasia Studio 2023下载安装和激活的基本流程

Camtasia Studio 2023是一款简单易用的高清录屏和视频编辑软件&#xff0c;拥有录制屏幕和配音、视频的剪辑和过场动画、添加说明字幕和水印、制作视频封面和菜单、视频压缩和播放。 Camtasia Studio能在任何颜色模式下轻松地记录 屏幕动作&#xff0c;包括影像、音效、鼠标移…

C语言案例 按序输出三个整数-02

题目&#xff1a;输入三个整数a,b,c,按从小到大的顺序输出 步骤一&#xff1a;定义程序的目标 编写一个C程序&#xff0c;随机输入三个整数&#xff0c;按照从小到大的顺序输出。 步骤二&#xff1a;程序设计 整个程序由三个模块组成&#xff0c;第一个为scanf输入函数模块&a…

[FlareOn6]FlareBear

前言 apk的逆向&#xff0c;没有壳&#xff0c;但可能做的不是太多&#xff0c;没能想到整个算法的运作原理 分析 搜索flag会发现存在这么一个函数&#xff0c;那么显示flag的时候应该是熊会跳舞显示flag,只要满足熊happy和ecsstatic就可以&#xff0c;happy只要一直存在点击…