Flink多流转换(1)—— 分流合流

news2024/9/29 11:22:13

目录

分流

代码示例

使用侧输出流

合流

联合(Union)

连接(Connect)


简单划分的话,多流转换可以分为“分流”和“合流”两大类

目前分流的操作一般是通过侧输出流(side output)来实现,而合流的算子比较丰富,根据不同的需求可以调用 union、connect、join 以及 coGroup 等接口进行连接合并操作

分流

将一条数据流拆分成完全独立的两条、甚至多条流。也就是基于一个DataStream,得到完全平等的多个子DataStream

代码示例

调用.filter()方法进行筛选,将符合条件的数据拣选出来放到对应的流里

public class SplitStreamByFilter {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        SingleOutputStreamOperator<Event> stream = env.addSource(new ClickSource());
        
        // 筛选Mary的浏览行为放入MaryStream流中
        DataStream<Event> MaryStream = stream.filter(new FilterFunction<Event>() {
            @Override
            public boolean filter(Event value) throws Exception {
                return value.user.equals("Mary");
            }
        });
        
        // 筛选Bob的购买行为放入BobStream流中
        DataStream<Event> BobStream = stream.filter(new FilterFunction<Event>() {
            @Override
            public boolean filter(Event value) throws Exception {
                return value.user.equals("Bob");
            }
        });
        
        // 筛选其他人的浏览行为放入elseStream流中
        DataStream<Event> elseStream = stream.filter(new FilterFunction<Event>() {
            @Override
            public boolean filter(Event value) throws Exception {
                return !value.user.equals("Mary") && !value.user.equals("Bob") ;
            }
        });

        MaryStream.print("Mary pv");
        BobStream.print("Bob pv");
        elseStream.print("else pv");
        
        env.execute();
    }
}

缺点:上述操作将原始流复制了三份,对每一份分别进行筛选,因此代码冗余,不够高效

解决:①.split()方法(但限制了数据类型转换,已经废弃)

②测输出流

使用侧输出流

改进后的代码如下:

public class SplitStreamByOutputTag {
    // 定义输出标签,侧输出流的数据类型为三元组(user, url, timestamp)
    private static OutputTag<Tuple3<String, String, Long>> MaryTag = new OutputTag<Tuple3<String, String, Long>>("Mary-pv"){};
    private static OutputTag<Tuple3<String, String, Long>> BobTag = new OutputTag<Tuple3<String, String, Long>>("Bob-pv"){};
    
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        SingleOutputStreamOperator<Event> stream = env.addSource(new ClickSource());

        SingleOutputStreamOperator<Event> processedStream = stream.process(new ProcessFunction<Event, Event>() {
            @Override
            public void processElement(Event value, Context ctx, Collector<Event> out) throws Exception {
                if (value.user.equals("Mary")){
                    ctx.output(MaryTag, new Tuple3<>(value.user, value.url, value.timestamp));
                } else if (value.user.equals("Bob")){
                    ctx.output(BobTag, new Tuple3<>(value.user, value.url, value.timestamp));
                } else {
                    out.collect(value);
                }
            }
        });

        processedStream.getSideOutput(MaryTag).print("Mary pv");
        processedStream.getSideOutput(BobTag).print("Bob pv");
        processedStream.print("else");

        env.execute();
    }
}

①定义OutputTag作为标签

②使用ctx.output方法将符合筛选条件的数据写入侧输出流

③使用getSideOutput方法从侧输出流中获得数据

合流

对于来源不同的多条流中的数据进行联合处理(与分流相比,合流操作更为普遍)

联合(Union)

直接将多条流合在一起(要求必须流中的数据类型必须相同),合并之后的新流会包括所有流中的元素,数据类型不变

操作:基于 DataStream 直接调用.union()方法

参数:其他 DataStream

返回值:一个 DataStream

stream1.union(stream2, stream3, ...)

水位线时效性:多流合并时处理的时效性是以最慢的那个流为准的(多条流的合并,某种意义上也可以看作是多个并行任务向同一个下游任务汇合的过程)

代码示例:

public class UnionTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);



        SingleOutputStreamOperator<Event> stream1 = env.socketTextStream("hadoop102", 7777)
                .map(data -> {
                    String[] field = data.split(",");
                    return new Event(field[0].trim(), field[1].trim(), Long.valueOf(field[2].trim()));
                })
                .assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(2))
                        .withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
                            @Override
                            public long extractTimestamp(Event element, long recordTimestamp) {
                                return element.timestamp;
                            }
                        })
                );
        stream1.print("stream1");

        SingleOutputStreamOperator<Event> stream2 = env.socketTextStream("hadoop103", 7777)
                .map(data -> {
                    String[] field = data.split(",");
                    return new Event(field[0].trim(), field[1].trim(), Long.valueOf(field[2].trim()));
                })
                .assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                        .withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
                            @Override
                            public long extractTimestamp(Event element, long recordTimestamp) {
                                return element.timestamp;
                            }
                        })
                );

        stream2.print("stream2");

        // 合并两条流
        stream1.union(stream2)
                .process(new ProcessFunction<Event, String>() {
                    @Override
                    public void processElement(Event value, Context ctx, Collector<String> out) throws Exception {
                        out.collect("水位线:" + ctx.timerService().currentWatermark());
                    }
                })
                .print();


        env.execute();
    }
}

测试:

分别在两台机器上输入以下数据:

hadoop102 :Alice, ./home, 1000
hadoop103 :Alice, ./home, 2000
hadoop102 :Alice, ./home, 3000

水位线的推进如下:

连接(Connect)

连接操作允许流的数据类型不同

连接流(ConnectedStreams)

连接流可以看成是两条流形式上的“统一”,被放在了一个同一个流中;事实上内部仍保持各自的数据形式不变,彼此之间是相互独立的

要想得到新的 DataStream,还需要进一步定义一个“同处理”(co-process)转换操作,用来说明对于不同来源、不同类型的数据,怎样分别进行处理转换、得到统一的输出类型

代码实现

①基于一条 DataStream 调用.connect()方法,传入另外一条 DataStream 作为参数,将两条流连接起来,得到一个 ConnectedStreams

②调用同处理方法得到 DataStream(可以的调用的同处理方法有.map()/.flatMap(),以及.process()方法)

public class ConnectTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        DataStream<Integer> stream1 = env.fromElements(1,2,3);
        DataStream<Long> stream2 = env.fromElements(1L,2L,3L);

        ConnectedStreams<Integer, Long> connectedStreams = stream1.connect(stream2);
        SingleOutputStreamOperator<String> result = connectedStreams.map(new CoMapFunction<Integer, Long, String>() {
            @Override
            public String map1(Integer value) {
                return "Integer: " + value;
            }

            @Override
            public String map2(Long value) {
                return "Long: " + value;
            }
        });

        result.print();

        env.execute();
    }
}

①ConnectedStreams有两个类型参数,分别是stream1和stream2的类型;

②map方法中实现了一个CoMapFunction,表示分别对两条流中的数据执行 map 操作

类型参数<IN1, IN2, OUT>,分别表示第一条流、第二条流,以及合并后的流中的数据类型

这里我们将一条 Integer 流和一条 Long 流合并,转换成 String 输出。所以当遇到第一条流输入的整型值时,调用.map1();而遇到第二条流输入的长整型数据时,调用.map2():最终都转换为字符串输出,合并成了一条字符串流

③补充:ConnectedStreams 也可以直接调用.keyBy()进行按键分区的操作,得到的还是一个 ConnectedStreams

connectedStreams.keyBy(keySelector1, keySelector2);

传入的参数是两条流中各自的键选择器

这样的操作就是把两条流中key相同的数据放到了一起,然后针对来源的流各自进行处理;

同样也可以在合并之前先使用KeyBy进行分区,然后基于两条KeyedStream进行连接操作;

要注意两条流定义的键的类型必须相同,否则会抛出异常


CoProcessFunction

对于连接流 ConnectedStreams 的处理操作,需要分别定义对两条流的处理转换,因此接口中就会有两个相同的方法需要实现,用数字“1”“2”区分,在两条流中的数据到来时分别调用。我们把这种接口叫作“协同处理函数”(co-process function)

例如:CoMapFunction、CoFlatMapFunction、CoProcessFunction

CoProcessFunction源码如下:

public abstract class CoProcessFunction<IN1, IN2, OUT> extends AbstractRichFunction {
...
public abstract void processElement1(IN1 value, Context ctx, Collector<OUT> out) throws Exception;

public abstract void processElement2(IN2 value, Context ctx, Collector<OUT> out) throws Exception;

public void onTimer(long timestamp, OnTimerContext ctx, Collector<OUT> out) throws Exception {}

public abstract class Context {...}
...
}

简单示例:实现一个实时对账的需求,也就是app 的支付操作和第三方的支付操作的一个双流 Join。App 的支付事件和第三方的支付事件将会互相等待 5 秒钟,如果等不来对应的支付事件,那么就输出报警信息

// 实时对账
public class BillCheckExample {

    public static void main(String[] args) throws Exception{
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // 来自app的支付日志
        SingleOutputStreamOperator<Tuple3<String, String, Long>> appStream = env.fromElements(
                Tuple3.of("order-1", "app", 1000L),
                Tuple3.of("order-2", "app", 2000L)
        ).assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple3<String, String, Long>>forMonotonousTimestamps()
                .withTimestampAssigner(new SerializableTimestampAssigner<Tuple3<String, String, Long>>() {
                    @Override
                    public long extractTimestamp(Tuple3<String, String, Long> element, long recordTimestamp) {
                        return element.f2;
                    }
                })
        );

        // 来自第三方支付平台的支付日志
        SingleOutputStreamOperator<Tuple4<String, String, String, Long>> thirdpartStream = env.fromElements(
                Tuple4.of("order-1", "third-party", "success", 3000L),
                Tuple4.of("order-3", "third-party", "success", 4000L)
        ).assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple4<String, String, String, Long>>forMonotonousTimestamps()
                .withTimestampAssigner(new SerializableTimestampAssigner<Tuple4<String, String, String, Long>>() {
                    @Override
                    public long extractTimestamp(Tuple4<String, String, String, Long> element, long recordTimestamp) {
                        return element.f3;
                    }
                })
        );

        // 检测同一支付单在两条流中是否匹配,不匹配就报警
        appStream.connect(thirdpartStream)
                .keyBy(data -> data.f0, data -> data.f0)
                .process(new OrderMatchResult())
                .print();

        env.execute();
    }

    // 自定义实现CoProcessFunction
    public static class OrderMatchResult extends CoProcessFunction<Tuple3<String, String, Long>, Tuple4<String, String, String, Long>, String>{
        // 定义状态变量,用来保存已经到达的事件
        private ValueState<Tuple3<String, String, Long>> appEventState;
        private ValueState<Tuple4<String, String, String, Long>> thirdPartyEventState;

        @Override
        public void open(Configuration parameters) throws Exception {
            appEventState = getRuntimeContext().getState(
                    new ValueStateDescriptor<Tuple3<String, String, Long>>("app-event", Types.TUPLE(Types.STRING, Types.STRING, Types.LONG))
            );

            thirdPartyEventState = getRuntimeContext().getState(
                    new ValueStateDescriptor<Tuple4<String, String, String, Long>>("thirdparty-event", Types.TUPLE(Types.STRING, Types.STRING, Types.STRING,Types.LONG))
            );
        }

        @Override
        public void processElement1(Tuple3<String, String, Long> value, Context ctx, Collector<String> out) throws Exception {
            // 看另一条流中事件是否来过
            if (thirdPartyEventState.value() != null){
                out.collect("对账成功:" + value + "  " + thirdPartyEventState.value());
                // 清空状态
                thirdPartyEventState.clear();
            } else {
                // 更新状态
                appEventState.update(value);
                // 注册一个5秒后的定时器,开始等待另一条流的事件
                ctx.timerService().registerEventTimeTimer(value.f2 + 5000L);
            }
        }

        @Override
        public void processElement2(Tuple4<String, String, String, Long> value, Context ctx, Collector<String> out) throws Exception {
            if (appEventState.value() != null){
                out.collect("对账成功:" + appEventState.value() + "  " + value);
                // 清空状态
                appEventState.clear();
            } else {
                // 更新状态
                thirdPartyEventState.update(value);
                // 注册一个5秒后的定时器,开始等待另一条流的事件
                ctx.timerService().registerEventTimeTimer(value.f3 + 5000L);
            }
        }

        @Override
        public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
            // 定时器触发,判断状态,如果某个状态不为空,说明另一条流中事件没来
            if (appEventState.value() != null) {
                out.collect("对账失败:" + appEventState.value() + "  " + "第三方支付平台信息未到");
            }
            if (thirdPartyEventState.value() != null) {
                out.collect("对账失败:" + thirdPartyEventState.value() + "  " + "app信息未到");
            }
            appEventState.clear();
            thirdPartyEventState.clear();
        }
    }}

运行结果如下:

运行结果解析:
①在CoProcessFunction的实现中,声明了两个状态变量用来保存App的支付信息和第三方的支付信息

②App支付信息到达之后,触发processElement1中的操作,检查第三方的支付信息是否已经到达(如果先到达会保存在相应的状态变量中);如果已经到达,则对账成功;如果没有到达,则等待5s,仍未到达则对账失败;

③第三方支付信息到达后,流程同②

④对于order-1,时间戳为1000的数据(App)到达后,第三方支付信息未到达,等待5s,接着时间戳未3000的数据(第三方)到达后,发现App支付信息已经到达,因此对账成功

⑤对于order-2和order-3,均是等待5s后没有检测到App(第三方)数据到达而发出报警信息

广播连接流(BroadcastConnectedStream)

DataStream 调用.connect()方法时可以传入一个广播流(BroadcastConnectedStream)

这种连接方式往往用在需要动态定义某些规则或配置的场景。因为规则是实时变动的,所以我们可以用一个单独的流来获取规则数据;而这些规则或配置是对整个应用全局有效的,所以不能只把这数据传递给一个下游并行子任务处理,而是要“广播”(broadcast)给所有的并行子任务。而下游子任务收到广播出来的规则,会把它保存成一个状态,这就是所谓的“广播状态”(broadcast state)

如何创建广播流


基于DataStream调用.broadcast()方法,传入一个“映射状态描述器”(MapStateDescriptor),说明状态的名称和类型;

因为广播状态底层是用一个“映射”(map)结构来保存的

MapStateDescriptor<String, Rule> ruleStateDescriptor = new MapStateDescriptor<>(...);
BroadcastStream<Rule> ruleBroadcastStream = ruleStream.broadcast(ruleStateDescriptor);

数据流和广播流的连接

得到“广播连接流”(BroadcastConnectedStream),然后基于广播连接流调用.process()方法,就可以同时获取规则和数据,进行动态处理

DataStream<String> output = stream
 .connect(ruleBroadcastStream)
 .process( new BroadcastProcessFunction<>() {...} );
BroadcastProcessFunction

BroadcastProcessFunction 与 CoProcessFunction 类似,同样是一个抽象类,需要实现两个方法,针对合并的两条流中元素分别定义处理操作。区别在于这里一条流是正常处理数据,而另一条流则是要用新规则来更新广播状态,所以对应的两个方法叫作.processElement().processBroadcastElement()

学习课程链接:【尚硅谷】Flink1.13实战教程(涵盖所有flink-Java知识点)_哔哩哔哩_bilibili 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1407568.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【后端技术】术有千法,道本归一

目录 1.概述 2.机器的问题 2.1.计算 2.2.存储 2.3.传输 3.人的问题 3.1.代码工程的管理 3.2.过程的把控 4.总结 1.概述 术有千法&#xff0c;道本归一。 之所以这样说&#xff0c;是因为当前出现的纷繁复杂的后端技术&#xff0c;其本质其实都是为了解决同一套问题。…

蓝桥杯题目解析 --矩形切割

先看题&#xff1a; 题目中的例子解析&#xff1a; 5*3 切一刀最大的&#xff0c;肯定是3*3&#xff0c;切完后只剩2*3&#xff0c;切一刀最大的&#xff0c;肯定是2*2&#xff0c;切完后只剩2*1&#xff0c;切一刀最大的&#xff0c;肯定是1*1&#xff0c;切完后只剩1*1&…

浅谈手机APP测试(流程)

前言 APP测试是一个广泛的概念&#xff0c;根据每个app的应用场景不一样&#xff0c;测试的方向也略微的不同&#xff0c;在测试过程中需要灵活应用自身所知的测试手段。 今天就跟大家简单聊聊手机APP测试的一些相关内容。 APP开发流程 &#xff08;1&#xff09; 拿到需求分…

【STM32】USB程序烧录需要重新上电 软件复位方法

文章目录 一、问题二、解决思路2.1 直接插拔USB2.2 给芯片复位 三、解决方法3.1 别人的解决方法3.2 在下载界面进行设置 一、问题 最近学习STM32的USB功能&#xff0c;主要是想要使用虚拟串口功能&#xff08;VCP&#xff09;&#xff0c;发现每次烧录之后都需要重新上电才可以…

Parade Series - Android Studio

硬件支持 CPU i7 RAM 16Gb -------------- ------- Java 3Gb Android 33GbJava Enviroment C:\ ├─ Java │ ├─ jdk1.8.0_181 │ ├─ jre1.8.0_181 │ ├─ maven-3.8.5 │ └─ gradle-6.5 └─ Cache├─ gr…

Python实现力扣经典面试题——删除有序数组中的重复项

题目&#xff1a;删除有序数组中的重复项 给你一个 非严格递增排列 的数组 nums &#xff0c;请你 原地 删除重复出现的元素&#xff0c;使每个元素 只出现一次 &#xff0c;返回删除后数组的新长度。元素的 相对顺序 应该保持 一致 。然后返回 nums 中唯一元素的个数。考虑 nu…

在Word中插入高亮/好看代码

Md2All 一个markdown工具 Md2All 网址 代码一定要用code 高亮主题可选 atom-one-light > 复制到word > 调节字体可选Cnsolas, 间距等 效果 另一个高亮工具 效果

算法/结构/理论复习1---理论基础----更新中

算法/结构/理论 雪花算法CAP理论BASE理论分布式事务的解决方案数据结构树(Tree)二叉树二叉查找树平衡查找树红黑树(重点)BTree(重点)BTree 雪花算法 雪花算法主要是为了解决在分布式中id的生成问题 分布式id的生成规则是:全局唯一,不可以出现重复的id号,趋势递增 雪花算法指的…

ITK编译及安装

文章目录 前言CMake配置选项说明运行VS2015编译及安装VTK转ITKITK转VTK参考文献 最近想利用ITK读取整个Dicom图像到内存&#xff0c;再将读取到的ITK数据转换到VTK。于是乎&#xff0c;开始了一段ITK编译之路。以下将记录一些有用的信息&#xff0c;以备后用。 前言 DICOM图像…

Spring扩展点在微服务应用(待完善)

ApplicationListener扩展 nacos注册服务&#xff0c; 监听容器发布事件 # 容器发布事件 AbstractAutoServiceRegistration#onApplicationEvent # 接收事件吗&#xff0c;注册服务到nacos NacosServiceRegistry#register Lifecycle扩展 #订阅服务实例更改的事件 NamingService#…

超实用桌面助手!时间、日期、天气,一目了然!完全免费!

文章目录 &#x1f4d6; 介绍 &#x1f4d6;&#x1f3e1; 环境 &#x1f3e1;&#x1f4d2; 使用方法 &#x1f4d2;⚓️ 相关链接 ⚓️ &#x1f4d6; 介绍 &#x1f4d6; 这是一款我根据自己的需求写的一个桌面小工具&#xff0c;自己一直在用&#xff0c;现在分享给需要的朋…

RabbitMQ发布确认

生产者将信道设置成 confirm 模式&#xff0c;一旦信道进入 confirm 模式&#xff0c; 所有在该信道上面发布的消息都将会被指派一个唯一的 ID(从 1 开始)&#xff0c;一旦消息被投递到所有匹配的队列之后&#xff0c; broker就会发送一个确认给生产者(包含消息的唯一 ID)&…

【每日一题】最长交替子数组

文章目录 Tag题目来源解题思路方法一&#xff1a;双层循环方法二&#xff1a;单层循环 写在最后 Tag 【双层循环】【单层循环】【数组】【2024-01-23】 题目来源 2765. 最长交替子数组 解题思路 两个方法&#xff0c;一个是双层循环&#xff0c;一个是单层循环。 方法一&am…

Likeshop单商户SaaS商城源码系统-商家用过都说太香啦!

在互联网快速发展的时代&#xff0c;拥有一个个性化、功能丰富的在线商城是企业拓展市场、提高用户粘性的重要手段。 我是一名电商从业者&#xff0c;同时也是一个热衷于DIY的人&#xff0c;我总喜欢在自己的店铺中加入自己的一些想法和创意。然而&#xff0c;一般的电商平台无…

【思路合集】talking head generation+stable diffusion

1 以DiffusionVideoEditing为baseline&#xff1a; 改进方向 针对于自回归训练方式可能导致的漂移问题&#xff1a; 训练时&#xff0c;在前一帧上引入小量的面部扭曲&#xff0c;模拟在生成过程中自然发生的扭曲。促使模型查看身份帧以进行修正。在像VoxCeleb或LRS这样的具…

EasyX的安装与使用(VisualStudio C++免费绘图库)

EasyX Graphics Library 是针对 Visual C 的免费绘图库 安装教程 安装到Visual C 2010 EasyX 安装完毕。 在VC2010中建立控制台工程 工程建好后&#xff0c;鼠标右键点击工程名&#xff0c;并选择属性 安装到Visual C 2010 EasyX 安装完毕。 安装示例程序 easyxdemo.cpp 在VC…

Vulnhub-dc4

靶场下载 https://download.vulnhub.com/dc/DC-4.zip 信息收集 判断目标靶机的存活地址: # nmap -sT --min-rate 10000 -p- 192.168.1.91 -oN port.nmap Starting Nmap 7.94 ( https://nmap.org ) at 2024-01-21 16:36 CST Stats: 0:00:03 elapsed; 0 hosts completed (1 up…

机器学习 | 掌握Matplotlib的可视化图表操作

Matplotlib是python的一个数据可视化库&#xff0c;用于创建静态、动态和交互式图表。它可以制作多种类型的图表&#xff0c;如折线图、散点图、柱状图、饼图、直方图、3D 图形等。以渐进、交互式方式实现数据可视化。当然博主也不能面面俱到的讲解到所有内容&#xff0c;详情请…

装完32G内存条 电脑飞跃提升!

我是南城余&#xff01;阿里云开发者平台专家博士证书获得者&#xff01; 欢迎关注我的博客&#xff01;一同成长&#xff01; 一名从事运维开发的worker&#xff0c;记录分享学习。 专注于AI&#xff0c;运维开发&#xff0c;windows Linux 系统领域的分享&#xff01; 大家…

如何使用阿里云CDN服务?

如何使用阿里云CDN服务 一、开通阿里云CDN服务 注册自己阿里云账号&#xff0c;找到CDN服务&#xff0c;进行加速即可 二、配置域名信息 1、各配置参数的含义 添加加速域名&#xff1a; 如果需要使用CDN加速指定网站上的业务&#xff0c;则需要将该网站作为源站&#xff0…