Flink 窗口概念与join汇总总结
- 1 SQL语法中窗口语法相关(仅仅是flinksql中 窗口的语法)
- 1.1 sql窗口
- 1.2 window topN
- 2 java/SQL join语法与介绍
- 2.1 有界join
- 2.1.1 Window Join
- 2.1.2 Interval Join
- 2.1.3 Temporary Join
- 2.1.4 LoopUp Join
- 2.2 无界join
- 2.2.1 Regular Join
- 2.3 over 窗口聚合
- 3 join优化方案
- 3.1 key相同时共用state
- 3.2 state过大优化
- 3.3 使用外部存储保存state
- 4 Flink SQL 维表 JOIN 的优化
- 维表 JOIN 的常见问题
- 优化点 1:Async I/O
- 优化点 2:维表缓存
- 优化点 3:批量关联
- 优化点 4:延迟关联
- 引用
- 1 窗口概念与分类与相关api(非sql)
- 1.1 窗口相关概念
- 1.2 窗口计算api
- 1.3 窗口指派api
- 2 窗口聚合算子
- 2.1 聚合算子分类
- 2.2 聚合算子代码示例
- 3flink时间概念
- 3.1 分配器(Window Assinger)
- 3.2 触发器(Trigger)
- 3.3 驱逐器(Evictor)
- 4窗口的触发机制
- 4.1 窗口触发原理
- 4.3 对照 EventTimeTrigger 的 ProcessingTimeTrigger
- 4.2 自定义窗口触发器
- 数据延迟处理与超时场景解决方案
- 延迟处理的方案
- 超时场景解决方案
- 1 process function 概述
对于mysql或者hive等计算引擎的join相信大家都有一定了解,两个离线全量数据集根据规则匹配输出结果。这里引出一个概念,像这种数据范围固定的数据,可以称为有界数据,因为数据最大就那么大,100条关联100条 最多的关联结果就是笛卡尔积1w,总数据量是固定的。那么抛出一个问题,如果是两条流式数据做join,怎么做呢,数据集不是全量数据,并且是无界流,没人知道数据多大,不知道a流的数据在b中有没有 什么时候到达,这个join要如何做呢?
flink中做了两类方案来解决:1 用窗口的方式将无界数据转变成有界数据,称为和hive mysql一样的 两个确定的数据集直接的关联操作。2 还是使用无界流来做join,下面详细分析:
1 SQL语法中窗口语法相关(仅仅是flinksql中 窗口的语法)
1.1 sql窗口
目前flinksql支持三种窗口:
- 滚动窗口 (Tumble Windows)
TUMBLE(TABLE t_action,DESCRIPTOR(时间属性字段) , INTERVAL ‘10’ SECONDS[ 窗口长度 ] ) - 滑动窗口 (Hop Windows)
HOP(TABLE t_action, DESCRIPTOR(时间属性字段) , INTERVAL ‘5’ SECONDS[ 滑动步长 ], INTERVAL ‘10’ SECONDS[ 窗口长度 ] ) - 累计窗口(Cumulate Windows)
CUMULATE(TABLE t_action, DESCRIPTOR(时间属性字段) , INTERVAL ‘5’ SECONDS[ 更新步长 ], INTERVAL ‘10’ SECONDS[ 窗口最大长度 ] )
暂时不支持会话窗口 session windows。
语法举例:
select window_start, window_end, channel, count(distinct guid) as uv
from table( tumble(table t_applog,descriptor(rw), interval '5' minutes) )
group by window_start,window_end,channel
1.2 window topN
代码示例,有如下数据表,其中 bidtime 被声明为了 rowtime 属性
-- bidtime,price,item,supplier_id
2020-04-15 08:05:00.000,4.00,C,supplier1
2020-04-15 08:07:00.000,2.00,A,supplier1
2020-04-15 08:09:00.000,5.00,D,supplier2
2020-04-15 08:11:00.000,3.00,B,supplier2
2020-04-15 08:09:00.000,5.00,D,supplier3
2020-04-15 08:11:00.000,6.00,B,supplier3
2020-04-15 08:11:00.000,6.00,B,supplier3
示例1:
-- 10 分钟滚动窗口中的交易金额最大的前 2 笔订单
SELECT *
FROM (SELECT bidtime, price, item, supplier_id
, row_number() over(partition by window_start,window_end order by price desc) as rn
FROM TABLE(TUMBLE(table t_bid,descriptor(rt),interval '10' minute))
)
WHERE rn<=2
示例2:
-- 10 分钟滚动窗口内交易总额最高的前两家供应商,及其交易总额和交易单数
SELECT *
FROM
(SELECT window_start,
window_end,
supplier_id,
price_amt,
bid_cnt,
row_number() over(partition BY window_start,window_end
ORDER BY price_amt DESC) AS rn
FROM
(SELECT window_start,
window_end,
supplier_id,
sum(price) AS price_amt,
count(1) AS bid_cnt
FROM table(tumble(TABLE t_bid,descriptor(rt),interval '10' minutes))
GROUP BY window_start,
window_end,
supplier_id))
WHERE rn<=2
2 java/SQL join语法与介绍
常规 join,flink 底层是会对两个参与 join 的输入流中的数据进行状态存储的; 所以,随着时间的推进,状态中的数据量会持续膨胀,可能会导致过于庞大,从而降低系统的整体效 率;可以如何去缓解:自己根据自己业务系统数据特性(估算能产生关联的左表数据和右表数据到达的最 大时间差),根据这个最大时间差,去设置 ttl 时长;
StreamTableEnvironment tenv = StreamTableEnvironment.create(env); // 设置 table 环境中的状态 ttl 时长 tenv.getConfig().getConfiguration().setLong(“table.exec.state.ttl”,60601000L);
2.1 有界join
2.1.1 Window Join
window join就是将两条流划分出时间窗口,数据已经被划分为窗口,无界数据变为有界数据,就和离线批处理的方式一样了,两个窗口的数据简单的进行关联即可,窗口结束就把数据下发下去,关联到的数据就下发 [A, B],没有关联到的数据取决于是否是 outer join 然后进行数据下发。
- DataStream API
flinkEnv.env()
// A 流
.addSource(new SourceFunction<Object>() {
@Override
public void run(SourceContext<Object> ctx) throws Exception {
}
@Override
public void cancel() {
}
})
// B 流
.join(flinkEnv.env().addSource(new SourceFunction<Object>() {
@Override
public void run(SourceContext<Object> ctx) throws Exception {
}
@Override
public void cancel() {
}
}))
// A 流的 keyby 条件
.where(new KeySelector<Object, Object>() {
@Override
public Object getKey(Object value) throws Exception {
return null;
}
})
// B 流的 keyby 条件
.equalTo(new KeySelector<Object, Object>() {
@Override
public Object getKey(Object value) throws Exception {
return null;
}
})
// 开窗口
.window(TumblingEventTimeWindows.of(Time.seconds(60)))
// 窗口中关联到的数据的处理逻辑
.apply(new JoinFunction<Object, Object, Object>() {
@Override
public Object join(Object first, Object second) throws Exception {
return null;
}
});
public class WindowJoinTest {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
//定义两条流
DataStream<Tuple2<String, Long>> stream1 = env.fromElements(
Tuple2.of("a", 1000L),
Tuple2.of("b", 1000L),
Tuple2.of("a", 2000L),
Tuple2.of("b", 2000L)
)
.assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple2<String, Long>>forMonotonousTimestamps()
.withTimestampAssigner(new SerializableTimestampAssigner<Tuple2<String, Long>>() {
@Override
public long extractTimestamp(Tuple2<String, Long> stringLongTuple2, long l) {
return stringLongTuple2.f1;
}
}
)
);
DataStream<Tuple2<String, Long>> stream2 = env.fromElements(
Tuple2.of("a", 3000L),
Tuple2.of("b", 3000L),
Tuple2.of("a", 4000L),
Tuple2.of("b", 4000L)
)
.assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple2<String, Long>>forMonotonousTimestamps()
.withTimestampAssigner(new SerializableTimestampAssigner<Tuple2<String, Long>>() {
@Override
public long extractTimestamp(Tuple2<String, Long> stringLongTuple2, long l) {
return stringLongTuple2.f1;
}
}
)
);
stream1
.join(stream2)
.where(data -> data.f0)
.equalTo(data -> data.f0)
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.apply(new JoinFunction<Tuple2<String, Long>, Tuple2<String, Long>, String>() {
@Override
public String join(Tuple2<String, Long> left, Tuple2<String, Long> right) throws Exception {
return left + "=>" + right;
}
})
.print();
env.execute();
}
}
上述解决方案只支持 inner join,即窗口内能关联到的才会下发,关联不到的则直接丢掉。
如果你想实现 window 上的 outer join,可以使用 coGroup 算子,案例如下:
public class CogroupFunctionDemo02 {
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env=StreamExecutionEnvironment.getExecutionEnvironment();
// A 流
DataStream<Tuple2<String,String>> input1=env.socketTextStream("",9002)
.map(new MapFunction<String, Tuple2<String,String>>() {
@Override
public Tuple2<String,String> map(String s) throws Exception {
return Tuple2.of(s.split(" ")[0],s.split(" ")[1]);
}
});
// B 流
DataStream<Tuple2<String,String>> input2=env.socketTextStream("",9001)
.map(new MapFunction<String, Tuple2<String,String>>() {
@Override
public Tuple2<String,String> map(String s) throws Exception {
return Tuple2.of(s.split(" ")[0],s.split(" ")[1]);
}
});
// A 流关联 B 流
input1.coGroup(input2)
// A 流的 keyby 条件
.where(new KeySelector<Tuple2<String,String>, Object>() {
@Override
public Object getKey(Tuple2<String, String> value) throws Exception {
return value.f0;
}
}).equalTo(new KeySelector<Tuple2<String,String>, Object>() {
// B 流的 keyby 条件
@Override
public Object getKey(Tuple2<String, String> value) throws Exception {
return value.f0;
}
})
// 窗口
.window(ProcessingTimeSessionWindows.withGap(Time.seconds(3)))
.apply(new CoGroupFunction<Tuple2<String,String>, Tuple2<String,String>, Object>() {
// 可以自定义实现 A 流和 B 流在关联不到时的输出数据格式
@Override
public void coGroup(Iterable<Tuple2<String, String>> iterable, Iterable<Tuple2<String, String>> iterable1, Collector<Object> collector) throws Exception {
StringBuffer buffer=new StringBuffer();
buffer.append("DataStream frist:\n");
for(Tuple2<String,String> value:iterable){
buffer.append(value.f0+"=>"+value.f1+"\n");
}
buffer.append("DataStream second:\n");
for(Tuple2<String,String> value:iterable1){
buffer.append(value.f0+"=>"+value.f1+"\n");
}
collector.collect(buffer.toString());
}
}).print();
env.execute();
}
}
或者你还可以使用 connect 算子自定义各种关联操作(connect 算子相比 join、coGroup 算子灵活很多):
// (userEvent, userId)
KeyedStream<UserEvent, String> customerUserEventStream = env
.addSource(kafkaUserEventSource)
.assignTimestampsAndWatermarks(new CustomWatermarkExtractor(Time.hours(24)))
.keyBy(new KeySelector<UserEvent, String>() {
@Override
public String getKey(UserEvent userEvent) throws Exception {
return userEvent.getUserId();
}
});
//customerUserEventStream.print();
final BroadcastStream<Config> configBroadcastStream = env
.addSource(kafkaConfigEventSource)
.broadcast(configStateDescriptor);
final FlinkKafkaProducer010 kafkaProducer = new FlinkKafkaProducer010<EvaluatedResult>(
params.get(OUTPUT_TOPIC),
new EvaluatedResultSerializationSchema(),
producerProps);
DataStream<EvaluatedResult> connectedStream = customerUserEventStream
.connect(configBroadcastStream)
.process(new ConnectedBroadcastProcessFuntion());
- SQL
SELECT
L.num as L_Num
, L.id as L_Id
, R.num as R_Num
, R.id as R_Id
, L.window_start
, L.window_end
FROM (
SELECT *
FROM TABLE(TUMBLE(TABLE LeftTable, DESCRIPTOR(row_time), INTERVAL '5' MINUTES))
) L
FULL JOIN (
SELECT *
FROM TABLE(TUMBLE(TABLE RightTable, DESCRIPTOR(row_time), INTERVAL '5' MINUTES))
) R
ON L.num = R.num
AND L.window_start = R.window_start
AND L.window_end = R.window_end;
- window join总结
当我们的窗口大小划分的越细时,在窗口边缘关联不上的数据就会越多,数据质量就越差。窗口大小划分的越宽时,窗口内关联上的数据就会越多,数据质量越好,但是产出时效性就会越差。所以小伙伴萌在使用时要注意取舍。
举个例子:以曝光关联点击来说,如果我们划分的时间窗口为 1 分钟,那么一旦出现曝光在 0:59,点击在 1:01 的情况,就会关联不上,当我们的划分的时间窗口 1 小时时,只有在每个小时的边界处的数据才会出现关联不上的情况。
该种解决方案适用于可以评估出窗口内的关联率高的场景,如果窗口内关联率不高则不建议使用。
注意:这种方案由于上面说到的数据质量和时效性问题在实际生产环境中很少使用。
2.1.2 Interval Join
其也是将两条流的数据从无界数据变为有界数据,但是这里的有界和上节说到的 Flink Window Join 的有界的概念是不一样的,这里的有界是指两条流之间的有界。
以 A 流 join B 流举例,interval join 可以让 A 流可以关联 B 流一段时间区间内的数据,比如 A 流关联 B 流前后 5 分钟的数据。
数据已经被划分为窗口,无界数据变为有界数据,就和离线批处理的方式一样了,两个窗口的数据简单的进行关联即可。窗口结束(这里的窗口结束是指 interval 区间结束,区间的结束是利用 watermark 来判断的)就把数据下发下去,关联到的数据就下发 [A, B],没有关联到的数据取决于是否是 outer join 然后进行数据下发。
-
时间区间JOIN:让一条流去JOIN另一条流的前后一段时间内的数据,INTERVAL JOIN可以避免回撤流的产生,在某些场景下,下游输出系统不具备处理回撤流的能力,此时可以借助INTERVAL JOIN
-
INNER INTERVAL JOIN:只有两条流 JOIN 到(满足ON中的条件:两条流的数据在时间区间 + 满足其他等值条件)才输出,输出 +[L, R]
-
LEFT INTERVAL JOIN:流任务中,左流数据到达之后,如果没有JOIN到右流的数据,就会等待(放在 State 中等),如果之后右流之后数据到达之后,发现能和刚刚那条左流数据 JOIN 到,则会输出+[L, R]。事件时间中随着 Watermark 的推进, 如果发现发现左流 State 中的数据过期了,就把左流中过期的数据从 State 中删除,然后输出+[L, R],如果右流 State 中的数据过期了,就直接从 State 中删除
-
RIGHT INTERVAL JOIN:处理逻辑和LEFT INTERVAL JOIN类似
-
FULL INTERVAL JOIN:流任务中,左流或者右流的数据到达之后,如果没有 Join 到另外一条流的数据,就会等待(左流放在左流对应的 State 中等,右流放在右流对应的 State 中等),如果之后另一条流数据到达之后,发现能和刚刚那条数据 Join 到,则会输出+[L, R]。事件时间中随着 Watermark 的推进(也支持处理时间),发现 State 中的数据能够过期了,就将这些数据从 State 中删除并且输出(左流过期输出 +[L, NULL],右流过期输出 -[NULL, R])
-
DataStream API
clickRecordStream
.keyBy(record -> record.getMerchandiseId())
.intervalJoin(orderRecordStream.keyBy(record -> record.getMerchandiseId()))
// 定义 interval 的时间区间
.between(Time.seconds(-30), Time.seconds(30))
.process(new ProcessJoinFunction<AnalyticsAccessLogRecord, OrderDoneLogRecord, String>() {
@Override
public void processElement(AnalyticsAccessLogRecord accessRecord, OrderDoneLogRecord orderRecord, Context context, Collector<String> collector) throws Exception {
collector.collect(StringUtils.join(Arrays.asList(
accessRecord.getMerchandiseId(),
orderRecord.getPrice(),
orderRecord.getCouponMoney(),
orderRecord.getRebateAmount()
), '\t'));
}
})
.print();
- SQL
CREATE TABLE show_log_table (
log_id BIGINT,
show_params STRING,
row_time AS cast(CURRENT_TIMESTAMP as timestamp(3)),
WATERMARK FOR row_time AS row_time
) WITH (
'connector' = 'datagen',
'rows-per-second' = '1',
'fields.show_params.length' = '1',
'fields.log_id.min' = '1',
'fields.log_id.max' = '10'
);
CREATE TABLE click_log_table (
log_id BIGINT,
click_params STRING,
row_time AS cast(CURRENT_TIMESTAMP as timestamp(3)),
WATERMARK FOR row_time AS row_time
)
WITH (
'connector' = 'datagen',
'rows-per-second' = '1',
'fields.click_params.length' = '1',
'fields.log_id.min' = '1',
'fields.log_id.max' = '10'
);
CREATE TABLE sink_table (
s_id BIGINT,
s_params STRING,
c_id BIGINT,
c_params STRING
) WITH (
'connector' = 'print'
);
INSERT INTO sink_table
SELECT
show_log_table.log_id as s_id,
show_log_table.show_params as s_params,
click_log_table.log_id as c_id,
click_log_table.click_params as c_params
FROM show_log_table FULL JOIN click_log_table ON show_log_table.log_id = click_log_table.log_id
AND show_log_table.row_time BETWEEN click_log_table.row_time - INTERVAL '5' SECOND AND click_log_table.row_time
- Interval Join总结
interval join 的方案比 window join 方案在数据质量上好很多,但是其也是存在 join 不到的情况的。并且如果为 outer join 的话,outer 一测的流数据需要要等到区间结束才能下发。
该种解决方案适用于两条流之间可以明确评估出相互延迟的时间是多久的,这里我们可以使用离线数据进行评估,使用离线数据的两条流的时间戳做差得到一个分布区间。
比如在 A 流和 B 流时间戳相差在 1min 之内的有 95%,在 1-4 min 之内的有 4.5%,则我们就可以认为两条流数据时间相差在 4 min 之内的有 99.5%,这时我们将上下界设置为 4min 就是一个能保障 0.5% 误差的合理区间。
注意:这种方案在生产环境中还是比较常用的。
2.1.3 Temporary Join
首先介绍一个时态表的概念,这是一个随时间不断变化的动态表,它可能包含表的多个快照。对于时态表中的记录,可以追踪、访问其历史版本的表称为版本表,如数据库的 changeLog; 只能追踪、访问最新版本的表称为普通表,如数据库的表。举个例子,外汇订单金额计算,要计算当时的汇率来汇总,这时汇率表用时态表就很合适。
个人理解:应该是 Temporal 时态表只能建立在append only 表上
要义: 左表的数据永远去关联右表数据的对应时间上的最新版本!
- DataStream API
import org.apache.flink.table.functions.TemporalTableFunction;
(...)
// 获取 stream 和 table 环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
// 提供一个汇率历史记录表静态数据集
List<Tuple2<String, Long>> ratesHistoryData = new ArrayList<>();
ratesHistoryData.add(Tuple2.of("US Dollar", 102L));
ratesHistoryData.add(Tuple2.of("Euro", 114L));
ratesHistoryData.add(Tuple2.of("Yen", 1L));
ratesHistoryData.add(Tuple2.of("Euro", 116L));
ratesHistoryData.add(Tuple2.of("Euro", 119L));
// 用上面的数据集创建并注册一个示例表
// 在实际设置中,应使用自己的表替换它
DataStream<Tuple2<String, Long>> ratesHistoryStream = env.fromCollection(ratesHistoryData);
Table ratesHistory = tEnv.fromDataStream(ratesHistoryStream, $("r_currency"), $("r_rate"), $("r_proctime").proctime());
tEnv.createTemporaryView("RatesHistory", ratesHistory);
// 创建和注册时态表函数
// 指定 "r_proctime" 为时间属性,指定 "r_currency" 为主键
TemporalTableFunction rates = ratesHistory.createTemporalTableFunction("r_proctime", "r_currency"); // <==== (1)
tEnv.registerFunction("Rates", rates);
- SQL
SELECT column-names
FROM table1 [AS <alias1>]
[LEFT] JOIN table2 FOR SYSTEM_TIME AS OF table1.proctime [AS <alias2>]
ON table1.column-name1 = table2.key-name1
定义时态表 要求 (1)主键(2)WATERMARK
2.1.4 LoopUp Join
测试代码在github 上flinktest项目中 flinkcdc/src/main/java/sqllookupjoin.java
Lookup join 跟其它的 join 有较大的不同,其内在原理得从源码说起,在 flinksql 中,所有的 source connector 都实现自 DynamicTableSource 而 DynamicTableSource 下有两个子接口: LookupTableSource 和 ScanTableSource
其中,ScanTableSource 是用的最多的常规 TableSource,它会持续、完整读取源表形成 flink 中的核 心数据抽象:“数据流”; 而 LookupTableSource,则并不对源表持续、完整读取,而是在需要的时候,才根据一个(或多个) 查询 key,去临时性地查询源表得到一条(或多条)数据;
lookup join 为了提高性能,lookup 的连接器会将查询过的维表数据进行缓存(默认未开启此机制), 可以通过参数开启,比如 jdbc-connector 的 lookup 模式下,有如下参数:
lookup.cache.max-rows = (none) 未开启
lookup.cache.ttl = (none) ttl 缓存清除的时长
以 JdbcDynamicTableSource 为例
PUBLIC CLASS JdbcDynamicTableSource
implements ScanTableSource,
LookupTableSource,
SupportsProjectionPushDown,
SupportsLimitPushDown {
它实现了上述两种接口,因而它是两种读取模式的混合封装体 因而,它也实现了上述两个接口中各自的一个重要方法:
getLookupRuntimeProvider
getScanRuntimeProvider
对于 lookupRuntimeProvider 来说,最重要的是其中的: JdbcRowDataLookupFunction
JdbcRowDataLookupFunction:
// LookupFunction 中最重要的方法就是 eval
public void eval(Object... keys) {
RowData keyRow = GenericRowData.of(keys);
// 对于传入的 keys,先从缓存中获取要查询的数据
if (cache != null) {
List<RowData> cachedRows = cache.getIfPresent(keyRow);
if (cachedRows != null) {
for (RowData cachedRow : cachedRows) {
// 如果缓存中拿到了数据,则直接输出
collect(cachedRow);
}
return;
}
}
// 否则,用 jdbc 去查询
for (int retry = 0; retry <= maxRetryTimes; retry++) {
try {
// 构建 jdbc 查询语句
statement.clearParameters();
statement = lookupKeyRowConverter.toExternal(keyRow, statement);
// 执行查询语句,并获得 resultset
try (ResultSet resultSet = statement.executeQuery()) {
if (cache == null) {
while (resultSet.next()) {
collect(jdbcRowConverter.toInternal(resultSet));
}
} else {
ArrayList<RowData> rows = new ArrayList<>();
// 迭代 resultset
while (resultSet.next()) {
// 转成内部数据类型 RowData
RowData row = jdbcRowConverter.toInternal(resultSet);
// 将数据装入一个 list 后一次性输出
rows.add(row);
collect(row);
}
// 并将查到的数据,放入缓存
rows.trimToSize();
cache.put(keyRow, rows);
}
}
} catch (Exception e) {
// 处理异常,重试或者输出错误信息
}
}
}
在关联维度表时。JDBC 连接器可以用在时态表关联中作为一个可 lookup 的 source (又称为维表),当前只支持同步的查找模式。
lookup cache 的主要目的是用于提高时态表关联 JDBC 连接器的性能。默认情况下,lookup cache 不开启,所以所有请求都会发送到外部数据库。
当 lookup cache 被启用时,每个进程(即 TaskManager)将维护一个缓存。
Flink 将优先查找缓存,只有当缓存未查找到时才向外部数据库发送请求,并使用返回的数据更新缓存。
当缓存命中最大缓存行 lookup.cache.max-rows 或当行超过最大存活时间 lookup.cache.ttl 时,缓存中最老的行将被设置为已过期。
缓存中的记录可能不是最新的,用户可以将 lookup.cache.ttl 设置为一个更小的值以获得更好的刷新数据,但这可能会增加发送到数据库的请求数。
所以要做好吞吐量和正确性之间的平衡。
CREATE TEMPORARY TABLE mysql_behavior_conf (
id int
,code STRING
,map_val STRING
,update_time TIMESTAMP(3)
-- ,primary key (id) not enforced
-- ,WATERMARK FOR update_time AS update_time - INTERVAL '5' SECOND
) WITH (
'connector' = 'jdbc'
,'url' = 'jdbc:mysql://localhost:3306/venn'
,'table-name' = 'lookup_join_config'
,'username' = 'root'
,'passwordPA' = '******'
,'lookup.cache.max-rows' = '1000'
,'lookup.cache.ttl' = '1 minute' -- 缓存时间,即使一直在访问也会删除
);
常规 join 的底层实现,是通过在用状态来缓存两表数据实现的 * 所以,状态体积可能持续膨胀,为了安全起见,可以设置状态的 ttl 时长,来控制状态的体积上限
public class Demo18_LookupJoin {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
StreamTableEnvironment tenv = StreamTableEnvironment.create(env);
// 设置 table 环境中的状态 ttl 时长
tenv.getConfig().getConfiguration().setLong("table.exec.state.ttl", 60 * 60 * 1000L);
/***
* 1,a
* 2,b
* 3,c
* 4,d
* 5,e
*/
DataStreamSource<String> s1 = env.socketTextStream("doitedu", 9998);
SingleOutputStreamOperator<Tuple2<Integer, String>> ss1 = s1.map(s -> {
String[] arr = s.split(",");
return Tuple2.of(Integer.parseInt(arr[0]), arr[1]);
}).returns(new TypeHint<Tuple2<Integer, String>>() {
});
// 创建主表(需要声明处理时间属性字段)
tenv.createTemporaryView("a", ss1, Schema.newBuilder()
.column("f0", DataTypes.INT())
.column("f1", DataTypes.STRING())
.columnByExpression("pt", "proctime()") // 定义处理时间属性字段
.build());
// 创建 lookup 维表(jdbc connector 表)
tenv.executeSql(
"create table b( \n" +
" id int , \n" +
" name string, \n" +
" gender STRING, \n" +
" primary key(id) not enforced \n" +
") with (\n" +
" 'connector' = 'jdbc',\n" +
" 'url' = 'jdbc:mysql://doitedu:3306/flinktest',\n" +
" 'table-name' = 'stu2',\n" +
" 'username' = 'root',\n" +
" 'password' = 'root' \n" +
")"
);
// lookup join 查询
tenv.executeSql("select a.*,c.* from a JOIN b FOR SYSTEM_TIME AS OF a.pt AS c \n" +
"ON a.f0 = c.id").print();
env.execute();
}
}
其实本质上跟使用异步IO加缓存实现的效果相同,lookup join底层也是使用guava 的 LocalCache做缓存,本质约等于异步io,但lookup join 为了提高性能,lookup 的连接器会将查询过的维表数据进行缓存(默认未开启此机制), 可以通过参数开启,比如 jdbc-connector 的 lookup 模式下,有如下参数: lookup.cache.max-rows = (none) 未开启 lookup.cache.ttl = (none) ttl 缓存清除的时长
现在让我们详细看下 LookupJoin 对应的 Operator 是如何进行维表关联的。
前往 CommonExecLookupJoin.translateToPlanInternal() 方法[1],可以看到这个 Operator 的 operatorFactory 由 createAsyncLookupJoin 或者 createSyncLookupJoin 生成,最终生成的 LookupJoinRunner 算子使用用户定义的 LookupFunction 来作为最终访问外部维表的函数。
Lookup JOIN 算子的调用链如下图所示:
LookupTableSource 和 LookupFunction
通过上面的分析,我们知道维表 JOIN 实际上基于 Flink SQL 的 LookupTableSource 实现。LookupTableSource 的 scan 逻辑基于 UDF LookupFunction,当事实表的数据到来时,调用 LookupFunction 的 eval 方法,前往外部数据源进行关联查询。代码详情请关注 LookupTableSource.java。
LookupFunction 的实现通常分为以下几个部分:
- 在 open() 方法中建立并维护与外部系统的连接;
- eval() 方法实现与外部系统的关联逻辑。
2.2 无界join
2.2.1 Regular Join
regular join 还是基于无界数据进行关联,以 A 流 left join B 流举例,A 流数据到来之后,直接去尝试关联 B 流数据。
- 如果关联到了则直接下发关联到的数据
- 如果没有关联到则也直接下发没有关联到的数据,后续 B 流中的数据到来之后,会把之前下发下去的没有关联到数据撤回,然后把关联到的数据数据进行下发。由此可以看出这是基于 Flink SQL 的 retract 机制,则也就说明了其目前只支持 Flink SQL。
两条流的数据会尝试关联,能关联到直接下发,关联不到先下发一个目前的结果数据。
- SQL
CREATE TABLE show_log_table (
log_id BIGINT,
show_params STRING
) WITH (
'connector' = 'datagen',
'rows-per-second' = '1',
'fields.show_params.length' = '3',
'fields.log_id.min' = '1',
'fields.log_id.max' = '10'
);
CREATE TABLE click_log_table (
log_id BIGINT,
click_params STRING
)
WITH (
'connector' = 'datagen',
'rows-per-second' = '1',
'fields.click_params.length' = '3',
'fields.log_id.min' = '1',
'fields.log_id.max' = '10'
);
CREATE TABLE sink_table (
s_id BIGINT,
s_params STRING,
c_id BIGINT,
c_params STRING
) WITH (
'connector' = 'print'
);
INSERT INTO sink_table
SELECT
show_log_table.log_id as s_id,
show_log_table.show_params as s_params,
click_log_table.log_id as c_id,
click_log_table.click_params as c_params
FROM show_log_table
LEFT JOIN click_log_table ON show_log_table.log_id = click_log_table.log_id;
-
实时REGULAR JOIN支持等值JOIN和不等值JOIN,等值JOIN SHUFFLE策略是HASH,非等值JOIN策略是GLOBAL,所有数据发往一个并发,按照非等值条件进行关联
-
REGULAR JOIN会将两条流的所有数据都存储在 State 中,所以 Flink 任务的 State 会无限增大,因此需要为 State 配置合适的 TTL,以防止 State 过大
数据质量和时效性高的原因都是因为 regular join 会保障目前 Flink 任务已经接收到的数据中能关联的一定是关联上的,即使关联不上,数据也会下发,完完全全保障了当前数据的客观性和时效性。
-
Regular Join总结
该种解决方案虽然是目前在产出质量、时效性上最好的一种解决方案,但是在实际场景中使用时,也存在一些问题: -
基于 retract 机制,所有的数据都会存储在 state 中以判断能否关联到,所以我们要设置合理的 state ttl 来避免大 state 问题导致的任务不稳定
-
基于 retract 机制,所以在数据发生更新时,会下发回撤数据、最新数据 2 条消息,当我们的关联层级越多,则下发消息量的也会放大,并且会出现数据回撤导致的udf失效 ,及去重问题。
-
sink 组件要支持 retract,我们不要忘了最终数据是要提供数据服务给需求方进行使用的,所以我们最终写入的数据组件也需要支持 retract,比如 MySQL。如果写入的是 Kafka,则下游消费这个 Kafka 的引擎也需要支持回撤\更新机制。
2.3 over 窗口聚合
row_number( ) over ( ) flinksql 中,over 聚合时,指定聚合数据区间有两种方式
- 方式 1,带时间设定区间
RANGE BETWEEN INTERVAL ‘30’ MINUTE PRECEDING AND CURRENT ROW - 方式 2,按行设定区间
ROWS BETWEEN 10 PRECEDING AND CURRENT ROW
over window 可以单独定义并重复使用,从而简化代码
3 join优化方案
但是我们可以发现,无论是哪一种 Join 方案,Join 的前提都是将 A 流和 B 流的数据先存储在状态中,然后再进行关联。
即在实际生产中使用时常常会碰到的问题就是:大状态的问题。
关于大状态问题业界常见两种解决思路:
- 减少状态大小:在 Flink Join 中的可以想到的优化措施就是减少 state key 的数量。在未优化之前 A 流和 B 流的数据往往是存储在单独的两个 State 实例中的,那么我们的优化思路就是将同 Key 的数据放在一起进行存储,一个 key 的数据只需要存储一份,减少了 key 的数量
- 转移状态至外存:大 State 会导致 Flink 任务不稳定,那么我们就将 State 存储在外存中,让 Flink 任务轻量化,比如将数据存储在 Redis 中,A 流和 B 流中相同 key 的数据共同维护在一个 Redis 的 hashmap 中,以供相互进行关联
3.1 key相同时共用state
将两条流的数据使用 union、connect 算子合并在一起,然后使用一个共享的 state 进行处理。
FlinkEnv flinkEnv = FlinkEnvUtils.getStreamTableEnv(args);
flinkEnv.env().setParallelism(1);
flinkEnv.env()
.addSource(new SourceFunction<Object>() {
@Override
public void run(SourceContext<Object> ctx) throws Exception {
}
@Override
public void cancel() {
}
})
.keyBy(new KeySelector<Object, Object>() {
@Override
public Object getKey(Object value) throws Exception {
return null;
}
})
.connect(flinkEnv.env().addSource(new SourceFunction<Object>() {
@Override
public void run(SourceContext<Object> ctx) throws Exception {
}
@Override
public void cancel() {
}
}).keyBy(new KeySelector<Object, Object>() {
@Override
public Object getKey(Object value) throws Exception {
return null;
}
}))
// 左右两条流的数据
.process(new KeyedCoProcessFunction<Object, Object, Object, Object>() {
// 两条流的数据共享一个 mapstate 进行处理
private transient MapState<String, String> mapState;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
this.mapState = getRuntimeContext().getMapState(new MapStateDescriptor<String, String>("a", String.class, String.class));
}
@Override
public void processElement1(Object value, Context ctx, Collector<Object> out) throws Exception {
}
@Override
public void processElement2(Object value, Context ctx, Collector<Object> out) throws Exception {
}
})
.print();
3.2 state过大优化
定期清理state,比如在曝光关联点击的情况下,如果我们能明确一次曝光只有一次点击的话,只要这条曝光或者点击被关联到过,那么我们就可以在 KeyedCoProcessFunction 中自定义逻辑将已经被关联过得曝光、点击的 state 数据进行删除,以减小 state,减轻任务压力。
3.3 使用外部存储保存state
外存 State 到 redis。
此种方案就是完全不使用 Flink 的 state,直接将来的数据存储到 Redis 中进行维护,A 流的数据过来之后,去 Redis 中找 B 流的数据,B 流的数据过来之后,去 Redis 中找 A 流的数据。
某些金融公司内的关联,state 是不能被清理的,比如存储了借款信息之后,这些信息后续还是可能被修改的。所以这种场景下需要存储全量的 state。
4 Flink SQL 维表 JOIN 的优化
维表 JOIN 的常见问题
维表 Join 的默认策略是实时、同步查询维表,每条流数据到来时,在 Flink 算子中直接访问维表数据源来进行关联。这种方式可以保证维表数据是最新的,但是当数据流量过大时,频繁的维表实时查询会对外部系统带来巨大的压力,可能导致连接失败、处理线程打满等情况,出现线程阻塞、数据返回缓慢等后果,影响任务整体的吞吐量。而且这种方案对外部系统能承受的 QPS 要求较高,在大数据实时计算场景下,QPS 远高于普通的后台系统,峰值高达百万甚至千万,导致整体作业处理瓶颈转移到外部系统。
此外,维表并不是永远不变的,而维表的变化可能导致无法关联。例如维表有新增维度,而 JOIN 操作发生在维度新增之前,由于维表 JOIN 只能关联处理时间的快照,就会导致事实数据关联不上。这也是很多用户的使用痛点。
优化点 1:Async I/O
维表 JOIN 默认为同步访问方式,上游每输入一条数据就会前往外部表中查询一次,等待返回后输出关联结果,期间的网络耗时与外部表的查询延迟极大地阻碍了流作业的吞吐,加大了数据处理延迟。为了解决同步访问外部数据源的问题,可以引入异步模式处理查询请求,使得连续的关联请求之间不需要阻塞等待。
同步请求和异步请求外部维表,对比图如下:
基于 Flink Async I/O 和异步客户端,我们可以实现维表 JOIN 的异步化,极大地提高维表 JOIN 的吞吐率。
在 Flink SQL 中,通过继承 AsyncTableFunction,实现异步的 eval() 方法,即可完成异步维表 JOIN。以 HBaseAsyncLookupFunction 为例,简单分析异步化维表 JOIN 的实现:
实际的实现是集成AsyncTableFunction 实现了他的方法,自己在open中缓存一份数据,以hbase维度表为例
public class HBaseRowDataAsyncLookupFunction extends AsyncTableFunction<RowData> {
@Override
public void open(FunctionContext context) {
// 建立线程池
final ExecutorService threadPool =
Executors.newFixedThreadPool(
THREAD_POOL_SIZE,
new ExecutorThreadFactory(
"hbase-async-lookup-worker", Threads.LOGGING_EXCEPTION_HANDLER));
Configuration config = prepareRuntimeConfiguration();
// 异步建立 HBase 连接
CompletableFuture<AsyncConnection> asyncConnectionFuture =
ConnectionFactory.createAsyncConnection(config);
asyncConnection = asyncConnectionFuture.get();
table = asyncConnection.getTable(TableName.valueOf(hTableName), threadPool);
this.serde = new HBaseSerde(hbaseTableSchema, nullStringLiteral);
}
public void eval(CompletableFuture<Collection<RowData>> future, Object rowKey) {
Get get = serde.createGet(rowKey);
// 去 HBase 表中查询
CompletableFuture<Result> responseFuture = table.get(get);
responseFuture.whenCompleteAsync(
(result, throwable) -> {
if (throwable != null) {
// 发生异常时,调用 future.completeExceptionally
resultFuture.completeExceptionally(
new RuntimeException("HBase table '" + hTableName + "' not found.",throwable));
} else {
RowData rowData = serde.convertToNewRow(result);
// 正常返回时,调用 future.complete,向下游发送消息
resultFuture.complete(Collections.singletonList(rowData));
}
}
)
}
}
从代码中可以看出,维表 JOIN 异步化的关键点在于:
-
需要支持异步查询的外部数据源客户端;
-
eval 方法中使用 CompletableFuture 处理异步请求的结果。
优化点 2:维表缓存
除了将同步查询改为异步,我们还可以缓存维表中的数据,保存到 Flink 作业 TaskManager 的内存中,流数据到来时,只需要查询本地缓存中的数据,无需与远程数据源进行交互,可以极大提升数据处理的吞吐量。
维表缓存的实现有多种方式,可以用一张表格进行总结:
缓存类型 | 实现细节 | 优点 | 缺点 |
---|---|---|---|
全量缓存 | LookupFunction 的 open() 方法中预加载维表全量数据,并保存到本地缓存中。eval() 方法先查询缓存,无法找到再查询维表外部数据源。 | 1.实现简单;2.有效提高维表 JOIN 的吞吐。 | 1.数据全量保存,无法应对超大维表;2.维表数据更新比较困难。 |
LRU | 缓存 LookupFunction 的 open() 方法中初始化 LRU 缓存。eval() 方法先查询缓存,无法找到再查询维表外部数据源,返回的结果存入缓存以备下次查询。需要设置缓存 TTL 和缓存 Size 来控制缓存数据的失效时间和缓存大小。 | 1.降低数据库的查询压力;2.降低内存消耗。 | 1.QPS 很高的情况下缓存命中率较低;2.需要合理设置 TTL 和缓存大小。 |
Partitioned 缓存 | LookupFunction 的 open() 方法中初始化 LRU/全量 缓存。事实数据关联维表前,先按照 JOIN Key 进行 Hash 操作。 | 每个 Subtask 加载所需的维表数据到缓存,降低内存消耗,提高吞吐。 | Hash 操作消耗额外的网络和CPU资源。 |
全量缓存和 LRU 缓存的实现都比较简单,只需调整 LookupFunction 即可,而 Partitioned 缓存的实现涉及的改动点很多,下面进行详细分析。 |
通过观察作业拓扑和执行计划,我们发现 Cacl 算子和 LookupJoin 算子是 Chain 在一起的。维表 JOIN 是一种等值 JOIN,天然具有 Hash 属性,如果能在 Cacl 算子和 LookupJoin 算子之间生成 Hash 算子,即可实现 Partitioned cache。
方案 1
方案1:在 ExecNodeGraph 生成 Transformation 时进行调整。考虑在 CaclTransformation 和 LookupJoin Transformation 之间添加 PartitionTransformation。
修改 LookupJoin 对应的 ExecNode CommonExecLookupJoin,调整 translateToPlanInternal()方法,在生成的 outputTransformation 和上游的 inputTransformation 之间添加 PartitionTransformation,根据 JOIN Key 进行 Hash。
public Transformation<RowData> translateToPlanInternal(PlannerBase planner) {
// 之前的代码省略
Transformation<RowData> inputTransformation =
(Transformation<RowData>) inputEdge.translateToPlan(planner);
// TODO: 新增 partitionTransformation
int[] hashKeys = lookupKeys.keySet().stream().mapToInt(key -> key).toArray();
final RowDataKeySelector keySelector =
KeySelectorUtil.getRowDataSelector(hashKeys, InternalTypeInfo.of(inputRowType));
final StreamPartitioner<RowData> partitioner =
new KeyGroupStreamPartitioner<>(
keySelector, DEFAULT_LOWER_BOUND_MAX_PARALLELISM);
final Transformation<RowData> partitionTransformation =
new PartitionTransformation<>(inputTransformation, partitioner);
partitionTransformation.setParallelism(inputTransformation.getParallelism());
OneInputTransformation<RowData, RowData> inputTransform = new OneInputTransformation<>(
partitionTransformation,
getDescription(),
operatorFactory,
InternalTypeInfo.of(resultRowType),
partitionTransformation.getParallelism());
inputTransform.setParallelism(partitionTransformation.getParallelism());
inputTransform.setOutputType(InternalTypeInfo.of(resultRowType));
return inputTransform;
}
方案 2
方案 2:在 Logical 优化阶段为节点添加 Hash FlinkRelDistribution Trait,在 Physical 优化阶段该 Trait 会生成 StreamPhysicalExchange Node。
在 StreamPhysicalLookupJoinRule.doTransform() 中将 FlinkLogicalRel 中的默认 FlinkRelDistribution Trait 替换成 Hash。
private def doTransform(
join: FlinkLogicalJoin,
input: FlinkLogicalRel,
temporalTable: RelOptTable,
calcProgram: Option[RexProgram]): StreamPhysicalLookupJoin = {
val joinInfo = join.analyzeCondition
val cluster = join.getCluster
val providedTrait = join.getTraitSet.replace(FlinkConventions.STREAM_PHYSICAL)
var requiredTrait = input.getTraitSet.replace(FlinkConventions.STREAM_PHYSICAL)
val options = temporalTable.asInstanceOf[TableSourceTable].catalogTable.getOptions
// 获取维表配置
val enablePartitionedCache = options.getOrDefault("lookup.enable-partitioned-cache", "false").toBoolean
if (enablePartitionedCache) {
val requiredDistribution = FlinkRelDistribution.hash(joinInfo.leftKeys, true)
requiredTrait = input.getTraitSet
// 替换 FlinkRelDistributionTraitDef
.replace(requiredDistribution)
.replace(FlinkConventions.STREAM_PHYSICAL)
}
val convInput = RelOptRule.convert(input, requiredTrait)
new StreamPhysicalLookupJoin(
cluster,
providedTrait,
convInput,
temporalTable,
calcProgram,
joinInfo,
join.getJoinType)
}
优化点 3:批量关联
维表 JOIN 时,攒一批数据以后调用维表的批量查询接口,进行批量关联,可以减少 RPC 的调用次数,提高吞吐量。
批量关联的实现可以分为以下步骤:
添加是否开启 Batch JOIN 对应的配置,设置 Batch Size 和 Batch 触发 TTL;
-
CommonExecLookupJoin 构造 ProcessFunction 时,根据是否开启 Batch JOIN 配置分别构造 LookupJoinRunner 或 BatchLookupJoinRunner;
-
BatchLookupJoinRunner 的 processElement() 方法中实现攒批逻辑,使用 ListState 攒批,通过 timer 触发 批量关联操作;
-
调整 CodeGen 相关类,为 BatchLookupJoinRunner 对应的 generatedFetcher、generatedCollector 和 generatedCalc 赋予正确的输入和输出:List;
-
LookupFunction 的 eval 方法调用批量查询接口。
优化点 4:延迟关联
由于维表 JOIN 只能关联处理时间的快照,可能导致事实数据无法关联更新后的维度,造成关联失败。
对于这种场景,我们可以实现延迟关联功能。如果 Join 没有命中,数据无法关联,可以暂时将事实数据缓存在 Flink State 中,等待一段时间后进行重试,并且可以控制等待时间与重试次数。
延迟关联的实现可以分为以下步骤:
-
添加是否开启 Delay JOIN 对应的配置,设置 Delay Join Intervals 和 RetryTimes;
-
CommonExecLookupJoin 构造 ProcessFunction 时,根据是否开启 Delay JOIN 配置分别构造 LookupJoinRunner 或 DelayedLookupJoinRunner;
-
DelayedLookupJoinRunner 的 processElement() 方法中实现延迟 JOIN 逻辑,如果无法关联则将事实数据保存在 ListState 中,通过设置 timer 和重试次数,延时触发关联操作。
引用
https://www.cnblogs.com/baran/p/15950363.html
https://mp.weixin.qq.com/s/66FyBdXaPtAZHqRXrgPrjQ
1 窗口概念与分类与相关api(非sql)
1.1 窗口相关概念
窗口,就是把无界的数据流,依据一定规则划分成一段一段的有界数据流来计算;
既然划分成有界数据段,通常都是为了"聚合";
Keyedwindow 重要特性:任何一个窗口,都绑定在自己所属的 key 上;不同 key 的数据肯定不会划分 到相同窗口中去!
窗口分为:
- 滚动窗口
- 滑动窗口
- 会话窗口:没有固定的窗口长度,也没有固定的滑动步长,而是根据数据流中前后两个事件的时间 间隔是否超出阈值(session gap)来划分;
1.2 窗口计算api
#KeyedWindows
stream
.keyBy(...) <- keyed versus non-keyed windows
.window(...) <- required: "assigner"
[.trigger(...)] <- optional: "trigger" (else default trigger)
[.evictor(...)] <- optional: "evictor" (else no evictor)
[.allowedLateness(...)] <- optional: "lateness" (else zero)
[.sideOutputLateData(...)] <- optional: "output tag" (else no side output for late data)
.reduce/aggregate/apply() <- required: "function"
[.getSideOutput(...)] <- optional: "output tag"
#NonKeyedWindows
stream
.windowAll(...) <- required: "assigner"
[.trigger(...)] <- optional: "trigger" (else default trigger)
[.evictor(...)] <- optional: "evictor" (else no evictor)
[.allowedLateness(...)] <- optional: "lateness" (else zero)
[.sideOutputLateData(...)] <- optional: "output tag" (else no side output for late data)
.reduce/aggregate/apply() <- required: "function"
[.getSideOutput(...)] <- optional: "output tag"
1.3 窗口指派api
/*** NonKeyed 窗口,全局窗口 */
// 处理时间语义,滚动窗口
source.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(5)));
// 处理时间语义,滑动窗口
source.windowAll(SlidingProcessingTimeWindows.of(Time.seconds(5), Time.seconds(1)));
// 事件时间语义,滚动窗口
source.windowAll(TumblingEventTimeWindows.of(Time.seconds(5)));
// 事件时间语义,滑动窗口
source.windowAll(SlidingEventTimeWindows.of(Time.seconds(5), Time.seconds(1)));
// 计数滚动窗口
source.countWindowAll(100);
// 计数滑动窗口
source.countWindowAll(100, 20);
/*** Keyed 窗口 */
KeyedStream<String, String> keyedStream = source.keyBy(s -> s);
// 处理时间语义,滚动窗口
keyedStream.window(TumblingProcessingTimeWindows.of(Time.seconds(5)));
// 处理时间语义,滑动窗口
keyedStream.window(SlidingProcessingTimeWindows.of(Time.seconds(5), Time.seconds(1)));
// 事件时间语义,滚动窗口
keyedStream.window(TumblingEventTimeWindows.of(Time.seconds(5)));
// 事件时间语义,滑动窗口
keyedStream.window(SlidingEventTimeWindows.of(Time.seconds(5), Time.seconds(1)));
// 计数滚动窗口
keyedStream.countWindow(1000);
// 计数滑动窗口
keyedStream.countWindow(1000, 100);
2 窗口聚合算子
2.1 聚合算子分类
窗口聚合算子,整体上分为两类:
- 增量聚合算子,如 min、max、minBy、maxBy、sum、reduce、aggregate
- 全量聚合算子,如 apply、process
两类聚合算子的底层区别:
- 增量聚合:一次取一条数据,用聚合函数对中间累加器更新;窗口触发时,取累加器输出结果;
- 全量聚合:数据“攒”在状态容器中,窗口触发时,把整个窗口的数据交给聚合函数;
2.2 聚合算子代码示例
#简单滚动聚合算子
keyedStream.countWindow(5, 2)
/*.max("score") */
// 得到的结果中,除了 score 是符合逻辑的结果外,其他字段是窗口中第一条的值
/*.min("score")*/
/*.maxBy("score") */
// 得到的结果是: 最大 score 所在的那一行数据
/*.minBy("score")*/
// 得到的结果是: 最小 score 所在的那一行数据
/*.sum("score")*/
// 得到的结果中,除了 score 是符合逻辑(score 之和)的结果外,其他字段是窗口中第一条
#reduce 聚合算子(增量聚合)
source.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(10)))
.reduce(new ReduceFunction<Long>() {
@Override
public Long reduce(Long value1, Long value2) throws Exception {
// TODO: 实现 reduce 函数,根据业务逻辑合并两个元素的值
return value1 + value2;
}
});
#Aggregate 聚合算子示例 1
/**
* 滚动聚合 api 使用示例
* 需求 一 : 每隔 10s,统计最近 30s 的数据中,每个用户的行为事件条数
* 使用 aggregate 算子来实现
*/
SingleOutputStreamOperator<Integer> resultStream = watermarkedBeanStream
.keyBy(EventBean2::getGuid)
// 参数 1: 窗口长度 ; 参数 2:滑动步长
.window(SlidingEventTimeWindows.of(Time.seconds(30), Time.seconds(10)))
// reduce :滚动聚合算子,它有个限制 ,聚合结果的数据类型与数据源中的数据类型是一致
/*.reduce(new ReduceFunction<EventBean>() {
@Override
public EventBean reduce(EventBean value1, EventBean value2) throws Exception {
return null;
}
})*/
.aggregate(new AggregateFunction<EventBean2, Integer, Integer>() {
/**
* 初始化累加器
* @return
*/
@Override
public Integer createAccumulator() {
return 0;
}
/**
* 滚动聚合的逻辑(拿到一条数据,如何去更新累加器)
* @param value The value to add
* @param accumulator The accumulator to add the value to
* @return
*/
@Override
public Integer add(EventBean2 value, Integer accumulator) {
return accumulator + 1;
}
/**
* 从累加器中,计算出最终要输出的窗口结算结果
* @param accumulator The accumulator of the aggregation
* @return
*/
@Override
public Integer getResult(Integer accumulator) {
return accumulator;
}
/**
* 批计算模式下,可能需要将多个上游的局部聚合累加器,放在下游进行全局聚合
* 因为需要对两个累加器进行合并
* 这里就是合并的逻辑
* 流计算模式下,不用实现!
* @param a An accumulator to merge
* @param b Another accumulator to merge
* @return
*/
@Override
public Integer merge(Integer a, Integer b) {
return a + b;
}
});
/*resultStream.print();*/
# Aggregate 聚合算子示例 2
/*** 需求 二 : 每隔 10s,统计最近 30s 的数据中,每个用户的平均每次行为时长
* 要求用 aggregate 算子来做聚合
* 滚动聚合 api 使用示例
*/
SingleOutputStreamOperator<Double> resultStream2 = watermarkedBeanStream
.keyBy(EventBean2::getGuid)
.window(SlidingEventTimeWindows.of(Time.seconds(30), Time.milliseconds(10)))
// 泛型 1: 输入的数据的类型 ; 泛型 2: 累加器的数据类型 ; 泛型 3: 最终结果的类型
.aggregate(new AggregateFunction<EventBean2, Tuple2<Integer, Integer>, Double>() {
@Override
public Tuple2<Integer, Integer> createAccumulator() {
return Tuple2.of(0, 0);
}
@Override
public Tuple2<Integer, Integer> add(EventBean2 eventBean, Tuple2<Integer, Integer> accumulator) {
// accumulator.setField(accumulator.f0+1,0);
// accumulator.setField(accumulator.f1+eventBean.getActTimelong(),1);
// return accumulator;
return Tuple2.of(accumulator.f0 + 1, accumulator.f1 + eventBean.getActTimelong());
}
@Override
public Double getResult(Tuple2<Integer, Integer> accumulator) {
return accumulator.f1 / (double) accumulator.f0;
}
/*** 在批计算模式中,shuffle 的上游可以做局部聚合,然后会把局部聚合结果交给下游去做全局聚合
* 因此,就需要提供 两个局部聚合结果进行合并的逻辑
* 在流式计算中,不存在:上游局部聚合后再交给下游全局聚合的机制!
* 所以,在流式计算模式下,不用实现下面的方法
* @param a An accumulator to merge
* @param b Another accumulator to merge
* @return
*/
@Override
public Tuple2<Integer, Integer> merge(Tuple2<Integer, Integer> a, Tuple2<Integer, Integer> b) {
return Tuple2.of(a.f0 + b.f0, a.f1 + b.f1);
}
});
/*resultStream2.print();*/
# apply 聚合算子(全量聚合)
/**
* 全窗口计算 api 使用示例
* 需求 三 : 每隔 10s,统计最近 30s 的数据中,每个用户的行为事件中,行为时长最长的前 2 条记录
* 要求用 apply 或者 process 算子来实现
**/
// 1. 用 apply 算子来实现需求
SingleOutputStreamOperator<EventBean2> resultStream3 = watermarkedBeanStream
.keyBy(EventBean2::getGuid)
.window(SlidingEventTimeWindows.of(Time.seconds(30), Time.seconds(10))) // 泛型 1: 输入数据类型; 泛型 2:输出结果类型; 泛型 3: key 的类型, 泛型 4:窗口类型
.apply(new WindowFunction<EventBean2, EventBean2, Long, TimeWindow>() {
/**
* @param key 本次传给咱们的窗口是属于哪个 key 的
* @param window 本次传给咱们的窗口的各种元信息(比如本窗口的起始时间,结束时间)
* @param input 本次传给咱们的窗口中所有数据的迭代器
* @param out 结果数据输出器
* @throws Exception
*/
@Override
public void apply(Long key, TimeWindow window, Iterable<EventBean2> input, Collector<EventBean2> out) throws Exception {
// low bi 写法: 从迭代器中迭代出数据,放入一个 arraylist,然后排序,输出前 2 条
ArrayList<EventBean2> tmpList = new ArrayList<>();
// 迭代数据,存入 list
for (EventBean2 eventBean2 : input) {
tmpList.add(eventBean2);
}
// 排序
Collections.sort(tmpList, new Comparator<EventBean2>() {
@Override
public int compare(EventBean2 o1, EventBean2 o2) {
return o2.getActTimelong() - o1.getActTimelong();
}
});
// 输出前 2 条
for (int i = 0; i < Math.min(tmpList.size(), 2); i++) {
out.collect(tmpList.get(i));
}
}
});
/*resultStream3.print();*/
# process 聚合算子
/*** 全窗口计算 api 使用示例
* 需求 三:每隔 10s,统计最近 30s 的数据中,每个用户的行为事件中,行为时长最长的前 2 条记录
* 要求用 apply 或者 process 算子来实现
***/
SingleOutputStreamOperator<String> resultStream4 = watermarkedBeanStream
.keyBy(EventBean2::getGuid)
.window(SlidingEventTimeWindows.of(Time.seconds(30), Time.seconds(10)))
.process(new ProcessWindowFunction<EventBean2, String, Long, TimeWindow>() {
@Override
public void process(Long aLong, ProcessWindowFunction<EventBean2, String, Long, TimeWindow>.Context context, Iterable<EventBean2> input, Collector<String> out) throws Exception {
// 本次窗口的元信息
TimeWindow window = context.window();
long maxTimestamp = window.maxTimestamp();// 本窗口允许的最大时间戳 [1000,2000) ,其中 1999 就是允许的最大时间戳; 2000 就是窗口的 end
long windowStart = window.getStart();
long windowEnd = window.getEnd();
// low bi 写法: 从迭代器中迭代出数据,放入一个 arraylist,然后排序,输出前 2 条
ArrayList<EventBean2> tmpList = new ArrayList<>();
// 迭代数据,存入 list
for (EventBean2 eventBean2 : input) {
tmpList.add(eventBean2);
}
// 排序
Collections.sort(tmpList, new Comparator<EventBean2>() {
@Override
public int compare(EventBean2 o1, EventBean2 o2) {
return o2.getActTimelong() - o1.getActTimelong();
}
});
// 输出前 2 条
for (int i = 0; i < Math.min(tmpList.size(), 2); i++) {
EventBean2 bean = tmpList.get(i);
out.collect(
"窗口 start:" + windowStart + "," +
"窗口 end:" + windowEnd + "," +
bean.getGuid() + "," +
bean.getEventId() + "," +
bean.getTimeStamp() + "," +
bean.getPageId() + "," +
bean.getActTimelong()
);
}
}
});
resultStream4.print();
window与windowall 区别
在 Flink 中,windowAll 和 window 都是窗口算子,用于将数据流划分成多个窗口,并对每个窗口中的数据进行计算。
windowAll 是一个全局窗口算子,它会将整个输入流中的数据放入同一个窗口中进行计算。这意味着所有数据都会被汇总到同一个窗口中,而窗口的大小和滑动步长也是全局设置的。
相比之下,window 是一个针对 KeyedStream 的算子,它会根据 KeyedStream 中的键值对将输入数据流划分为多个不同的窗口。因此,window 算子可以更加灵活地处理不同 Key 的数据。
此外,由于窗口的不同,两个算子支持的操作也有所不同。windowAll 支持的操作包括 reduce 和 aggregate 等,而 window 算子还支持 apply 等更加灵活的操作。因此,在实际应用中,需要根据具体的场景选择适合的窗口算子。
3flink时间概念
Flink 在流应⽤程序中三种 Time 概念
Time 类型 | 解释 |
---|---|
Processing Time | 事件被机器处理的系统时间,提供最好的性能和最低的延迟。分支式异步环境下,容易受到事件到达系统的速度,事件在系统内操作流动速度以及中断的影响。 |
Event Time | 一般指数据本身携带的时间戳,能够满足在特定场景下数据准确性的需求。一般而言与 Processing Time 有时间延迟,需要引入水印机制处理事件乱序和时间乱序问题。 |
Ingestion Time | 事件进入 Flink 的时间。一般在 Flink Source 定义,提供给下游窗口计算的触发计算。 |
⼀般来说,在⽣产环境中 Event Time 与 Processing Time 是常用的策略。
watermark最重要的就是与window配合来实现实时数据的计算,对于window的组成主要有三个成员,也是写代码时经常写缺不知道具体含义的三个方法。
3.1 分配器(Window Assinger)
窗口分配器定义了数据流中的元素如何分配到窗口中,通过在分组数据流中调用 .window(…) 或者非分组数据流中调用 .windowAll(…) 时指定窗口分配器(WindowAssigner)来实现。WindowAssigner 负责将每一个到来的元素分配给一个或者多个窗口(window), Flink 提供了一些常用的预定义的窗口分配器,即:滚动窗口、滑动窗口、会话窗口和全局窗口。你也可以通过继承 WindowAssigner 类来自定义自己的分配器。
Assinger | 备注 |
---|---|
GlobalWindows | 所有的数据都分配到同一个窗口。 |
MergingWindowAssigner | 可 Merge 的窗口分配处理。 |
SlidingProcessingTimeWindows | 基于 Processing Time 的滚动窗口分配处理。 |
SlidingEventTimeWindows | 基于 Event Time 的滚动窗口分配处理。 |
TumblingProcessingTimeWindows | 基于 Processing Time 的滑动窗口分配处理。 |
TumblingEventTimeWindows | 基于 Event Time 的滑动窗口分配处理。 |
ProcessingTimeSessionWindows | 基于 Processing Time 且可 merge 的会话窗口分配处理。 |
EventTimeSessionWindows | 基于 Event Time 且可 merge 会话窗口分配处理。 |
3.2 触发器(Trigger)
触发器决定了一个窗口何时可以被窗口函数处理,每一个窗口分配器都有一个默认的触发器,该触发器决定合适计算和清除窗口。如果默认的触发器不能满足你的需要,你可以通过调用 trigger(…)来指定一个自定义的触发器。触发器的接口有5个方法来允许触发器处理不同的事件:
- onElement()方法,每个元素被添加到窗口时调用
- onEventTime()方法,当一个已注册的事件时间计时器启动时调用
- onProcessingTime()方法,当一个已注册的处理时间计时器启动时调用
- onMerge()方法,与状态性触发器相关,当使用会话窗口时,两个触发器对应的窗口合并时,合并两个触发器的状态。
每个触发动作的返回结果⽤ TriggerResult 定。TriggerResult 有四种状态:
- CONTINUE:什么也不做
- FIRE:触发计算
- PUGE:清除窗口中的数据
- FIRE_AND_PURGE:触发计算并清除窗口中的数据
Trigger | 备注 |
---|---|
EventTimeTrigger | 当水印通过窗口末尾时触发的触发器。 |
ProcessingTimeTrigger | 当系统时间通过窗口末尾时触发的触发器。 |
CountTrigger | 窗口元素达到阈值触发的触发器。 |
PurgingTrigger | 作为参数,使其成为带有清除功能触发器。 |
DeltaTrigger | 基于 DeltaFunction 和一个阈值的触发器。 |
3.3 驱逐器(Evictor)
Flink 的窗口模型允许指定一个除了 WindowAssigner 和 Trigger 之外的可选参数 Evitor,这个可以通过调用 evitor(…) 方法来实现。这个驱逐器(evitor)可以在触发器触发之前或者之后,或者窗口函数被应用之前清理窗口中的元素。如果没有定义 Evictor,触发器直接将所有窗⼝元素交给计算函数。
Evictor | 备注 |
---|---|
TimeEvitor | 清除时间戳小于窗口元素中的最大时间戳 - interval的元素。 |
CountEvitor | 只保存指定数量的数据。 |
DeltaEvitor | 通过一个 DeltaFunction 和一个阈值,计算窗口缓存中最近的一个元素和剩余的所有元素的 delta 值,并清除 delta 值大于或者等于阈值的元素。 |
4窗口的触发机制
4.1 窗口触发原理
窗口计算的触发,是由 Trigger 类来决定;当 Trigger 返回需要触发时;窗口算子还要调用 Evictor 来 进行元素移除操作;
Flink 中为各类内置的 WindowsAssigner 都设计了对应的默认 Trigger;
一般情况下不需要自己去重写 Trigger;除非有特别的需求;
查看EventTimeTrigger 源码查看,对比上图,数据到达触发判断,每来一条数据,都会对比数据中的 maxTimestamp 与当前的watermark做比较,窗口结束的最大时间点如果小于wm就立刻触发,否则立刻就注册一个定时器(为了防止没有数据 ,下面不触发,就是这个窗口结束时间到了 ,也要触发的意思),触发时间就是。
public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {
return TriggerResult.FIRE;
} else {
ctx.registerEventTimeTimer(window.maxTimestamp());
return TriggerResult.CONTINUE;
}
}
那么这个定时器的触发函数是谁呢,就是
public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
return time == window.maxTimestamp() ? TriggerResult.FIRE : TriggerResult.CONTINUE;
}
就是 校验 刚刚是不是到了窗口的最大时间,好决定 要不要 触发窗口 比较来,来触发窗口。
4.3 对照 EventTimeTrigger 的 ProcessingTimeTrigger
由上面的 EventTimeTrigger 比较一下 ProcessingTimeTrigger ,可以看到
public class ProcessingTimeTrigger extends Trigger<Object, TimeWindow> {
public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) {
ctx.registerProcessingTimeTimer(window.maxTimestamp());
return TriggerResult.CONTINUE;
}
public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) {
return TriggerResult.FIRE;
}
}
比较一下就能看出来eventtime的窗口触发,是受到事件事件的影响的,如果来了一条事件很大的数据,窗口立刻就会触发,但是process时间的窗口,就完全没有做判断,完全是,到了窗口结束时间,自动触发的类型
4.2 自定义窗口触发器
Evictor 是窗口触发前,或者触发后,对窗口中的数据移除的机制;
- 当触发条件成立时,先调用 Evictor 的 evictBefore( )方法 来进行元素移除;
- 然后,计算;
- 计算完后,还会再调用 Evictor 的 evictAfter( )方法,来进行元素移除;
# Trigger 和 Evictor 的设置代码
SingleOutputStreamOperator<String> sumResult = beanStream
.keyBy(tp -> tp.f0.getGuid())
.window(TumblingEventTimeWindows.of(Time.seconds(10))) // 事件时间滚动窗口,窗口长度为 10
// 设置自定义的 Trigger
.trigger(MyEventTimeTrigger.create())
// 设置自定义的 Evictor ,它会在窗口触发计算前,对窗口中的 e0x 标记事件进行移除
.evictor(MyTimeEvictor.of(Time.seconds(10)))
.apply(new WindowFunction<Tuple2<EventBean2, Integer>, String, Long, TimeWindow>() {
@Override
public void apply(Long aLong, TimeWindow window, Iterable<Tuple2<EventBean2, Integer>> input, Collector<String> out) throws Exception {
// 计算逻辑
}
});
# Trigger 代码示例
背景:不光要按照滚动时间窗口的时间条件触发,还要根据数据中的 eventId=e0x 来触发
class MyEventTimeTrigger extends Trigger<Tuple2<EventBean2,Integer>, TimeWindow> {
private MyEventTimeTrigger() {}
/***
* 来一条数据时,需要检查 watermark 是否已经越过窗口结束点需要触发
*/
@Override
public TriggerResult onElement(Tuple2<EventBean2,Integer> element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
// 如果窗口结束点 <= 当前的 watermark
if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {
return TriggerResult.FIRE;
} else {
// 注册定时器,定时器的触发时间为: 窗口的结束点时间
ctx.registerEventTimeTimer(window.maxTimestamp());
// 判断,当前数据的用户行为事件 id 是否等于 e0x,如是,则触发
if ("e0x".equals(element.f0.getEventId())) {
return TriggerResult.FIRE;
}
return TriggerResult.CONTINUE;
}
}
/***
* 当事件时间定时器的触发时间(窗口的结束点时间)到达了,检查是否满足触发条件
* 下面的方法,是定时器在调用
*/
@Override
public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
return time == window.maxTimestamp() ? TriggerResult.FIRE : TriggerResult.CONTINUE;
}
/***
* 当处理时间定时器的触发时间(窗口的结束点时间)到达了,检查是否满足触发条件
*/
@Override
public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
return TriggerResult.CONTINUE;
}
@Override
public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
ctx.deleteEventTimeTimer(window.maxTimestamp());
}
@Override
public boolean canMerge() {
return true;
}
@Override
public void onMerge(TimeWindow window, OnMergeContext ctx) {
long windowMaxTimestamp = window.maxTimestamp();
if (windowMaxTimestamp > ctx.getCurrentWatermark()) {
ctx.registerEventTimeTimer(windowMaxTimestamp);
}
}
public static MyEventTimeTrigger create() {
return new MyEventTimeTrigger();
}
}
#Evictor 代码示例
背景:不光要按照滚动时间窗口的时间条件移除数据,还要将 eventId=e0x 的数据移除
class MyTimeEvictor implements Evictor<Object, TimeWindow> {
private static final long serialVersionUID = 1L;
private final long windowSize;
private final boolean doEvictAfter;
public MyTimeEvictor(long windowSize) {
this.windowSize = windowSize;
this.doEvictAfter = false;
}
public MyTimeEvictor(long windowSize, boolean doEvictAfter) {
this.windowSize = windowSize;
this.doEvictAfter = doEvictAfter;
}
/*** 窗口触发前,调用 */
@Override
public void evictBefore(Iterable<TimestampedValue<Object>> elements, int size, TimeWindow window, EvictorContext ctx) {
if (!doEvictAfter) {
evict(elements, size, ctx);
}
}
/*** 窗口触发后,调用 */
@Override
public void evictAfter(Iterable<TimestampedValue<Object>> elements, int size, TimeWindow window, EvictorContext ctx) {
if (doEvictAfter) {
evict(elements, size, ctx);
}
}
/*** 元素移除的核心逻辑 */
private void evict(Iterable<TimestampedValue<Object>> elements, int size, EvictorContext ctx) {
if (!hasTimestamp(elements)) {
return;
}
long currentTime = getMaxTimestamp(elements);
long evictCutoff = currentTime - windowSize;
for (Iterator<TimestampedValue<Object>> iterator = elements.iterator(); iterator.hasNext(); ) {
TimestampedValue<Object> record = iterator.next();
Tuple2<EventBean2, Integer> tuple = (Tuple2<EventBean2, Integer>) record.getValue();
// 加了一个条件: 数据的 eventId=e0x,也移除
if (record.getTimestamp() <= evictCutoff || tuple.f0.getEventId().equals("e0x")) { iterator.remove();
}
}
}
private boolean hasTimestamp(Iterable<TimestampedValue<Object>> elements) {
Iterator<TimestampedValue<Object>> it = elements.iterator();
if (it.hasNext()) {
return it.next().hasTimestamp();
}
return false;
}
/*** 用于计算移除的时间截止点 */
private long getMaxTimestamp(Iterable<TimestampedValue<Object>> elements) {
long currentTime = Long.MIN_VALUE;
for (Iterator<TimestampedValue<Object>> iterator = elements.iterator(); iterator.hasNext(); ) {
TimestampedValue<Object> record = iterator.next();
currentTime = Math.max(currentTime, record.getTimestamp());
}
return currentTime;
}
public static MyTimeEvictor of(Time windowSize) {
return new MyTimeEvictor(windowSize.toMilliseconds());
}
}
数据延迟处理与超时场景解决方案
延迟处理的方案
- 小延迟(乱序),用 watermark 容错 (减慢时间的推进,让本已迟到的数据被认为没有迟到)
- 中延迟(乱序),用 allowedLateness (允许一定限度内的迟到,并对迟到数据重新触发窗口计算,之前输出了一条结果,在规定时间内如果来数据 2s,会重新计算一条结果输出到下游,感觉对于幂等计算很合适。)
- 大延迟(乱序),用 sideOutputLateData (将超出 allowedLateness 的迟到数据输出到一个侧流中)
超时场景解决方案
举例:两个流,一个下订单流 一个揽收流
不能把计算放在存储中解决 缺点:
- 明细数据量极大,存储overhead
- 产品端接口开发工作量大
- 大促期间,数据洪峰,读写双重压力
- OLAP数据库成本本身成本极高
基于消息队列:
- 延迟读取(数据超时未来的手动制造消息触发)
- 延迟下发(数据超时未来的手动制造消息触发)
基于Flink State
- TimerService(数据超时未来的手动制造消息触发–最好用)
- CEP(数据超时未来的取到测流处理)
1 process function 概述
process function 相对于前文所述的 map、flatmap、filter 算子来说,最大的区别是其让开发人员对数据 的 处 理 逻 辑 拥 有 更 大 的 自 由 度 ; 同 时 , ProcessFunction 继 承 了 RichFunction , 因 而 具 备 了 getRuntimeContext() ,open() ,close()等方法;
在不同类型的 datastream 上,(比如 keyed stream、windowedStream、ConnectedStream 等),应用 process function 时,flink 提供了大量不同类型的 process function,让其针对不同的 datastream 拥有更具针对 性的功能;
- ProcessFunction (普通 DataStream 上调 process 时)
- KeyedProcessFunction (KeyedStream 上调 process 时)
- ProcessWindowFunction(WindowedStream 上调 process 时)
- ProcessAllWindowFunction(AllWindowedStream 上调 process 时)
- CoProcessFuntion (ConnectedStreams 上调 process 时)
- ProcessJoinFunction (JoinedStreams 上调 process 时)
- BroadcastProcessFunction (BroadCastConnectedStreams 上调 process 时)
- KeyedBroadcastProcessFunction(KeyedBroadCastConnectedStreams 上调 process 时)
各种算子运算后所生成的 datastream 类型,及各种 datastream 类型之间的互相转换关系:
/**
* @Author: deep as the sea
* @Site: www.51doit.com
* @QQ: 657270652
* @Date: 2022/4/26
* @Desc: process 算子及 ProcessFunction 示例
* 在不同类型的数据流上,调用 process 算子时,所需要传入的 ProcessFunction 也会有不同
*/
public class _17_ProcessFunctions_Demo {
public static void main(String[] args) throws Exception {
Configuration configuration = new Configuration();
configuration.setInteger("rest.port", 8822);
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(configuration);
env.setParallelism(1);
// id,eventId
DataStreamSource<String> stream1 = env.socketTextStream("localhost", 9998);
/**
* 在普通的 datastream 上调用 process 算子,传入的是 "ProcessFunction"
*/
SingleOutputStreamOperator<Tuple2<String, String>> s1 = stream1.process(new ProcessFunction<String, Tuple2<String, String>>() {
// 可以使用 生命周期 open 方法
@Override
public void open(Configuration parameters) throws Exception {
// 可以调用 getRuntimeContext 方法拿到各种运行时上下文信息
RuntimeContext runtimeContext = getRuntimeContext();
runtimeContext.getTaskName();
super.open(parameters);
}
@Override
public void processElement(String value, ProcessFunction<String, Tuple2<String, String>>.Context ctx, Collector<Tuple2<String, String>> out) throws Exception {
// 可以做测流输出
ctx.output(new OutputTag<String>("s1", Types.STRING), value);
// 可以做主流输出
String[] arr = value.split(",");
out.collect(Tuple2.of(arr[0], arr[1]));
}
// 可以使用 生命周期 close 方法
@Override
public void close() throws Exception {
super.close();
}
});
/**
* 在 keyedStream 上调用 process 算子,传入的是 "KeyedProcessFunction"
* KeyedProcessFunction 中的,泛型 1:流中的 key 的类型;泛型 2:流中的数据的类型;泛型 3:处理后的输出结果的类型
*/
// 对 s1 流进行 keyby 分组
KeyedStream<Tuple2<String, String>, String> keyedStream = s1.keyBy(tp2 -> tp2.f0);
// 然后在 keyby 后的数据流上调用 process 算子
SingleOutputStreamOperator<Tuple2<Integer, String>> s2 = keyedStream.process(new KeyedProcessFunction<String, Tuple2<String, String>, Tuple2<Integer, String>>() {
@Override
public void processElement(Tuple2<String, String> value, KeyedProcessFunction<String, Tuple2<String, String>, Tuple2<Integer, String>>.Context ctx, Collector<Tuple2<Integer, String>> out) throws Exception {
// 把 id 变整数,把 eventId 变大写
out.collect(Tuple2.of(Integer.parseInt(value.f0), value.f1.toUpperCase()));
}
});
s2.print();
env.execute();
}
}