Flink时间属性

news2025/1/23 7:23:26

1.概述

Flink支持三种与流数据处理相关的时间概念:Processing Time、Event Time和Ingestion Time。具体如下图所示:

当前Flink仅支持Processing Time和Event Time

  • EventTime:您提供的事件时间(通常是数据的最原始的创建时间)。
  • Processing Time(Proctime):系统对事件进行处理的本地系统时间,单位为毫秒。

2.类型详解

2.1 处理时间(Processing Time)

Processing Time是指正在执行相应操作的机器的系统时间,即物理现实时间。当一个实时计算依赖ProcTIme时间列运行时,所有基于时间的操作(如Window窗口)将使用运行实时计算机器的系统时钟。若Window窗口函数基于ProcTime,且开窗间隔为1 小时,则Flink会自动将任务启动时间划分在某一整点区间内,而非从启动时间开始间隔一小时进行开窗操作。

例如,如果实时计算任务设定开窗间隔为1小时且在9:15am开始运行,则第一个Window窗口将包括在9:15 am和10:00 am之间处理的事件(自动将任务划分在9~10点这一整点区间),下一个窗口将包括在10:00 am和11:00 am之间处理的事件,依此类推。

基于Processing Time 时间概念,Flink 的程序性能相对较高,延迟也比较低,对接入到系统中的数据时间相关的计算完全交给算子内部决定。虽然性能和易用性上有优势,但在处理数据乱序时,Processing Time 不是最优的选择,数据本身不乱序,如果每台机器本身的时钟不同步也会导致数据处理过程中出现数据乱序,Processing Time 适用于时间计算精度不是特别高的计算场景。

2.2 事件时间(Event Time)

事件时间是每个独立事件在产生它的设备上发生的时间,这个时间在事件进入Flink之前就已经嵌入到事件中,时间顺序取决于事件产生的地方,和下游数据处理系统的时间无关。

Event Time也称为Row Time。EventTime时间属性必须在源表DDL中声明,可以将源表中的某一字段声明成Event Time。目前只支持将 TIMESTAMP 类型(将来会支持LONG类型)声明成RowTime字段。如果源表中需要声明为Event Time的列不是 TIMESTAMP 类型,需要借助计算列,基于现有列构造出一个TIMESTAMP 类型的列。

由于数据本身的乱序、网络的抖动(网络堵塞导致的数据传输延迟的变化)或者其它原因,导致了数据到达的顺序和被处理的顺序,可能是不一致的(乱序)。因此定义一个RowTime字段,需要明文定义一个Watermark计算方法。

2.3 接入时间(Ingestion Time)

接入时间是数据进入Flink系统的时间,接入时间依赖Source Operator 所在主机的系统时钟。因为接入时间在数据接入过程生成后,时间戳不再发生变化,和后续处理数据的Operator所在机器的时钟没有关系,所以不会因为某台机器时钟不同步或网络延迟而导致计算结果不准确的问题。相比于Event Time,Ingestion Time 不能处理乱序事件,因此不用生成对应的Watermarks。

当前Flink暂不支持接入时间,因此仅理解概念即可。

3.窗口函数示例

3.1 处理时间(Processing Time)

CREATE TABLE mq_stream (
    a VARCHAR,
    b VARCHAR,
    c BIGINT,
    d AS PROCTIME() --在数据源表的声明中明文定义一个Processing Time列
) WITH (
    type = 'mq',
    topic = '<yourTopic>',
    accessId = '<yourAccessId>',
    accessKey = '<yourAccessSecret>'
);
CREATE TABLE rds_output (
    id VARCHAR,
    c TIMESTAMP,
    f TIMESTAMP,
    cnt BIGINT
) with (
    type = 'rds',
    url = '<yourDatebaseURL>',
    tableName = '<yourDatabasTableName>',
    userName = '<yourUserName>',
    password = '<yourPassword>'
);

INSERT INTO rds_output
SELECT 
    a AS id,
    SESSION_START(d, INTERVAL '1' SECOND) AS c,
    SESSION_END(d, INTERVAL '1' SECOND) AS f,
    COUNT(a) AS cnt
FROM mq_stream
GROUP BY SESSION(d, INTERVAL '1' SECOND), a

3.2 事件时间(Event Time)

CREATE TABLE FullLinkTest(
    after_id int AS id,
    after_userid varchar AS userid,
    after_username varchar AS username,
    after_prodid varchar AS prodid,
    after_price double AS price,
    after_amount int AS amount,
    after_discount double AS discount,
    after_tm bigint AS tm,
    WATERMARK FOR tm AS withOffset(tm,30000) --Watermark计算方法。
)WITH(
    type ='kafka11',
    bootstrapServers ='<yourbootstrapServers>',
    zookeeperQuorum ='<yourzookeeperQuorum>',
    offsetReset ='latest',
    topic ='<yourtopicname>',
    timezone='<yourtimezone>',
    topicIsPattern ='false',
    parallelism ='1'
);

CREATE TABLE totalSales(
    totalSales DOUBLE,
    tms TIMESTAMP,
    tme TIMESTAMP
)WITH(
    type ='mysql',
    url ='<yourmysqlurl>',
    userName ='<youruserName>',
    password ='<yourpassword>',
    tableName ='<yourtableName>',
    parallelism ='1'
);

insert into totalSales
select
    sum(price * amount * discount) as totalSales,
    TIMESTAMPADD(HOUR,8,TUMBLE_START( ROWTIME,INTERVAL '10' SECOND)) as tms,
    TIMESTAMPADD(HOUR,8,TUMBLE_END (ROWTIME,INTERVAL '10' SECOND)) as tme
from FullLinkTest
group by TUMBLE( ROWTIME,INTERVAL '10' SECOND);

4.EventTime和Processing Time比较

相较于Event Time,Processing Time有如下特点:

  • 简单易行,不用考虑实时计算任务和机器之间的延迟问题
  • 高性能,低延迟

EventTime通常需要在源数据中指定业务时间字段,而Processing Time不需要。

所以,通常使用Processing Time进行处理。需要特殊指定某个业务字段作为时间字段的场景,则使用EventTime。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/440098.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

MongoDB 之 updateMany

MongoDB 之 updateMany 使用 https://www.mongodb.com/docs/v6.0/reference/operator/update/#update-operators db.collection.updateMany(<filter>,<update>,{upsert: <boolean>,writeConcern: <document>,collation: <document>,arrayFilters…

NIFI从MySql中离线读取数据再导入到MySql中_03_来吧用NIFI实现_数据分页获取功能---大数据之Nifi工作笔记0038

之前使用querydatabasetable处理器来获取mysql中的数据,我们只能写死一个sql的查询语句,但是 实际引用环境中,我们的一张mysql的表,可能有上千万的数据,那么,不可能,我们把sql查询语句写死,这样一次性如果获取所有数据,那么压力太大了,我们怎么弄呢?找了很久没有找到相关教程…

基于卷积神经网络的分类算法

基于卷积神经网络的分类算法 基于卷积神经网络的分类算法运行环境Python环境PyTorch环境Django环境数据预处理 基于卷积神经网络的分类算法 应用机器学习模型采用卷积神经网络&#xff0c;部署在Web环境中&#xff0c;通过Fashion-MNIST数据集进行模型训练和改进&#xff0c;实…

Scrum敏捷研发和项目管理

Scrum是全球运用最广泛的敏捷管理框架&#xff0c;Leangoo基于Scrum框架提供了一系列的流程和模板&#xff0c;可以帮助敏捷团队快速启动Scrum敏捷开发。 Leangoo完美支持Scrum敏捷框架&#xff0c;它提供了灵活的敏捷模板和极致的协作体验&#xff0c;可以让团队快速上手&am…

勒索病毒-特洛伊木马变种

​一、病毒简介 文件名称&#xff1a; 457d9e4773f45954449ee5913d068fdbb3d8e5689019688e7bce901467e5473a 文件类型(Magic)&#xff1a; PE32 executable (GUI) Intel 80386, for MS Windows, UPX compressed 文件大小&#xff1a; 410.00KB SHA256&#xff1a; 457d9e4773f…

从界面设计谈系统的贯穿性

系统的贯穿性&#xff1f; 在日常的开发中。单个模块之间的编码和设计起来相对比较简单。但是作为“软件工程”中的一个环节&#xff0c;系统的贯穿性往往被忽视。 现在系统关于贯穿性存在的问题 开发过程往往按照模块划分&#xff0c;分为不同的人开发。针对开发的者来说&…

【UE】将存档的值显示在控件蓝图上

上一篇博客&#xff08;【UE】保存游戏的demo&#xff09;已经实现了存档功能&#xff0c;本篇博客介绍的是如何将存档的值显示在控件蓝图上。 效果 可以看到我们存档的值显示在文本控件上 步骤 1. 新建一个蓝图类&#xff0c;父类为“HUD” 命名为“NewHudClassBP” 2. 在世…

Linux设备驱动开发 - 虚拟时钟Clock驱动示例

By: fulinux E-mail: fulinuxsina.com Blog: https://blog.csdn.net/fulinus 喜欢的盆友欢迎点赞和订阅&#xff01; 你的喜欢就是我写作的动力&#xff01; 目录 1. 概述2. virtual clock设计3. 虚拟时钟驱动3.1. provider驱动3.1.1. provider platform device部分3.1.2. prov…

645. 错误的集合|||697. 数组的度|||448. 找到所有数组中消失的数字

645. 错误的集合 题目 集合 s 包含从 1 到 n 的整数。不幸的是&#xff0c;因为数据错误&#xff0c;导致集合里面某一个数字复制了成了集合里面的另外一个数字的值&#xff0c;导致集合 丢失了一个数字 并且 有一个数字重复 。 给定一个数组 nums 代表了集合 S 发生错误后的…

JVM系列(七) JVM 垃圾收集器

我们知道JVM会回收垃圾,但是每种垃圾收集器的收集机制和收集的方法都不一样,今天我们讨论下几种垃圾回收机制 1.按照垃圾区域划分垃圾收集器 我们可以按照垃圾存在的区域来划分垃圾收集器,垃圾在堆内的区域分为 新生代垃圾老年代垃圾新生代老年代混合垃圾 按照这三种区域类…

一个Linux驱动工程师必知的内核模块知识

最简单的驱动 #include <linux/init.h> #include <linux/kernel.h> #include <linux/module.h>static int __init my_init(void) {printk("my_init\n");return 0; }static void __exit my_exit(void) {printk("my_exit\n"); }module_in…

数据结构总结——Java

1 链表(Linked List) 1.1 单项链表(Singly Linked List) 1.1.1 图例 1.1.2 Java实现 public class ListNode {// 保存值int val;// 保存指针ListNode next;// 构造函数们public ListNode() {}public ListNode(int val) {this.val val;}public ListNode(int val, ListNode n…

Linux基础命令-scp远程复制文件

Linux基础命令-seq打印数字序列 前言 有时候不可避免的需要将文件复制到另外一台服务器上&#xff0c;那么这时就可以使用scp命令远程拷贝文件&#xff0c;scp命令是基于SSH协议&#xff0c;在复制的过程中数据都是加密过的&#xff0c;会比明文传输更为安全。 一.命令介绍 …

Vue ElementUI Axios 前后端案例(day02) 之 ElementUI

ElementUI Element&#xff0c;一套为开发者、设计师和产品经理准备的基于 Vue 2.0 的桌面端组件库 组件 1.Layout 布局 通过基础的 24 分栏&#xff0c;迅速简便地创建布局。 就是这样分了24个格子 基础布局 使用单一分栏创建基础的栅格布局。 通过 row 和 col 组件&…

keepalived+nginx安装

欢迎使用ShowDoc&#xff01; 1、安装基础包&#xff1a; yum -y install libnl libnl-devel 2、上传包&#xff1a; tar -zxvf keepalived-2.0.20.tar.gz -C /data/imas/base_soft mkdir -p /data/imas/base_soft/keepalived cd /data/imas/base_soft/keepalived-2.0.20 .…

基于Netty开发IM即时通讯之群聊功能

本篇涉及的群聊核心功能&#xff0c;大致如下所示&#xff1a; 1&#xff09;登录&#xff1a;每个客户端连接服务端的时候&#xff0c;都需要输入自己的账号信息&#xff0c;以便和连接通道进行绑定&#xff1b;2&#xff09;创建群组&#xff1a;输入群组 ID 和群组名称进行…

【云原生进阶之容器】第六章容器网络6.5.2--Calico网络架构详述

《云原生进阶之容器》专题索引: 第一章Docker核心技术1.1节——Docker综述第一章Docker核心技术1.2节——Linux容器LXC第一章Docker核心技术1.3节——命名空间Namespace第一章Docker核心技术1.4节——chroot技术第一章Docker核心技术1.5.1节——cgroup综述

从 Dev 和 Ops 视角出发,聊聊 DevSecOps 的 What / Why / How

近日&#xff0c;极小狐和 TA 的朋友们相聚上海&#xff0c;开展了一场技术 Meetup&#xff0c;从 DevSecOps 的 What、Why、How 出发&#xff0c;通过分享真实应用案例&#xff0c;与参会者交流 DevSecOps 的实践过程和落地经验。 本文整理自极狐(GitLab) 资深云原生架构师郭旭…

爬虫日常-selenium登录12306,绕过验证

文章目录 前言代码设计 前言 hello兄弟们&#xff0c;这里是无聊的网友。愉快的周末过去了&#xff0c;欢迎回到学习频道。书接上文&#xff0c;我们说到了再用selenium登录12306时遇到了滑块验证的问题。当前的网站几乎每家都会在登录模块添加一个认证&#xff0c;来规避各种…

js 同步与异步

一、js 执行机制 JavaScript语言的一大特点就是单线程&#xff0c;即&#xff08;同一时间只能做一件事情&#xff09;。因为JavaScript是为了处理页面中用户的交互&#xff0c;以及操作DOM而诞生的。比如对某个DOM元素进行添加和删除操作。不能同时进行&#xff0c;应该先进行…