Flink 学习五 Flink 时间语义

news2025/1/1 21:58:51

Flink 学习五 Flink 时间语义

1.时间语义

在流式计算中.时间是一个影响计算结果非常重要的因素! (窗口函数,定时器等)

在这里插入图片描述

Flink 可以根据不同的时间概念处理数据。

  • 处理时间: process time System.currentTimeMillis()是指执行相应操作的机器系统时间(也称为纪元时间,例如 Java 的时间)。是现实世界的时间,时间是单调递增的
  • 事件时间: event time 是指根据附加到每一行的时间戳处理流式数据。时间戳可以对事件发生的时间进行编码。是业务数据中的时间,时间有可能停滞,但是不会回溯,时间是不可会退的;

2.时间语义API设置

        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);  //事件时间
        env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime); // 处理时间
        env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime); // 算子到达时间  其实就是处理时间

上述代码在1.12 版本之前, 已TimeCharacteristic.ProcessingTime 作为默认的时间语义,也可以用上述代码设置时间语义;

1.12 版本之后,flink 以 TimeCharacteristic.EventTime 作为时间语义 ,并且Deprecated上面的代码 在使用需要指定时间语义的API 时,在显示的指定对应的时间语义;

        keyedStream.window(SlidingEventTimeWindows.of(Time.seconds(5),Time.seconds(2)));
        keyedStream.window(SlidingProcessingTimeWindows.of(Time.seconds(5),Time.seconds(2)));

        keyedStream.window(TumblingEventTimeWindows.of(Time.seconds(5)));
        keyedStream.window(TumblingProcessingTimeWindows.of(Time.seconds(2)));

也可以禁用 event time 语义

    /**
     * Sets the interval of the automatic watermark emission. Watermarks are used throughout the
     * streaming system to keep track of the progress of time. They are used, for example, for time
     * based windowing.
     *
     * <p>Setting an interval of {@code 0} will disable periodic watermark emission.
     *
     * @param interval The interval between watermarks in milliseconds.
     */
    @PublicEvolving
    public ExecutionConfig setAutoWatermarkInterval(long interval) {
        Preconditions.checkArgument(interval >= 0, "Auto watermark interval must not be negative.");
        this.autoWatermarkInterval = interval;
        return this;
    }

3.watermark 简介

上面说到1.12 版本之后,flink 以 TimeCharacteristic.EventTime 作为时间语义,Flink 收到数据中的事件时间有可能不是有序的,这就导致会收到迟到的数据,其事件时间是属于过去的窗口;

为了能够基于事件时间进行计算,Flink引入了Watermark的概念

  • watermark ,本质上也是flink各个算子之间流转的一种标记数据,flink内部自动产生并插入到数据流里面的
  • 消息流信息中就是时间戳

watermark 的产生源头:

  • watermark 一般是 来源于source 算子
  • source 算子计算出来的watermark 广播到下面

4.watermark 状态

4.1.初始watermark

一般是来源于source 算子

在watermark 产生的源头算子中,subTask 程序会用一个定时器,去周期性的检查收到的数据的时间的最大值,如果超过了之前记录的最大值,就把这个最大值更新为watermark,并下游算子广播(通过API设置数据中那个字段作为事件时间)

在这里插入图片描述

4.2.下游/中间算子的watermark

中间算子收到上游算子广播的watermark ,其算子内部也会有一个定时器去定时的检测收到的所有的上游算子的watermark ,并计算其中最小值作为当前算子的watermark,并下游算子广播

:当其中一个所有算子,不在更新watermark 怎么处理? flink 提供一个机制设置watermark的idletime,意思就是如果在idletime时间内没有收到上游算子广播的watermark,则会自动的往前面推进watermark

在这里插入图片描述

5.watermark生成策略

5.1 生成watermark时机

Flink 1.12 版本之后,watermark 的生产策略是固定频率周期性的产生

  • AssignerWithPeriodicWatermarks 周期性生成
  • AssignerWithPunctuatedWatermarks 指定标志生成,比如数据中的某个属性

5.1 生成watermark的数值

新版API watermark生成策略

  • 紧跟最大事件时间(watermark=周期内最大时间) :WatermarkStrategy.forMonotonousTimestamps();
  • 允许乱序(watermark=周期内最大时间-容错时间) :WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(5));
  • 自定义生成策略:WatermarkStrategy.forGenerator()
  • 不生产watermark 禁用了时间推进:WatermarkStrategy.noWatermarks()

6.watermark 示例

简单的测试并发度为1下的算子watermark更新情况


public class _01_Watermark {

    public static void main(String[] args) throws Exception {

        SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        // 获取环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> dataStreamSource = env.socketTextStream("192.168.141.141", 9000);
        WatermarkStrategy<String> watermarkStrategy = WatermarkStrategy.<String>forMonotonousTimestamps( )
                .withTimestampAssigner(new SerializableTimestampAssigner<String>() {
                    @Override
                    public long extractTimestamp(String element, long recordTimestamp) {
                        String s = element.split(",")[0];
                        long time = 0;
                        try {
                            time = simpleDateFormat.parse(s).getTime(); //解析字符串值转long 作为时间戳
                        } catch (ParseException e) {
                        }
                        return time;
                    }
                });
        SingleOutputStreamOperator<String> streamOperator = dataStreamSource.assignTimestampsAndWatermarks(watermarkStrategy);
        SingleOutputStreamOperator<String> processed = streamOperator.process(new ProcessFunction<String, String>() {
            @Override
            public void processElement(String value, ProcessFunction<String, String>.Context ctx, Collector<String> out) throws Exception {
                long currentWatermark = ctx.timerService().currentWatermark();
                long processingTime = ctx.timerService().currentProcessingTime();
                System.out.println("currentWatermark:" + simpleDateFormat.format(new Date(currentWatermark)));
                System.out.println("currentWatermark:" + currentWatermark); //打印watermark
                System.out.println("processingTime:" + simpleDateFormat.format(new Date(processingTime)));
                out.collect(value);
            }
        });
        processed.print();
        env.execute ();

    }
}

7.浅析watermark 源码

还是以上面示例为例讲解

7.1 准备

主要是 WatermarkStrategy.forMonotonousTimestamps( )和 WatermarkStrategy.forBoundedOutOfOrderness

两者对应的类是都是一样的 均为BoundedOutOfOrdernessWatermarks,只是 WatermarkStrategy.forMonotonousTimestamps( )对应BoundedOutOfOrdernessWatermarks中的参数 outOfOrdernessMillis = 0,后续主要是看BoundedOutOfOrdernessWatermarks类即可

@Public
public class BoundedOutOfOrdernessWatermarks<T> implements WatermarkGenerator<T> {

    //最大时间戳 
    private long maxTimestamp;

    //可以延迟的毫秒值 
    private final long outOfOrdernessMillis;

    /**设置初始水位
     */
    public BoundedOutOfOrdernessWatermarks(Duration maxOutOfOrderness) {
        checkNotNull(maxOutOfOrderness, "maxOutOfOrderness");
        checkArgument(!maxOutOfOrderness.isNegative(), "maxOutOfOrderness cannot be negative");
        this.outOfOrdernessMillis = maxOutOfOrderness.toMillis();
        this.maxTimestamp = Long.MIN_VALUE + outOfOrdernessMillis + 1;
    }

    // ------------------------------------------------------------------------

    //触发水位的更新
    @Override
    public void onEvent(T event, long eventTimestamp, WatermarkOutput output) {
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
    }

    //周期的触发,
    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        output.emitWatermark(new Watermark(maxTimestamp - outOfOrdernessMillis - 1));
    }
}

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/668355.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

优化|如何减小噪声和误差对梯度下降法的影响

编者按&#xff1a; ​ 许多精确算法在理论上能保证我们的目标函数值一直下降。在随机梯度下降以及无导数优化等情况下&#xff0c;目标移动方向受到噪声干扰&#xff0c;与实际下降方向往往会存在偏差。本文将分析噪声和下降偏差对于梯度下降法等算法的影响&#xff0c;并且介…

SpringMVC08:拦截器+文件下载

目录 一、概述 二、自定义拦截器 1、新建一个Moudule&#xff0c;SpringMVC-07-Interceptor&#xff0c;添加web支持&#xff1b; 2、配置web.xml和springmvc-servlet.xml文件 3、编写一个拦截器 4、在springmvc的配置文件中配置拦截器 5、编写一个Controller&#xff0…

【数据库】Mysq备份与恢复

文章目录 一、数据库备份的分类1. 数据备份的重要性2. 数据库备份的分类3. 常见的备份方法 二、Mysql 完全备份与恢复1. Mysql 完全备份2. 数据库完全备份分类2.1 物理冷备份及恢复2.2 mysqldump 备份数据库完全备份一个或多个完整的库&#xff08;包括其中所有的表&#xff09…

基于YOLOv5实现安全帽检测识别

目录 1、作者介绍2、YOLOv5网络模型2.1 算法简介2.2 数据集介绍2.2.1 VOC数据集准备2.2.2 YOLOv5算法检测流程 3、代码实现3.1 数据集划分部分代码3.2 训练阶段3.3 测试阶段3.4 检测结果 4、问题与分析参考链接 1、作者介绍 陈梦丹&#xff0c;女&#xff0c;西安工程大学电子…

【6.20】sleep()和wait()的区别

sleep()和wait()的区别 1、wait()方法 1.1使用场景 当某个线程获取到锁后&#xff0c;却还是不满足执行的条件&#xff0c;就可以调用对象锁的wait方法&#xff0c;进入等待状态。 直到外在条件满足了&#xff0c;就可以由其它线程调用notify或者notifyAll方法&#xff0c;…

在软件研发排期中要求“倒推时间”,项目结束后悲剧了……

有没有遇到某个项目任务的研发周期已被各路boss定下&#xff0c;研发团队都觉得时间不合理&#xff0c;反馈给上级无果&#xff0c;而要求“倒推时间”进行任务排期的情况&#xff1f; 什么是“倒推时间”&#xff1f; 目标倒推法&#xff0c;从剩下的时间反推算出每天该做的事…

【Java】死锁问题及ThreadLocal

什么是死锁分析过程发生死锁的原因避免死锁ThreadLocal 什么是死锁 多个线程同时被阻塞&#xff0c;它们中的一个或者全部都在等待某个资源被释放。由于线程被无限期地阻塞&#xff0c;因此程序不可能正常终止。这是一个最严重的BUG之一。 分析过程 1.一个线程一把锁 一个线…

深入理解TDD(测试驱动开发):提升代码质量的利器

在日常的软件开发工作中&#xff0c;我们常常会遇到这样的问题&#xff1a;如何在繁忙的项目进度中&#xff0c;保证我们的代码质量&#xff1f;如何在不断的迭代更新中&#xff0c;避免引入新的错误&#xff1f;对此&#xff0c;有一种有效的开发方式能帮助我们解决这些问题&a…

14.处理大数据集

14.1 随机梯度下降 假设你正在使用梯度下降来训练一个线性回归模型 当m个样本的m很大时&#xff0c;求和计算量太大了。这种梯度下降算法有另外一个名字叫做批量梯度下降&#xff08;batch gradient desent&#xff09;。这种算法每次迭代需要使用全量训练集&#xff0c;直到算…

【代码阅读软件】Source Insight 4 使用教程 | 很详细——适合新手

目录 一、概述二、常用的几个窗口&#x1f449;2.1 符号窗口&#xff08;Symbol Window&#xff09;&#x1f449;2.2 项目文件窗口&#xff08;Project Window&#xff09;&#x1f449;2.3 关系窗口&#xff08;Relation Window&#xff09;&#x1f449;2.4 上下文窗口&…

STM32--基于固件库(Library Faction)的led灯点亮

目录 一、STM32芯片的简单介绍 二、基于固件库&#xff08;Library Faction&#xff09;的led灯点亮 这是一个学习stm32的开端&#xff0c;我们由简入难&#xff0c;之前学过C51/52或是其他型号的一般都是从led开始&#xff0c;也就是简单的输入输出端口的应用。&#xff08;想…

SpringBoot整合模板引擎Thymeleaf(1)

版权声明 本文原创作者&#xff1a;谷哥的小弟作者博客地址&#xff1a;http://blog.csdn.net/lfdfhl Thymeleaf概述 Thymeleaf是一种用于Web和独立环境的现代服务器端的Java模板引擎&#xff0c;主要目标是将优雅的自然模板带到开发工作流程中&#xff0c;并将HTML在浏览器中…

【kubernetes】Etcd集群部署与验证

前言:二进制部署kubernetes集群在企业应用中扮演着非常重要的角色。无论是集群升级,还是证书设置有效期都非常方便,也是从事云原生相关工作从入门到精通不得不迈过的坎。通过本系列文章,你将从虚拟机准备开始,到使用二进制方式从零到一搭建起安全稳定的高可用kubernetes集…

吐血整理,性能测试Jmeter分布式压测遇坑总结+解决

目录&#xff1a;导读 前言一、Python编程入门到精通二、接口自动化项目实战三、Web自动化项目实战四、App自动化项目实战五、一线大厂简历六、测试开发DevOps体系七、常用自动化测试工具八、JMeter性能测试九、总结&#xff08;尾部小惊喜&#xff09; 前言 为什么要使用分布…

JSON.parse() 全面用法介绍

JSON 通常用于与服务端交换数据。在接收服务器数据时一般是字符串。我们可以使用 JSON.parse() 方法将数据转换为 JavaScript 对象。 语法 JSON.parse(text[, reviver]) text:必需&#xff0c; 一个有效的 JSON 字符串。 reviver: 可选&#xff0c;一个转换结果的函数&#xf…

SPI协议解析

SPI协议介绍 引言介绍SPI简介物理层协议层通讯的起始和停止信号SPI 模式 优缺点优点缺点 使用例程基于STM32的SPI通信准备硬件连接 软件实现 总结 引言 SPI是串行外设接口的缩写&#xff0c;是一种高速的&#xff0c;全双工&#xff0c;同步的通信总线。由于SPI高速和同步的特…

vite环境变量与模式

环境变量 Vite 在一个特殊的 import.meta.env 对象上暴露环境变量。这里有一些在所有情况下都可以使用的内建变量&#xff1a; import.meta.env.MODE: {string} 应用运行的模式。 import.meta.env.BASE_URL: {string} 部署应用时的基本 URL。他由base 配置项决定。 import.m…

【ESP8266】使用MQTT协议 连接华为云iotDA,实现设备属性上报

相关资料&#xff1a;https://github.com/CQUPTLei/ESP8266 往期文章&#xff1a;【ESP8266】基础AT指令和常用WIF指令 【MQTT 5.0】协议 ——发布订阅模式、Qos、keepalive、连接认证、消息结构 一、华为云iotDA1.1 什么是iotDA1.2 创建 iotDA 产品 二、使用ESP8266上报设备…

【杂谈理解】STM32F10X标准库工程模板

前言 基于STM官网的STM32F10x标准外设库V3.6.0版本&#xff0c;文件的操作流程是参考江科大的。记录下此文方便学习和回忆。文章后也会放置完整的工程文件和意法官网下载STM32F10x标准外设库的压缩包。 流程 到意法官网下载STM32F10x标准外设库的压缩包。先找到压缩包的地址&a…

CMake详解

file文件操作 cmake的file命令_cmake file_物随心转的博客-CSDN博客 set指令 CMake中的set指令详解_cmake set_guanguanboy的博客-CSDN博客 include_directories指令 Cmake命令之include_directories介绍 - 简书 add_subdirectory Cmake命令之add_subdirectory介绍 - 简书…