《Flink学习笔记》——第九章 多流转换

news2025/1/24 1:36:09

无论是基本的简单转换和聚合,还是基于窗口的计算,我们都是针对一条流上的数据进行处理的。而在实际应用中,可能需要将不同来源的数据连接合并在一起处理,也有可能需要将一条流拆分开,所以经常会有对多条流进行处理的场景

简单划分(两大类):

  • 分流——把一条数据流拆分成完全独立的两条或多条,一般通过侧输出流来实现
  • 合流——多条数据流合并为一条数据流,如union,connect,join,coGroup

9.1 分流

9.1.1 简单实现

其实根据条件筛选数据的需求,本身非常容易实现:只要针对同一条流多次独立调用.filter()方法进行筛选,就可以得到拆分之后的流

例子:根据用户进行划分

public class SplitStreamByFilter {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        DataStreamSource<Event> stream = env.addSource(new ClickSource());

        SingleOutputStreamOperator<Event> maryStream = stream.filter(new FilterFunction<Event>() {
            @Override
            public boolean filter(Event value) throws Exception {
                return value.getUser().equals("Mary");
            }
        });

        SingleOutputStreamOperator<Event> bobStream = stream.filter(new FilterFunction<Event>() {
            @Override
            public boolean filter(Event value) throws Exception {
                return value.getUser().equals("Bob");
            }
        });

        SingleOutputStreamOperator<Event> otherStream = stream.filter(new FilterFunction<Event>() {
            @Override
            public boolean filter(Event value) throws Exception {
                return !value.getUser().equals("Mary") && !value.getUser().equals("Bob");
            }
        });

        maryStream.print("Mary");
        bobStream.print("Bob");
        otherStream.print("other");

        env.execute();
    }
}

9.1.2 使用侧输出流

public class SplitStreamByOutputTag {
    private static OutputTag<Tuple3<String, String, Long>> MaryTag = new OutputTag<Tuple3<String, String, Long>>("Mary-pv") {
    };
    private static OutputTag<Tuple3<String, String, Long>> BobTag = new OutputTag<Tuple3<String, String, Long>>("Bob-pv") {
    };

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        DataStreamSource<Event> stream = env.addSource(new ClickSource());

        SingleOutputStreamOperator<Event> processStream = stream.process(new ProcessFunction<Event, Event>() {
            @Override
            public void processElement(Event event, Context context, Collector<Event> collector) throws Exception {
                if (event.getUser().equals("Mary")) {
                    context.output(MaryTag, new Tuple3<>(event.getUser(), event.getUrl(), event.getTimestamp()));
                } else if (event.getUser().equals("Bob")) {
                    context.output(BobTag, new Tuple3<>(event.getUser(), event.getUrl(), event.getTimestamp()));
                } else {
                    collector.collect(event);
                }
            }
        });
        processStream.getSideOutput(MaryTag).print("Mary");
        processStream.getSideOutput(BobTag).print("Bob");
        processStream.print("other");

        env.execute();

    }
}

9.2 基本合流操作

9.2.1 联合(union)

将多条流合在一起

要求:要联合的流的数据类型必须相同

image-20230407152944874

返回的是DataStream

使用方法:

stream1.union(stream2, stream3, ...)

在事件时间语义下:不同流的水位线快慢不同,如果合并在一起,水位线该以哪个为准呢?

答:多流合并时的水位线以最小的那个为准,和之前介绍的并行任务水位线的传递规则完全一致。

代码略

9.2.2 连接(connect)

流的联合union虽然简单,但是受限于数据类型不能改变。Flink提供了另一种合流操作——连接,它允许不同数据类型的流进行合并

1、连接流

连接流是多条流形式上的“合并”,虽然被放在了同一个流中,但内部仍保持各自的数据形式不变,彼此之间是相互独立的。

image-20230407195601867

stream1.connect(stream2)

返回的是ConnectedStreams

ConnectedStreams在调用.map()方法时,传入的不再是一个简单的 MapFunction, 而是一个 CoMapFunction

ConnectedStreams.map(new CoMapFunction())
2、CoProcessFunction

ConnectedStreams在调用.process()时,需要传入CoProcessFunction

对于连接流 ConnectedStreams 的处理操作,需要分别定义对两条流的处理转换

public abstract class CoProcessFunction<IN1, IN2, OUT> extends AbstractRichFunction {
    ...
    public abstract void processElement1(IN1 value, Context ctx, Collector<OUT> out) 
    public abstract void processElement2(IN2 value, Context ctx, Collector<OUT> out) 
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<OUT> out) 
    public abstract class Context {...}
    ...
}

例子:略

代码略

9.3 基于时间的合流——双流联结(Join)

对于两条流的合并,很多情况我们并不是简单地将所有数据放在一起,而是希望根据某个字段的值将它们联结起来,“配对”去做处理。例如用传感器监控火情时,我们需要将大量温度传感器和烟雾传感器采集到的信息,按照传感器 ID 分组、再将两条流中数据合并起来,如果同时超过设定阈值就要报警。

我们发现,这种需求与关系型数据库中表的 join 操作非常相近。事实上,Flink 中两条流的 connect 操作,就可以通过 keyBy 指定键进行分组后合并,实现了类似于 SQL 中的 join 操作;另外 connect 支持处理函数,可以使用自定义状态和 TimerService 灵活实现各种需求,其实已经能够处理双流合并的大多数场景。

不过处理函数是底层接口,所以尽管 connect 能做的事情多,但在一些具体应用场景下还是显得太过抽象了。比如,如果我们希望统计固定时间内两条流数据的匹配情况,那就需要设置定时器、自定义触发逻辑来实现——其实这完全可以用窗口(window)来表示。为了更方便地实现基于时间的合流操作,Flink 的 DataStrema API 提供了两种内置的 join 算子,以及

coGroup 算子。本节我们就来做一个详细的讲解。

注:SQL 中 join 一般会翻译为“连接”;我们这里为了区分不同的算子,一般的合流操作

connect 翻译为“连接”,而把 join 翻译为“联结”。

9.3.1 窗口联结

如果我们希望将两条流的数据进行合并、且同样针对某段时间进行处理和统计,又该怎么做呢?

Flink 为这种场景专门提供了一个窗口联结(window join)算子,可以定义时间窗口,并将两条流中共享一个公共键(key)的数据放在窗口中进行配对处理

1、窗口联结的调用

窗口联结在代码中的实现,首先需要调用 DataStream 的.join()方法来合并两条流,得到一个 JoinedStreams ;接着通过.where() 和.equalTo() 方法指定两条流中联结的 key ; 然后通过.window()开窗口,并调用.apply()传入联结窗口函数进行处理计算

stream1.join(stream2)
    .where(<KeySelector>)
    .equalTo(<KeySelector>)
    .window(<WindowAssigner>)
    .apply(<JoinFunction>)

上面代码中.where()的参数是键选择器(KeySelector),用来指定第一条流中的 key;而.equalTo()传入的 KeySelector 则指定了第二条流中的 key。两者相同的元素,如果在同一窗口中,就可以匹配起来,并通过一个“联结函数”(JoinFunction)进行处理了。

这里.window()传入的就是窗口分配器,之前讲到的三种时间窗口都可以用在这里:滚动窗口(tumbling window)、滑动窗口(sliding window)和会话窗口(session window)。

而后面调用.apply()可以看作实现了一个特殊的窗口函数。注意这里只能调用.apply(),没有其他替代的方法。

传入的 JoinFunction 也是一个函数类接口,使用时需要实现内部的.join()方法。这个方法有两个参数,分别表示两条流中成对匹配的数据

public interface JoinFunction<IN1, IN2, OUT> extends Function, Serializable { OUT join(IN1 first, IN2 second) throws Exception;
}

这里需要注意,JoinFunciton 并不是真正的“窗口函数”,它只是定义了窗口函数在调用时对匹配数据的具体处理逻辑。

当然,既然是窗口计算,在.window()和.apply()之间也可以调用可选 API 去做一些自定义, 比如用.trigger()定义触发器,用.allowedLateness()定义允许延迟时间,等等

2、窗口联结的处理流程

JoinFunction 中的两个参数,分别代表了两条流中的匹配的数据。这里就会有一个问题: 什么时候就会匹配好数据,调用.join()方法呢?接下来我们就来介绍一下窗口 join 的具体处理流程。

两条流的数据到来之后,首先会按照 key 分组、进入对应的窗口中存储;当到达窗口结束时间时,算子会先统计出窗口内两条流的数据的所有组合,也就是对两条流中的数据做一个笛卡尔积(相当于表的交叉连接,cross join),然后进行遍历,把每一对匹配的数据,作为参数

(first,second)传入 JoinFunction 的.join()方法进行计算处理,得到的结果直接输出如图 8-8 所示。所以窗口中每有一对数据成功联结匹配,JoinFunction 的.join()方法就会被调用一次,并输出一个结果。

image-20230408102740711

3、窗口联结实例

在电商网站中,往往需要统计用户不同行为之间的转化,这就需要对不同的行为数据流, 按照用户 ID 进行分组后再合并,以分析它们之间的关联。如果这些是以固定时间周期(比如1 小时)来统计的,那我们就可以使用窗口 join 来实现这样的需求

代码略

9.3.2 间隔联结

在有些场景下,我们要处理的时间间隔可能并不是固定的。比如,在交易系统中,需要实时地对每一笔交易进行核验,保证两个账户转入转出数额相等,也就是所谓的“实时对账”。两次转账的数据可能写入了不同的日志流,它们的时间戳应该相差不大,所以我们可以考虑只统计一段时间内是否有出账入账的数据匹配。这时显然不应该用滚动窗口或滑动窗口来处理——因为匹配的两个数据有可能刚好“卡在”窗口边缘两侧,于是窗口内就都没有匹配了;会话窗口虽然时间不固定,但也明显不适合这个场景。 基于时间的窗口联结已经无能为力了。

为了应对这样的需求,Flink 提供了一种叫作“间隔联结”(interval join)的合流操作。顾名思义,间隔联结的思路就是针对一条流的每个数据,开辟出其时间戳前后的一段时间间隔, 看这期间是否有来自另一条流的数据匹配。

1、间隔联结的原理

间隔联结具体的定义方式是,我们给定两个时间点,分别叫作间隔的“上界”(upperBound)和“下界”(lowerBound);于是对于一条流(不妨叫作 A)中的任意一个数据元素 a,就可以开辟一段时间间隔:[a.timestamp + lowerBound, a.timestamp + upperBound],即以 a 的时间戳为中心,下至下界点、上至上界点的一个闭区间:我们就把这段时间作为可以匹配另一条流数据的“窗口”范围。所以对于另一条流(不妨叫 B)中的数据元素 b,如果它的时间戳落在了这个区间范围内,a 和 b 就可以成功配对,进而进行计算输出结果。所以匹配的条件为:

a.timestamp + lowerBound <= b.timestamp <= a.timestamp + upperBound

这里需要注意,做间隔联结的两条流 A 和 B,也必须基于相同的 key;下界 lowerBound

应该小于等于上界 upperBound,两者都可正可负;间隔联结目前只支持事件时间语义。

image-20230408103926988

下方的流 A 去间隔联结上方的流 B,所以基于 A 的每个数据元素,都可以开辟一个间隔区间。我们这里设置下界为-2 毫秒,上界为 1 毫秒。于是对于时间戳为 2 的 A 中元素,它的可匹配区间就是[0, 3],流 B 中有时间戳为 0、1 的两个元素落在这个范围内,所以就可以得到匹配数据对(2, 0)和(2, 1)。同样地,A 中时间戳为 3 的元素,可匹配区间为[1, 4],B 中只有时间戳为 1 的一个数据可以匹配,于是得到匹配数据对(3, 1)。

所以我们可以看到,间隔联结同样是一种内连接(inner join)。与窗口联结不同的是,interval

join 做匹配的时间段是基于流中数据的,所以并不确定;而且流 B 中的数据可以不只在一个区间内被匹配。

2、间隔联结的调用
stream1
.keyBy(<KeySelector>)
.intervalJoin(stream2.keyBy(<KeySelector>))
.between(Time.milliseconds(-2), Time.milliseconds(1))
.process (new ProcessJoinFunction());
3、间隔联结实例

在电商网站中,某些用户行为往往会有短时间内的强关联。我们这里举一个例子,我们有两条流,一条是下订单的流,一条是浏览数据的流。我们可以针对同一个用户,来做这样一个联结。也就是使用一个用户的下订单的事件和这个用户的最近十分钟的浏览数据进行一个联结查询

代码略

9.3.3 窗口同组联结

除窗口联结和间隔联结之外,Flink 还提供了一个“窗口同组联结”(window coGroup)操作。它的用法跟 window join 非常类似,也是将两条流合并之后开窗处理匹配的元素,调用时只需要将.join()换为.coGroup()就可以了.

内部的.coGroup()方法,有些类似于 FlatJoinFunction 中.join()的形式,同样有三个参数, 分别代表两条流中的数据以及用于输出的收集器(Collector)。不同的是,这里的前两个参数不再是单独的每一组“配对”数据了,而是传入了可遍历的数据集合。也就是说,现在不会再去计算窗口中两条流数据集的笛卡尔积,而是直接把收集到的所有数据一次性传入,至于要怎样配对完全是自定义的。这样.coGroup()方法只会被调用一次,而且即使一条流的数据没有任何另一条流的数据匹配,也可以出现在集合中、当然也可以定义输出结果了。

所以能够看出,coGroup 操作比窗口的 join 更加通用,不仅可以实现类似 SQL 中的“内连接”(inner join),也可以实现左外连接(left outer join)、右外连接(right outer join)和全外连接(full outer join)。事实上,窗口 join 的底层,也是通过 coGroup 来实现的

代码略

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/942540.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

​LeetCode解法汇总57. 插入区间

目录链接&#xff1a; 力扣编程题-解法汇总_分享记录-CSDN博客 GitHub同步刷题项目&#xff1a; https://github.com/September26/java-algorithms 原题链接&#xff1a;力扣&#xff08;LeetCode&#xff09;官网 - 全球极客挚爱的技术成长平台 描述&#xff1a; 给你一个 …

python web GUI框架-NiceGUI 教程(一)

python web GUI框架-NiceGUI 教程&#xff08;一&#xff09; streamlit可以在一些简单的场景下仍然推荐使用&#xff0c;但是streamlit实在不灵活&#xff0c;受限于它的核心机制&#xff0c;NiceGUI是一个灵活的web框架&#xff0c;可以做web网站也可以打包成独立的exe。 基…

科研小工具|心输出量(超声)(cardiac output,CO)

​ 简介 心输出量&#xff08;cardiac output&#xff0c;CO&#xff09;是指左或右心室每分钟泵出的血液量。即心率与每搏出量的乘积。如心率以75次/分钟计算&#xff0c;则心排出量在男性为5~6L&#xff0c;女性略低些。心排出量随着机体代谢和活动情况而变化。在肌肉运动、…

783页19万字行政服务中心一网通办政务服务应用平台建设方案

导读&#xff1a;原文《783页19万字行政服务中心一网通办政务服务应用平台建设方案》&#xff08;获取来源见文尾&#xff09;&#xff0c;本文精选其中精华及架构部分&#xff0c;逻辑清晰、内容完整&#xff0c;为快速形成售前方案提供参考。以下是部分内容&#xff0c; 第三…

阿里巴巴FastJson包的使用心得

阿里巴巴FastJson包的使用心得 1.FastJson简介2.FastJson特性3.引入FastJson4.FastJson中的一些对象&#xff08;1&#xff09;JSONObject&#xff08;2&#xff09;JSONArray&#xff08;3&#xff09;SerializeWriter 4.FastJson中的一些操作&#xff08;1&#xff09; 将Jav…

配置uniapp调试环境

目录 uni-app介绍 uni-app开发工具HBuilderX 创建项目前提条件 uni-app项目结构 配置mumu模拟器 uni-app生命周期 1.应用生命周期 小程序规范 2.页面生命周期-小程序规范 3.组件生命周期 vue规范 uni-app登录按钮方法 uni-app发布安卓app uni-app介绍 uni-app 是一个…

深入理解ArrayList的动态扩容机制及应用

在java编程中&#xff0c;数据结构起着至关重要的作用&#xff0c;而ArrayList作为一种常用的动态数组&#xff0c;为我们在处理数据时提供了便利。其中&#xff0c;其独特的动态扩容机制更是为其赢得了广泛的应用。我们不管在工作还是面试中&#xff0c;都会遇到ArrayList&…

Docker拉取RocketMQ及可视化界面

本文介绍Docker拉取RocketMQ及可视化界面操作步骤 Linux下安装Docker请参考&#xff1a;Linux安装Docker 文章目录 安装namesrv创建挂载目录授权相关权限拉取镜像运行容器查看运行情况 安装Broker创建挂载目录及配置文件目录授权相关权限创建配置文件运行容器查看运行情况 安装…

网络编程套接字(3): 简单的TCP网络程序

文章目录 网络编程套接字(3)4. 简单的TCP网络程序4.1 服务端创建(1) 创建套接字(2) 绑定端口(3) 监听(4) 获取新连接(5) 处理读取与写入 4.2 客户端创建(1)连接服务器 4.3 代码编写(1) v1__简单发送消息(2) v2_多进程版本(3) v3_多线程版本(4) v4_线程池版本 网络编程套接字(3)…

博客系统后端(项目系列2)

目录 前言 &#xff1a; 1.准备工作 1.1创建项目 1.2引入依赖 1.3创建必要的目录 2.数据库设计 2.1博客数据 2.2用户数据 3.封装数据库 3.1封装数据库的连接操作 3.2创建两个表对应的实体类 3.3封装一些必要的增删改查操作 4.前后端交互逻辑的实现 4.1博客列表页 …

中国建筑出版传媒许少辉博士八一新书乡村振兴战略下传统村落文化旅游设计日京东当当畅销榜自由营九三学

中国建筑出版传媒许少辉博士八一新书乡村振兴战略下传统村落文化旅游设计日京东当当畅销榜自由营九三学

大数据Flink(六十九):SQL 数据类型

文章目录 SQL 数据类型 一、原子数据类型 二、​​​​​​复合数据类型 SQL 数据类型 在介绍完一些基本概念之后,我们来认识一下

二级MySQL(二)——编程语言,函数

SQL语言又称为【结构化查询语言】 请使用FLOOR&#xff08;x&#xff09;函数求小于或等于5.6的最大整数 请使用TRUNCATE&#xff08;x&#xff0c;y&#xff09;函数将数字1.98752895保留到小数点后4位 请使用UPPER&#xff08;&#xff09;函数将字符串‘welcome’转化为大写…

SOLIDWORKS工程图转DWG图层映射技巧

DWG格式的图纸在工程制图中有着非常重要的地位&#xff0c;工程实践中常常就需要将SOLIDWORKS工程图进行转换。对于两者之间数据衔接的妥善处理&#xff0c;是提升工作效率的有效手段。基于此目的&#xff0c;本次我们将介绍数据衔接的一个有效解决方案&#xff1a;图层数据的映…

中国建筑出版传媒许少辉八一新书乡村振兴战略下传统村落文化旅游设计日

中国建筑出版传媒许少辉八一新书乡村振兴战略下传统村落文化旅游设计日

Jmeter(二十九):Jmeter常用场景梳理

一、每秒钟固定调用次数 如果想控制每秒发送请求数量,仅仅通过线程数与循环次数是不够的,因为这只能控制发送总数,而要控制每秒发送数量,需要线程数与常数吞吐量控制器的搭配使用,这种场景在性能测试中使用不多。 例如每秒钟调用30次接口,那么把线程数设置为30,将常数…

c# modbus CRC计算器(查表法)

一、简介&#xff1a; 本案例为crc计算器&#xff0c;通过查表法计算出结果 1.窗体后台源代码 using Crc; using System; using System.Collections.Generic; using System.ComponentModel; using System.Data; using System.Drawing; using System.Linq; using System.Text…

Matlab图像处理-除法运算

除法运算 除法运算可用于去除数字化器灵敏度随空间变化造成的影响。图像的除法运算给出的是两幅图像相应像素值的变化比率&#xff0c;而不是每个像素的绝对差异&#xff0c;因而图像除法也称为比率变换&#xff0c;常用于校正成像设备的非线性影响。 在 MATLAB图像处理工具箱…

IDEA集成Git相关操作知识(pull、push、clone)

一&#xff1a;集成git 1&#xff1a;初始化git&#xff08;新版本默认初始化&#xff09; 老版本若没有&#xff0c;点击VCS&#xff0c;选中import into Version Controller中的Create git Repository(创建git仓库)&#xff0c;同理即可出现git符号。 也可查看源文件夹有没有…

IT和OT:如何弥合差距?

两则企业故事 我们曾经碰到一家食品工厂由于订单数据转换不正确&#xff0c;导致加载的产品不正确&#xff0c;将错误的液体装满了卡车。因为IT和OT不能很好地协同工作。因此&#xff0c;它必须被抽回去&#xff0c;导致大量的时间和金钱损失。将新的业务系统集成到整个IT环境…