flink-触发器Trigger和移除器Evictor

news2024/11/20 1:24:46

Trigger 触发器

触发器作用:控制窗口什么时候除法计算。即执行窗口函数;基于WindowStream调用trigger()方法,传入自定义触发器(trigger);

每一个窗口分配器(windowAssigner) 都会对应一个默认的触发器;

 源码样例

  
  @PublicEvolving
    public <W extends Window> WindowedStream<T, KEY, W> window(
            WindowAssigner<? super T, W> assigner) {
        return new WindowedStream<>(this, assigner);
    }
  

  @PublicEvolving
    public WindowedStream(KeyedStream<T, K> input, WindowAssigner<? super T, W> windowAssigner) {

        this.input = input;

        this.builder =
                new WindowOperatorBuilder<>(
                        windowAssigner,
                        windowAssigner.getDefaultTrigger(input.getExecutionEnvironment()),
                        input.getExecutionConfig(),
                        input.getType(),
                        input.getKeySelector(),
                        input.getKeyType());
    }

==============默认触发器===
public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
        return EventTimeTrigger.create();
    }

Triger类有4个方法

  1.  onElement:窗口中每来一个元素调用该方法。
    onProcessingTime:当注册的处理时间定时器触发时,将调用这个方法。
     onEventTime:当注时的事件时间定时器触发时,将调用这个方法。
     clear:窗口关闭冰销毁时调用这个方法,一般用来清除自定义状态。
    
    onElement() ,onProcessingTime(),onEventTime()方法的返回类型都是 TriggerResult;TriggerResult中包含四个枚举值:
    CONTINUE:表示对窗口不执行任何操作。
    FIRE:触发计算并输出结果。注意计算完成后,窗口中的数据并不会被清除,将会被保留。
    PURGE:表示将窗口中的数据和窗口清除。
    FIRE_AND_PURGE:表示先将数据进行计算,输出结果,然后将窗口中的数据和窗口进行清除。
    

源码

/** No action is taken on the window. */
CONTINUE(false, false),
/** {@code FIRE_AND_PURGE} evaluates the window function and emits the window result. */
FIRE_AND_PURGE(true, true),
/**
 * On {@code FIRE}, the window is evaluated and results are emitted. The window is not purged,
 * though, all elements are retained.
 */
FIRE(true, false),
/**
 * All elements in the window are cleared and the window is discarded, without evaluating the
 * window function or emitting any elements.
 */
PURGE(false, true);

flink提供的触发器 

flink提供触发器

ProcessingTimeoutTrigger

  • EventTimeTrigger通过对比EventTime和窗口的Endtime确定是否触发窗口计算,如果EventTime大于Window EndTime则触发,否则不触发,窗口将继续等待。
  • ProcessingTimeoutTrigger:当内置触发器满足设置的超时时间时,触发窗口的计算。
  • ProcessTimeTrigger:通过对比ProcessTime和窗口EndTme确定是否触发窗口,如果ProcessTime大于EndTime则触发计算,否则窗口继续等待。
  • ContinuousEventTimeTrigger:根据间隔时间周期性触发窗口或者Window的结束时间小于当前EndTime触发窗口计算。
  • ContinuousProcessingTimeTrigger:根据间隔时间周期性触发窗口或者Window的结束时间小于当前ProcessTime触发窗口计算。
  • CountTrigger:根据接入数据量是否超过设定的阙值判断是否触发窗口计算。
  • DeltaTrigger:根据接入数据计算出来的Delta指标是否超过指定的Threshold去判断是否触发窗口计算。
  • PurgingTrigger:可以将任意触发器作为参数转换为Purge类型的触发器,计算完成后数据将被清理。
  • NeverTrigger:任何时候都不触发窗口计算,全局窗口触发器,

原文链接:https://blog.csdn.net/qq_37555071/article/details/122514061

水印触发一般是窗口关闭时间

flink提供的触发器是与窗口对应,当有水印时,如果水印时间大于等于窗口结束时间会触发计算;window.maxTimestamp()获取的是窗口end-1; EventTimeTrigger 的源码可以很明确可以看到注册时注册了触发时间为window.maxTimestamp(),这也是窗口关闭的触发机制。

如果在窗口关闭前触发计算设置多个触发计算时间,这样实现一些特定的需求。例如,每10s输出一次当天的累计数据;

public class EventTimeTrigger extends Trigger<Object, TimeWindow> {
    private static final long serialVersionUID = 1L;

    private EventTimeTrigger() {}

    @Override
    public TriggerResult onElement(
            Object element, long timestamp, TimeWindow window, TriggerContext ctx)
            throws Exception {
        if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {
            // if the watermark is already past the window fire immediately
            return TriggerResult.FIRE;
        } else {
            ctx.registerEventTimeTimer(window.maxTimestamp());
            return TriggerResult.CONTINUE;
        }
    }

    @Override
    public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
       // 限定触发条件为窗口关闭时间,否则就继续窗口 
       return time == window.maxTimestamp() ? TriggerResult.FIRE : TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx)
            throws Exception {
        return TriggerResult.CONTINUE;
    }
.....
 

自定义触发器

继承Triger,重写抽象方法,案例

.window(TumblingEventTimeWindows.of(Time.hours(24)))
                .trigger(new MyTrigger())
                .process(new WindowResult())
                .print();

窗口长24小时,每十秒触发一次计算
===================

    public static class MyTrigger extends Trigger<Event, TimeWindow> {
        @Override
        public TriggerResult onElement(Event event, long l, TimeWindow timeWindow, TriggerContext triggerContext) throws Exception {
             //定义状态,记录该状态 触发器第一个元素进来时注册全部的触发器
            ValueState<Boolean> isFirstEvent = triggerContext.getPartitionedState(
                    new ValueStateDescriptor<Boolean>("first-event", Types.BOOLEAN)
            );
            //第一次注册,右面全部跳过
            if (isFirstEvent.value() == null) {
                for (long i = timeWindow.getStart(); i < timeWindow.getEnd(); i = i + 10000L) {
                    //注册触发器  间隔10s
                    triggerContext.registerEventTimeTimer(i);
                }
                isFirstEvent.update(true);
            }
            return TriggerResult.CONTINUE;
        }

        @Override
        public TriggerResult onEventTime(long l, TimeWindow timeWindow, TriggerContext triggerContext) throws Exception {
           //使用的事件时间,因此触发窗口的计算
            return TriggerResult.FIRE;
        }

        @Override
        public TriggerResult onProcessingTime(long l, TimeWindow timeWindow, TriggerContext triggerContext) throws Exception {
            return TriggerResult.CONTINUE;
        }

        @Override
        public void clear(TimeWindow timeWindow, TriggerContext triggerContext) throws Exception {
            ValueState<Boolean> isFirstEvent = triggerContext.getPartitionedState(
                    new ValueStateDescriptor<Boolean>("first-event", Types.BOOLEAN)
            );
            isFirstEvent.clear();
        }
    }

 

移除器Evictor

作用:主要用来定义移除某些数据的逻辑。基于windowedStream调用evictor()方法,就可以传入一个自定义得移除器(Evictor)。不同窗口类型都有各自预测实现的移除器。

stream.keyby().window().evictor(new MyEvictor)

evictBefore():定义窗口执行函数之前移除的数据操作,移除后的数据不参与窗口计算;

evictAfter():定义执行窗口函数后移除数据的操作;

默认情况下预实现的移出弃都是在执行窗口函数之前移除数据

注意:如果在evict中使用了iterable.iterator(),后面再次使用时不能直接使用

 .keyBy(r -> r.user)
                .window(TumblingEventTimeWindows.of(Time.seconds(10)));
        window.evictor(new Evictor<Event, TimeWindow>() {
                    @Override
                    public void evictBefore(Iterable<TimestampedValue<Event>> elements, int size, TimeWindow window, EvictorContext evictorContext) {
                        Iterator<TimestampedValue<Event>> iterator = elements.iterator();
                        while (iterator.hasNext()){
                            TimestampedValue<Event> next = iterator.next();
                            if(next.getValue().url.equals("./prod?id=1")){
                                iterator.remove();
                            }
                        }
                    }

                    @Override
                    public void evictAfter(Iterable<TimestampedValue<Event>> elements, int size, TimeWindow window, EvictorContext evictorContext) {
                        return;
                    }
                })
                .process(new ProcessWindowFunction<Event, UrlViewCount, String, TimeWindow>() {
                    @Override
                    public void process(String s, ProcessWindowFunction<Event, UrlViewCount, String, TimeWindow>.Context context, Iterable<Event> elements, Collector<UrlViewCount> out) throws Exception {
                        AtomicInteger i= new AtomicInteger();
                        elements.forEach(v-> i.getAndIncrement());
                        out.collect(
                                new UrlViewCount(
                                        s+"====",
                                        // 获取迭代器中的元素个数  
 不能再使用iterable.spliterator().getExactSizeIfKnown(),否侧获取数据一一直为-1
                                        i.longValue(),
                                        context.window().getStart(),
                                        context.window().getEnd()
                                ));
                    } })
                .print();

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1867080.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Websocket在Java中的实践——握手拦截器

在《Websocket在Java中的实践——最小可行案例》一文中&#xff0c;我们看到如何用最简单的方式实现Websocket通信。本文中&#xff0c;我们将介绍如何在握手前后进行干涉&#xff0c;以定制一些特殊需求。 在《Websocket在Java中的实践——最小可行案例》的基础上&#xff0c;…

工商业储能柜用的Acrel-2000ES储能能量管理系统-安科瑞 蒋静

概述 Acrel-2000ES储能能量管理系统&#xff0c;专门针对工商业储能柜、储能集装箱研发的一款储能EMS&#xff0c;具有完善的储能监控与管理功能,涵盖了储能系统设备(PCS、BMS、电表、消防、空调等)的详细信息&#xff0c;实现了数据采集、数据处理、数据存储、数据查询与分析…

数字化世界的守卫之防火墙

在这个数字化的时代&#xff0c;我们的电脑和手机就像是一座座繁华的城市&#xff0c;而病毒和黑客则是那些潜伏在暗处的敌人。但别担心&#xff0c;我们有一群忠诚的守卫——“防火墙”&#xff0c;它们日夜守护着我们的数字家园。 1. 病毒&#xff1a;数字世界的“瘟疫” 想象…

CNware虚拟化平台功能介绍:虚拟机业务连续性保护,确保核心业务持续运行,构筑稳健的数字防线

全球数字化转型的大潮中&#xff0c;虚拟化技术已成为企业IT架构的基石。据Gartner预测&#xff0c;到2026年&#xff0c;全球90%以上的组织将采用某种形式的虚拟化技术。虚拟化环境的广泛应用&#xff0c;不仅提升了资源利用率、降低了成本&#xff0c;更极大地增强了业务灵活…

PDF转成清晰长图

打开一个宝藏网址在线PDF转换器/处理工具 - 在线工具系列 点击图下所示位置 按照图下所示先上传文件&#xff0c;设置转换参数后点击转换&#xff0c;等待 等待转换完成后&#xff0c;可以在转换结果处选择下载地址&#xff0c;点击即可进行下载使用了。对比了其他几个网站的转…

在Python中使用类继承提高代码复用性使用详解

概要 在面向对象编程(OOP)中,继承是一种创建新类的方式,这些新类继承了一个或多个父类的特性。Python 中的继承允许我们定义保持代码通用性和简洁性的类层次结构。本文将详细探讨 Python 中的类继承机制,包括基本继承、多重继承、方法重写以及使用 super() 函数的技巧,并…

无线幅频仪制作(WiFi通信)-含STM32源程序,JAVA上位机与设计报告

资料下载地址&#xff1a;无线幅频仪制作(WiFi通信)-含STM32源程序,JAVA上位机与设计报告 目录 项目功能 1、 系统方案1.1 比较与选择 1.1.1 控制器的论证与选择 1.1.2 信号源的论证与选择 1.1.3 放大器模块的论证与选择 1.1.4 键盘与显示模块的论证与选择 1.1.5 网络通…

【Spring Cloud Alibaba AI】简单使用

本文基于官方文档。 Spring AI 官方文档&#xff1a;Spring AI :: Spring AI Reference 中文文档&#xff1a;Spring AI 简介 - spring 中文网 (springdoc.cn) Spring AI 是 Spring 官方社区项目&#xff0c;旨在简化 Java AI 应用程序开发&#xff0c;让 Java 开发者像使用…

MySQL进阶——锁

目录 1全局锁—一致性数据备份 1.1全局锁介绍 1.2语法 1.3 一致性备份案例 1.4 全局锁特点 2表级锁 2.1表锁 2.1.1共享读锁 2.1.2独占写锁 2.2元数据锁 2.3元数据锁 MySQL中的锁&#xff0c;按照锁的粒度分&#xff0c;分为以下三类&#xff1a; &#xff08;1&…

DigitalOcean Droplet 云主机新增内置第五代 Xeon CPU 机型

DigitalOcean 近期宣布&#xff0c;在其高级 CPU 服务器&#xff08;Premium CPU-Optimized Droplet&#xff09;队列中引入英特尔第五代Xeon可扩展处理器&#xff08;代号为 Emerald Rapids&#xff09;。作为英特尔产品线中的最新一代用于数据中心工作负载的处理器&#xff0…

香港办公室顺利落地,量子之歌发布白皮书开启银发新篇章

6月25日&#xff0c;量子之歌香港办公室开业典礼暨《2023年中国中老年服务市场白皮书&#xff1a;银发经济&#xff0c;耀眼的黄金赛道》发布会于香港中环交易广场隆重开幕。 这一里程碑事件不仅彰显了量子之歌在银发经济领域的行业领军者风范&#xff0c;更凸显了其在专业服务…

【服务器部署】Jenkins配置前端工程自动化部署

作者介绍&#xff1a;本人笔名姑苏老陈&#xff0c;从事JAVA开发工作十多年了&#xff0c;带过刚毕业的实习生&#xff0c;也带过技术团队。最近有个朋友的表弟&#xff0c;马上要大学毕业了&#xff0c;想从事JAVA开发工作&#xff0c;但不知道从何处入手。于是&#xff0c;产…

秋招Java后端开发冲刺——非关系型数据库篇(Redis)

一、非关系型数据库 1. 主要针对的是键值、文档以及图形类型数据存储。 2. 特点&#xff1a; 特点说明灵活的数据模型支持多种数据模型&#xff08;文档、键值、列族、图&#xff09;&#xff0c;无需预定义固定的表结构&#xff0c;能够处理各种类型的数据。高扩展性设计为水…

2024年6月26日 (周三) 叶子游戏新闻

老板键工具来唤去: 它可以为常用程序自定义快捷键&#xff0c;实现一键唤起、一键隐藏的 Windows 工具&#xff0c;并且支持窗口动态绑定快捷键&#xff08;无需设置自动实现&#xff09;。 土豆录屏: 免费、无录制时长限制、无水印的录屏软件 《Granblue Fantasy Versus: Risi…

使用方法——注意事项及好处

public class MethodDemo01 {public static void main(String[] args) {// 目标&#xff1a;掌握定义方法的完整性&#xff0c;清楚使用方法的好处。// 需求&#xff1a;假如现在有很多程序员都要进行2个整数求和的操作。//1、李工。int rs sun(10,20);System.out.println(&q…

web前端——VUE

1.什么是框架&#xff1f; ①概述 框架结构就是基本功能&#xff0c;把很多基础功能已经实现了、封装了。在基础语言之上&#xff0c;对各种基础功能进行封装&#xff0c;方便开发者&#xff0c;提高开发效率 ②前端框架 javaScript是原生的 vue.js&#xff1a; 是一个js框架&…

LLM AI工具和Delphi名称的起源

LLM AI工具和Delphi名称的起源 使用ChatGPT&#xff0c;直接或通过微软工具&#xff0c;以及其他基于llm的引擎。我很欣赏他们提供好的总结和比较的能力&#xff0c;并且还编写了一些样板代码。与此同时&#xff0c;当你问一些重要的问题时&#xff0c;你会得到一些令人惊讶的好…

Java初识集合(后续不断补充)

第一次更新时间&#xff1a;2024.6.26 集合概述 Java中的集合就像一个容器&#xff0c;专门用来存储Java对象&#xff08;实际上是对象的引用&#xff0c;但习惯称为对象&#xff09;&#xff0c;这些对象可以是任意的数据类型&#xff0c;并且长度可变。其中&#xff0c;这些…

GPOPS-II教程(3): 航天器最优控制问题

文章目录 问题描述GPOPS代码main functioncontinuous functionendpoint function完整代码代码仿真结果 最后 问题描述 例子出自论文 Direct solution of nonlinear optimal control problems using quasilinearization and Chebyshev polynomials&#xff08;DOI&#xff1a;1…

Interview preparation--elasticSearch倒排索引原理

搜索引擎应该具备哪些要求 查询速度快 优秀的索引结构设计高效率的压缩算法快速的编码和解码速度 结果准确 ElasiticSearch 中7.0 版本之后默认使用BM25 评分算法ElasticSearch 中 7.0 版本之前使用 TP-IDF算法 倒排索引原理 当我们有如下列表数据信息&#xff0c;并且系统…