1.概述
Flink支持三种与流数据处理相关的时间概念:Processing Time、Event Time和Ingestion Time。具体如下图所示:
当前Flink仅支持Processing Time和Event Time
- EventTime:您提供的事件时间(通常是数据的最原始的创建时间)。
- Processing Time(Proctime):系统对事件进行处理的本地系统时间,单位为毫秒。
2.类型详解
2.1 处理时间(Processing Time)
Processing Time是指正在执行相应操作的机器的系统时间,即物理现实时间。当一个实时计算依赖ProcTIme时间列运行时,所有基于时间的操作(如Window窗口)将使用运行实时计算机器的系统时钟。若Window窗口函数基于ProcTime,且开窗间隔为1 小时,则Flink会自动将任务启动时间划分在某一整点区间内,而非从启动时间开始间隔一小时进行开窗操作。
例如,如果实时计算任务设定开窗间隔为1小时且在9:15am开始运行,则第一个Window窗口将包括在9:15 am和10:00 am之间处理的事件(自动将任务划分在9~10点这一整点区间),下一个窗口将包括在10:00 am和11:00 am之间处理的事件,依此类推。
基于Processing Time 时间概念,Flink 的程序性能相对较高,延迟也比较低,对接入到系统中的数据时间相关的计算完全交给算子内部决定。虽然性能和易用性上有优势,但在处理数据乱序时,Processing Time 不是最优的选择,数据本身不乱序,如果每台机器本身的时钟不同步也会导致数据处理过程中出现数据乱序,Processing Time 适用于时间计算精度不是特别高的计算场景。
2.2 事件时间(Event Time)
事件时间是每个独立事件在产生它的设备上发生的时间,这个时间在事件进入Flink之前就已经嵌入到事件中,时间顺序取决于事件产生的地方,和下游数据处理系统的时间无关。
Event Time也称为Row Time。EventTime时间属性必须在源表DDL中声明,可以将源表中的某一字段声明成Event Time。目前只支持将 TIMESTAMP 类型(将来会支持LONG类型)声明成RowTime字段。如果源表中需要声明为Event Time的列不是 TIMESTAMP 类型,需要借助计算列,基于现有列构造出一个TIMESTAMP 类型的列。
由于数据本身的乱序、网络的抖动(网络堵塞导致的数据传输延迟的变化)或者其它原因,导致了数据到达的顺序和被处理的顺序,可能是不一致的(乱序)。因此定义一个RowTime字段,需要明文定义一个Watermark计算方法。
2.3 接入时间(Ingestion Time)
接入时间是数据进入Flink系统的时间,接入时间依赖Source Operator 所在主机的系统时钟。因为接入时间在数据接入过程生成后,时间戳不再发生变化,和后续处理数据的Operator所在机器的时钟没有关系,所以不会因为某台机器时钟不同步或网络延迟而导致计算结果不准确的问题。相比于Event Time,Ingestion Time 不能处理乱序事件,因此不用生成对应的Watermarks。
当前Flink暂不支持接入时间,因此仅理解概念即可。
3.窗口函数示例
3.1 处理时间(Processing Time)
CREATE TABLE mq_stream (
a VARCHAR,
b VARCHAR,
c BIGINT,
d AS PROCTIME() --在数据源表的声明中明文定义一个Processing Time列
) WITH (
type = 'mq',
topic = '<yourTopic>',
accessId = '<yourAccessId>',
accessKey = '<yourAccessSecret>'
);
CREATE TABLE rds_output (
id VARCHAR,
c TIMESTAMP,
f TIMESTAMP,
cnt BIGINT
) with (
type = 'rds',
url = '<yourDatebaseURL>',
tableName = '<yourDatabasTableName>',
userName = '<yourUserName>',
password = '<yourPassword>'
);
INSERT INTO rds_output
SELECT
a AS id,
SESSION_START(d, INTERVAL '1' SECOND) AS c,
SESSION_END(d, INTERVAL '1' SECOND) AS f,
COUNT(a) AS cnt
FROM mq_stream
GROUP BY SESSION(d, INTERVAL '1' SECOND), a
3.2 事件时间(Event Time)
CREATE TABLE FullLinkTest(
after_id int AS id,
after_userid varchar AS userid,
after_username varchar AS username,
after_prodid varchar AS prodid,
after_price double AS price,
after_amount int AS amount,
after_discount double AS discount,
after_tm bigint AS tm,
WATERMARK FOR tm AS withOffset(tm,30000) --Watermark计算方法。
)WITH(
type ='kafka11',
bootstrapServers ='<yourbootstrapServers>',
zookeeperQuorum ='<yourzookeeperQuorum>',
offsetReset ='latest',
topic ='<yourtopicname>',
timezone='<yourtimezone>',
topicIsPattern ='false',
parallelism ='1'
);
CREATE TABLE totalSales(
totalSales DOUBLE,
tms TIMESTAMP,
tme TIMESTAMP
)WITH(
type ='mysql',
url ='<yourmysqlurl>',
userName ='<youruserName>',
password ='<yourpassword>',
tableName ='<yourtableName>',
parallelism ='1'
);
insert into totalSales
select
sum(price * amount * discount) as totalSales,
TIMESTAMPADD(HOUR,8,TUMBLE_START( ROWTIME,INTERVAL '10' SECOND)) as tms,
TIMESTAMPADD(HOUR,8,TUMBLE_END (ROWTIME,INTERVAL '10' SECOND)) as tme
from FullLinkTest
group by TUMBLE( ROWTIME,INTERVAL '10' SECOND);
4.EventTime和Processing Time比较
相较于Event Time,Processing Time有如下特点:
- 简单易行,不用考虑实时计算任务和机器之间的延迟问题
- 高性能,低延迟
EventTime通常需要在源数据中指定业务时间字段,而Processing Time不需要。
所以,通常使用Processing Time进行处理。需要特殊指定某个业务字段作为时间字段的场景,则使用EventTime。