详解 Flink 的时间语义和 watermark

news2024/11/27 18:31:09

一、Flink 时间语义类型

在这里插入图片描述

  • Event Time:是事件创建的时间。它通常由事件中的时间戳描述,例如采集的日志数据中,每一条日志都会记录自己的生成时间,Flink 通过时间戳分配器访问事件时间戳
  • Ingestion Time :是数据进入 Flink 的时间
  • Processing Time:是每一个执行基于时间操作的算子的本地系统时间,与机器相关,默认的时间属性就是 Processing Time

二、EventTime 引入

Flink 默认是按照 ProcessingTime 来处理数据的

/**
	在 Flink 的流式处理中,绝大部分情况推荐使用 eventTime,一般只在 eventTime 无法使用时,才会被迫使用 ProcessingTime 或者 Ing estionTime 。使用 EventTime ,需要先引入 EventTime 的时间属性
*/
public class EventTimeTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        
        //引入 EvenetTime
        //TimeCharacteristic 是一个枚举类,有 ProcessingTime、IngestionTime 和 EventTime 三个属性
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        
    }
}

三、Watermark

1. 数据乱序情况

在这里插入图片描述

  • 正常情况下,Flink 接收到的事件应该要是按照事件的产生时间 (EventTime) 的先后顺序排列的
  • 实际情况下,事件从产生到进入 source 再到触发 operator,其中间是有一个过程和时间的,而且由于网络、分布式等原因会造成 Flink 接收到的事件的先后顺序不是严格按照事件的 EventTime 顺序排列的,即所谓的乱序数据
  • 乱序数据的问题会造成窗口触发关闭的时间混乱,计算不准确
  • Flink 处理乱序数据的机制:Watermark + allowedLateness + sideOutputLateData

2. Watermark 介绍

  • Watermark 是一种使用延迟触发 window 执行来处理乱序数据的机制
  • 原理:当设置 Watermark = t 时 (即延迟时长为 t),则 Flink 每一次都会获取已经到达的数据中的最大的 EventTime,然后判断 maxEventTime - t 是否等于某一个窗口的触发时间,如果相等则认为属于这个窗口的所有数据都已经到达,这个窗口被触发执行关闭,也可能存在数据丢失
  • 在数据有序的流中,相当于 Watermark = 0,即已经到达的数据中的最大的 EventTime 等于某一个窗口的触发时间,则这个窗口被触发执行关闭
  • 一般将 Watermark 设置为乱序数据流中最大的迟到时间差

3. Watermark 特点和行为

  • 水位线 (Watermark) 是作为一个特殊的数据插入到数据流中的一个标记
  • 水位线 (Watermark) 在 Flink 程序中是一个常量类,有一个时间戳属性,用来表示当前事件时间的进展
  • 水位线 (Watermark) 是基于数据的 EventTime 时间戳生成的
  • 水位线 (Watermark) 的时间戳必须单调递增,以确保任务的事件时间时钟一直向前推进

4. Watermark 在任务间的传递

任务并行度不为 1;Watermark 设置的位置越靠近 Source 端越好

在这里插入图片描述

  • 一个任务会接收上游多个并行任务的数据,也会向下游多个并行任务发送数据
  • 从上游多个并行任务接收 Watermark:使用 Partition WM 分别存储接收到的不同分区任务的 Watermark,并以其中最小的 Watermark 作为自己当前的事件时间
  • 向下游多个并行任务发送 Watermark:采取广播的分区策略,向下游的每一个任务都发送一份 Watermark,如果后续 Watermark 没有变更则不会重复发送

5. Watermark 引入

5.1 核心代码
/**
	方法签名:
		DataStream.assignTimestampsAndWatermarks(AssignerWithPeriodicWatermarks<T>)
		DataStream.assignTimestampsAndWatermarks(AssignerWithPunctuatedWatermarks<T>)
	
	参数:
		1.AssignerWithPeriodicWatermarks:继承 TimestampAssigner 接口,周期性的生成 watermark,常用实现类为:BoundedOutOfOrdernessTimestampExtractor 和 AscendingTimestampExtractor
		2.AssignerWithPunctuatedWatermarks:继承 TimestampAssigner 接口,间断式地生成 watermark
*/
public class WatermarkTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        
        //引入 EvenetTime       
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        
        DataStream<String> dataStream = env.socketTextStream("localhost", 7777);
        
        DataStream<SensorReading> inputStream = dataStream.map(new MapFunction<SensorReading>() {
            @Override
            public SensorReading map(String value) throws Exception {
                String[] fields = value.split(",");
                return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));
            }
        });
        
        //有序数据设置事件时间戳(毫秒数)和watermark
        //不需要传递watermark延迟时间,默认是当前事件时间戳 - 1ms 作为watermark
        inputStream.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<SensorReading>() {
            @Override
            public long extractAscendingTimestamp(SensorReading element) {
                return element.getTimestamp() * 1000L;
            }
        });
        
        //乱序数据设置事件时间戳(毫秒数)和watermark
        //BoundedOutOfOrdernessTimestampExtractor 构造方法必须传入watermark延迟时间
        //生成的watermark时间戳 = 当前所有事件的最大时间戳 - 延迟时间
        inputStream.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<SensorReading>(Time.seconds(2)) {
            @Override
            public long extractTimestamp(SensorReading element) {
                return element.getTimestamp() * 1000L;
            }
        });
        
        env.execute();
        
    }
}
5.2 AssignerWithPeriodicWatermarks

系统会周期性地生成 watermark 并插入到数据流中,默认周期是 200 毫秒

/**
	设置watermark生成周期:env.getConfig.setAutoWatermarkInterval(milliseconds);
	产生watermark的逻辑:每隔 0.2 秒钟,Flink 会调用 AssignerWithPeriodicWatermarks 的 getCurrentWatermark() 方法获取一个时间戳,如果大于之前水位的时间戳,新的 watermark 会被插入到流中。这个检查保证了水位线是单调递增的。如果方法返回的时间戳小于等于之前水位的时间戳,则不会产生新的 watermark
	自定义watermark周期生成器:实现 AssignerWithPeriodicWatermarks 接口,并重写 getCurrentWatermark 和 extractTimestamp 方法
*/
public class MyPeriodicAssigner implements AssignerWithPeriodicWatermarks<SensorReading> {
    private Long bound = 60 * 1000L;  // watermark延迟时间
    private Long maxTs = Long.MIN_VALUE;  // 当前最大时间戳
    
    @Nullable
    @Override
    public Watermark getCurrentWatermark() {
    	return new Watermark(maxTs - bound);
    }
    
    @Override
    public long extractTimestamp(SensorReading element, long previousElementTimestamp) {
        maxTs = Math.max(maxTs, element.getTimestamp()); //获取当前最大的事件时间戳
        return element.getTimestamp();
    }
}
5.3 AssignerWithPunctuatedWatermarks

间断式地生成 watermark,可以根据需要对每条数据进行条件判断筛选来确定是否生成 watermark

public class MyPunctuatedAssigner implements AssignerWithPunctuatedWatermarks<SensorReading> {
    private Long bound = 60 * 1000L;  // 延迟时间
    
    @Nullable
    @Override
    public Watermark checkAndGetNextWatermark(SensorReading lastElement, long extractedTimestamp) {
        if(lastElement.getId().equals("sensor_1")) {
        	return new Watermark(extractedTimestamp - bound);
        } else {
        	return null;
        }
    }
    
    @Override
    public long extractTimestamp(SensorReading element, long previousElementTimestamp) {
    	return element.getTimestamp();
    }
}

四、EventTime 的 window 操作

1. 滚动时间窗口操作

/**
	需求:统计 15 秒内的最小温度值,设置 2 秒的延迟
*/
public class TumblingEventTimeWindowTest {
 	public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        
        env.setParallelism(1);
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        
        /*
          sensor_1,1547718199,35.8
          sensor_6,1547718201,15.4
          sensor_7,1547718202,6.7
          sensor_10,1547718205,38.1
          sensor_1,1547718207,36.3
          sensor_1,1547718209,32.8
          sensor_1,1547718212,37.1
          ...
        */
        DataStream<String> inputStream = env.socketTextStream("localhost", 7777);
        
        DataStream<SensorReading> dataStream = inputStream.map(new MapFunction<SensorReading>() {
            @Override
            public SensorReading map(String value) {
                String[] fields = value.split(",");
                return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));
            }
        }).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<SensorReading>(Time.seconds(2)) {
            @Override
            public long extractTimestamp(SensorReading element) {
                return element.getTimestamp() * 1000L;
            }
        });
        
        //开窗聚合
       SingleOutputStreamOperator<SensorReading> minTempStream = dataStream.keyBy("id").timeWindow(Time.seconds(15)).minBy("temperature");
        
        minTempStream.print("minTemp");
        
        /**
        	输出的结果分析:
        		1.在接收到 sensor_1,1547718212,37.1 时,触发了一个窗口关闭,此时数据的 EventTime 为 1547718212,由于 watermark 延迟时间设置为 2,所以该窗口触发关闭的时间戳为 1547718212 - 2 = 1547718210,该窗口的范围为 [1547718195,1547718210)
        		2.当前第一个窗口是 [1547718195,1547718210),其起始点的确定规则为:
        			2.1 滚动时间窗口使用的窗口分配器为 TumblingEventTimeWindows 类
        			2.2 TumblingEventTimeWindows 的 assignWindows 方法中调用 getWindowStartWithOffset 方法获取起始点
        			2.3 getWindowStartWithOffset(timestamp, offset, windowSize):方法逻辑为 timestamp - (timestamp - offset + windowSize) % windowSize,默认 offset 为 0,所以最终得到的起始点应该是 windowSize 的整数倍,在本例中的起始点为 1547718199 - (1547718199-0+15)%15 = 1547718195
        		3.偏移量 offset:一般是用来处理不同时区的数据
        */
        
        env.execute();
        
    }   
}

2. 迟到数据处理

/**
	需求:统计 15 秒内的最小温度值,设置 2 秒的延迟,并允许 1 分钟的迟到数据,1 分钟后的数据写入侧输出流
*/
public class TumblingEventTimeWindowDelayTest {
 	public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        
        env.setParallelism(1);
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        
        DataStream<String> inputStream = env.socketTextStream("localhost", 7777);
        
        DataStream<SensorReading> dataStream = inputStream.map(new MapFunction<SensorReading>() {
            @Override
            public SensorReading map(String value) {
                String[] fields = value.split(",");
                return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));
            }
        }).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<SensorReading>(Time.seconds(2)) {
            @Override
            public long extractTimestamp(SensorReading element) {
                return element.getTimestamp() * 1000L;
            }
        });
        
        OutputTag<SensorReading> outputTag = new OutputTag<SensorReading>("late"){};
        
        //开窗聚合
       SingleOutputStreamOperator<SensorReading> minTempStream = dataStream.keyBy("id")
           .timeWindow(Time.seconds(15))
           .allowedLateness(Time.minutes(1));
           .sideOutputLateData(outputTag)
           .minBy("temperature");
        
        minTempStream.print("minTemp");
        minTempStream.getSideOutput(outputTag).print("late");
        
        /**
        	依次输入数据:
              sensor_1,1547718199,35.8
              sensor_1,1547718206,36.3
              sensor_1,1547718210,34.7
              sensor_1,1547718211,31
              sensor_1,1547718209,34.9
              sensor_1,1547718212,37.1
              sensor_1,1547718213,33
              sensor_1,1547718206,34.2
              sensor_1,1547718202,36
              ...
              sensor_1,1547718272,34
              sensor_1,1547718203,30.6
        
        	输出的结果分析:
        		1.在接收到 sensor_1,1547718212,37.1 时,触发 [1547718195,1547718210) 窗口执行,此时输出数据 sensor_1,1547718209,34.9,此时 2 秒内的延迟数据能被处理  
        		2.在接收到 sensor_1,1547718206,34.2 时,由于设置了允许 1 分钟迟到,所以 [1547718195,1547718210) 窗口仍然没有关闭,此时会更新数据为 sensor_1,1547718206,34.2,此时的系统时间戳为 1547718213 - 2 = 1547718211 - 1547718210 < 60
        		3.在接收到 sensor_1,1547718202,36 时,[1547718195,1547718210) 窗口仍然会更新输出一次数据 sensor_1,1547718206,34.2
        		4.在接收到 sensor_1,1547718272,34 时,属于 [1547718210,1547718225) 窗口的数据会输出 sensor_1,1547718211,31,此时的系统时间戳为 1547718272 - 2 = 1547718270,由于 1547718270 - 1547718210 >= 60,所以 [1547718195,1547718210) 窗口会真正的关闭
        		5.在之后接收到 sensor_1,1547718203,30.6 时,会把数据输出到侧输出流中
        */
        
        env.execute();
        
    }   
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1802831.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

✔️Vue基础+

✔️Vue基础 文章目录 ✔️Vue基础computed methods watchcomputed计算属性methods计算属性computed计算属性 VS methods方法计算属性的完整写法 watch侦听器&#xff08;监视器&#xff09;watch侦听器 Vue生命周期Vue生命周期钩子 工程化开发和脚手架脚手架Vue CLI 项目目录介…

基于Python的北京天气数据可视化分析

项目用到库 import numpy as np import pandas as pd import datetime from pyecharts.charts import Line from pyecharts.charts import Boxplot from pyecharts.charts import Pie,Grid from pyecharts import options as opts from pyecharts.charts import Calendar 1.2…

【数据结构】栈的应用

目录 0 引言 1 栈在括号匹配中的应用 2 栈在表达式求值中的应用 2.1 算数表达式 2.2 中缀表达式转后缀表达式 2.3 后缀表达式求值 3 栈在递归中的应用 3.1 栈在函数调用中的作用 3.2 栈在函数调用中的工作原理 4 总结 0 引言 栈&#xff08;Stack&#xff09;是一…

Python001

Python 是一种高级编程语言。它具有以下显著特点&#xff1a;1. 简单易学&#xff1a;语法相对简洁明了&#xff0c;对初学者很友好。2. 丰富的库&#xff1a;拥有大量强大的内置库和第三方库&#xff0c;可用于各种领域&#xff0c;如数据分析、机器学习、Web 开发等。3. 可读…

Python应用开发——30天学习Streamlit Python包进行APP的构建(5)

上几次我们已经将一些必备的内容进行了快速的梳理,让我们掌握了streanlit的凯快速上手,接下来我们将其它的一些基础函数再做简单的梳理,以顺便回顾我们未来可能用到的更丰富的函数来实现应用的制作。 st.write_stream 将生成器、迭代器或类似流的序列串流到应用程序中。 …

【网络编程开发】8.TCP连接管理与UDP协议 9.IP协议与ethernet协议

8.TCP连接管理与UDP协议 三次握手 三次握手的过程在TCP/IP网络通信中起着至关重要的作用&#xff0c;它不仅确保了数据的可靠传输&#xff0c;还为两端的数据传输提供了稳定的连接初始化过程。这一过程涉及到几个关键步骤&#xff0c;每个步骤都有其特定的目的和功能。 步骤&…

你可以直接和数据库对话了!DB-GPT 用LLM定义数据库下一代交互方式,数据库领域的GPT、开启数据3.0 时代

✨点击这里✨&#xff1a;&#x1f680;原文链接&#xff1a;&#xff08;更好排版、视频播放、社群交流、最新AI开源项目、AI工具分享都在这个公众号&#xff01;&#xff09; 你可以直接和数据库对话了&#xff01;DB-GPT 用LLM定义数据库下一代交互方式&#xff0c;数据库领…

前端开发高频面试题

好的&#xff0c;以下是对您提出的问题的详细回答&#xff1a; 说说vue动态权限绑定渲染列表&#xff08;权限列表渲染&#xff09; Vue中动态权限绑定渲染列表通常涉及以下步骤&#xff1a; 首先&#xff0c;通过API请求从服务器获取当前用户的权限数据。在Vue组件中&#xff…

Excel 生成所在月份的每一天列表

Excel 的 A2 格是日期 A1Fecha201/03/24 需要生成该日期所在月份的每一天的列表 A1WholeMonth201/03/24302/03/24403/03/24504/03/24605/03/24706/03/24807/03/24908/03/241009/03/241110/03/241211/03/241312/03/241413/03/241514/03/241615/03/241716/03/241817/03/241918…

【稳定检索/投稿优惠】2024年智慧金融与财务管理国际会议(SFFM 2024)

2024 International Conference on Smart Finance and Financial Management 2024年智慧金融与财务管理国际会议 【会议信息】 会议简称&#xff1a;SFFM 2024 截稿时间&#xff1a;以官网为准 大会地点&#xff1a;中国广州 会议官网&#xff1a;www.iacsffm.com 会议邮箱&am…

【python】OpenCV—Cartoonify and Portray

参考来自 使用PythonOpenCV将照片变成卡通照片 文章目录 1 卡通化codecv2.medianBlurcv2.adaptiveThresholdcv2.kmeanscv2.bilateralFilter 2 肖像画cv2.divide 1 卡通化 code import cv2 import numpy as npdef edge_mask(img, line_size, blur_value):gray cv2.cvtColor(…

idea2023如何创建普通maven工程项目

解决 1.创建新项目 1.进入创建项目 File -> new -> project 2&#xff0c;project 中有 build system 选择maven 2.在已有项目中创建普通maven工程 1.右键项目选择 new -> Module 2.选择 new Module 其实与新建maven工程没什么区别 em:问题 idea以前的版本是在Mav…

【一百一十】【算法分析与设计】[SDOI2009] HH的项链,树状数组应用,查询区间的种类数,树状数组查询区间种类数

P1972 [SDOI2009] HH的项链 [SDOI2009] HH的项链 题目描述 HH 有一串由各种漂亮的贝壳组成的项链。HH 相信不同的贝壳会带来好运&#xff0c;所以每次散步完后&#xff0c;他都会随意取出一段贝壳&#xff0c;思考它们所表达的含义。HH 不断地收集新的贝壳&#xff0c;因此&am…

第十二届蓝桥杯C++青少年组中/高级组选拔赛2020年11月22日真题解析

一、编程题 第1题&#xff1a;求和 【题目描述】 输入一个正整数 N(N < 100)&#xff0c;输出 1 到 N(包含 1 和 N)之间所有奇数的和。 【输入描述】 输入一个正整数 N(N < 100) 【输出描述】 输出 1 到 N 之间的所有奇数的和 【输入样例】 3【输出样例】 4答案&…

Llama模型家族之拒绝抽样(Rejection Sampling)(九) 强化学习之Rejection Sampling

LlaMA 3 系列博客 基于 LlaMA 3 LangGraph 在windows本地部署大模型 &#xff08;一&#xff09; 基于 LlaMA 3 LangGraph 在windows本地部署大模型 &#xff08;二&#xff09; 基于 LlaMA 3 LangGraph 在windows本地部署大模型 &#xff08;三&#xff09; 基于 LlaMA…

利用streamlit结合langchain_aws实现claud3的页面交互

测试使用的代码如下 import streamlit as st from langchain_aws import ChatBedrockdef chat_with_model(prompt, model_id):llm ChatBedrock(credentials_profile_name"default", model_idmodel_id, region_name"us-east-1")res llm.invoke(prompt)re…

UiPath发送邮件给多人时需要注意哪些限制?

UiPath发送邮件给多人的步骤&#xff1f;如何使用UiPath发信&#xff1f; 尽管UiPath提供了强大的邮件发送功能&#xff0c;但在批量发送邮件时&#xff0c;有一些限制和注意事项是我们必须了解的。AokSend将详细介绍这些限制&#xff0c;并提供一些优化建议。 UiPath发送邮件…

视频监控管理平台LntonCVS视频汇聚平台充电桩视频监控应用方案

随着新能源汽车的广泛使用&#xff0c;公众对充电设施的安全性和可靠性日益重视。为了提高充电桩的安全管理和站点运营效率&#xff0c;LntonCVS公司推出了一套全面的新能源汽车充电桩视频监控与管理解决方案。 该方案通过安装高分辨率摄像头&#xff0c;对充电桩及其周边区域进…

纷享销客安全体系:安全合规认证

安全合规认证是指组织通过独立的第三方机构对其信息系统和数据进行评估和审查&#xff0c;以确认其符合相关的安全标准、法律法规和行业要求的过程。 安全合规认证可以帮助组织提高信息系统和数据的安全性&#xff0c;并向客户、合作伙伴和监管机构证明其符合相关的安全标准和…

python协程入门实战详解

本章将以通俗易懂、贴合实际的方式介绍以下内容&#xff1a; 协程是什么&#xff0c;有什么特点&#xff0c;协程的优势是什么如何理解事件和事件循环协程的创建方式&#xff0c;如何控制协程的并发量在协程中使用aiohttp发送HTTP请求aiohttp案例协程中的异常处理&#xff0c;…