【API篇】五、Flink分流合流API

news2024/11/16 1:44:28

文章目录

  • 1、filter算子实现分流
  • 2、分流:使用侧输出流
  • 3、合流:union
  • 4、合流:connect
  • 5、connect案例

分流,很形象的一个词,就像一条大河,遇到岸边有分叉的,而形成了主流和测流。对于数据流也一样,不过是一个个水滴替换成了一条条数据。

在这里插入图片描述

将一条数据流拆分成完全独立的两条、甚至多条流。也就是基于一个DataStream,定义一些筛选条件,将符合条件的数据拣选出来放到对应的流里。

在这里插入图片描述

1、filter算子实现分流

Demo案例:读取一个整数数字流,将数据流划分为奇数流和偶数流。

实现思路:针对同一个流,多次条用filter算子来拆分

public class SplitStreamByFilter {

    public static void main(String[] args) throws Exception {

        
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
      
        SingleOutputStreamOperator<Integer> ds = env.socketTextStream("node01", 9527)
                                                    .map(Integer::valueOf);
                                                    
        //将ds 分为两个流 ,一个是奇数流,一个是偶数流
        //使用filter 过滤两次
        SingleOutputStreamOperator<Integer> ds1 = ds.filter(x -> x % 2 == 0);
        SingleOutputStreamOperator<Integer> ds2 = ds.filter(x -> x % 2 == 1);

        ds1.print("偶数");
        ds2.print("奇数");
        
        env.execute();
    }
}

以上实现的明显缺陷是,同一条数据,被多次处理。以上其实是将原始数据流stream复制两份,然后对每一份分别做筛选,冗余且低效。

2、分流:使用侧输出流

基本步骤为:

  • 使用process算子(Flink分层API中的最底层的处理函数)
  • 定义OutputTag对象,即输出标签对象,用于后面标记和提取侧流
  • 调用上下文ctx的.output()方法
  • 通过主流获取侧流
案例:实现将WaterSensor按照Id类型进行分流

先定义下MapFunction的转换规则,用来将输入的数据转为自定义的WaterSensor对象:

public class WaterSensorMapFunction implements MapFunction<StringWaterSensor>{
	
	@Override
	public WaterSensor map(String value) throws Exception {
	
		String[] strArr = value.split( regex: ",");
		
		//String组装对象
		return new WaterSensor(strArr[0],Long.value0f(strArr[1]),Integer.value0f(strArr[2]));
	}
}

使用侧流:

public class SplitStreamByOutputTag {    
	public static void main(String[] args) throws Exception {
	
	        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
	
	        SingleOutputStreamOperator<WaterSensor> ds = env.socketTextStream("node01", 9527)
	              											.map(new WaterSensorMapFunction());
	
			//定义两个输出标签对象,用于后面标记和提取侧流
	        OutputTag<WaterSensor> s1 = new OutputTag<>("s1", Types.POJO(WaterSensor.class));
	        OutputTag<WaterSensor> s2 = new OutputTag<>("s2", Types.POJO(WaterSensor.class));
	        
	       //返回的都是主流
	        SingleOutputStreamOperator<WaterSensor> ds1 = ds.process(new ProcessFunction<WaterSensor, WaterSensor>()
	        {
	            @Override
	            //形参为别为:流中的一条数据、上下文对象、收集器
	            public void processElement(WaterSensor value, Context ctx, Collector<WaterSensor> out) throws Exception {
					
	                if ("s1".equals(value.getId())) {
	                    ctx.output(s1, value);
	                } else if ("s2".equals(value.getId())) {
	                    ctx.output(s2, value);
	                } else {
	                    //主流
	                    out.collect(value);
	                }
	
	            }
	        });
	
	        ds1.print("主流");
	        SideOutputDataStream<WaterSensor> s1DS = ds1.getSideOutput(s1);
	        SideOutputDataStream<WaterSensor> s2DS = ds1.getSideOutput(s2);
	
	        s1DS.printToErr("侧流s1");  //区别主流,让控制台输出标红
	        s2DS.printToErr("侧流s2");
	        
	        env.execute();
	 
	}
}

相关传参说明,首先是创建OutputTag对象时的传参:

  • 第一个参数为标签名,用于区分是哪一个侧流
  • 第二个是放入侧流中的数据的类型,且必须是Flink的类型(TypeInfomation,借助Types类)
  • OutputTag的泛型,是流到对应的侧流的数据类型

ProcessFunction接口的泛型中:

  • 第一个是输入的数据类型
  • 第二个是输出到主流上的数据类型

ctx.output方法的形参:

  • 第一个为outputTag对象
  • 第二个为数据,上面代码中传value即直接输出数据本身,也可输出处理后的数据,主流侧流数据类型不用一致

看下运行效果:

在这里插入图片描述

3、合流:union

将来源不同的多条流,合并成一条来联合处理,即合流。最简单的合流操作,就是直接将多条流合在一起,叫作流的联合(union)

在这里插入图片描述

union的条件是:

  • 每条流中要合并的数据类型必须相同(原始不同,可先借助map,在union)
  • 合并之后的新流会包括所有流中的元素,数据类型不变
stream1.union(stream2, stream3, ...)  //可变长参数
public class UnionExample {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.setParallelism(1);

        DataStreamSource<Integer> ds1 = env.fromElements(1, 2, 3);
        DataStreamSource<Integer> ds2 = env.fromElements(2, 2, 3);
        DataStreamSource<String> ds3 = env.fromElements("2", "2", "3");

        ds1.union(ds2,ds3.map(Integer::valueOf))
           .print();

        env.execute();
    }
}
//输出:
1
2
3
2
2
3
2
2
3

4、合流:connect

union合并流受限于数据类型,因此还有另一种合流操作:connect

在这里插入图片描述

public class ConnectDemo {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        
        env.setParallelism(1);
        
        //Integer流
        SingleOutputStreamOperator<Integer> source1 = env.socketTextStream("node01", 9527)
                										 .map(i -> Integer.parseInt(i));
		
		//String流
        DataStreamSource<String> source2 = env.socketTextStream("node01", 2795);

        /**
         * 总结: 使用 connect 合流
         * 1、一次只能连接 2条流
         * 2、流的数据类型可以不一样
         * 3、 连接后可以调用 map、flatmap、process来处理,但是各处理各的
         */
        ConnectedStreams<Integer, String> connect = source1.connect(source2);

        SingleOutputStreamOperator<String> result = connect.map(new CoMapFunction<Integer, String, String>() {
            @Override
            public String map1(Integer value) throws Exception {
                return "来源于原source1流:" + value.toString();
            }

            @Override
            public String map2(String value) throws Exception {
                return "来源于原source2流:" + value;
            }
        });

        result.print();

        env.execute();    
	}
}

使用 connect 合流的总结:

  • 一次只能连接 2条流,因为connect返回的是一个ConnectedStreams对象,不再是DataStreamSource或其子类了
  • 两条流中的数据类型可以不一样
  • 连接后可以调用 map、flatmap、process来处理,但是各处理各的

以map为例,其形参是一个CoMapFuntion接口类型,泛型则分别是流1的数据类型、流2的数据类型、合并及处理后输出的数据类型。两个map方法可以看出,虽然两个流合并成一个了,但处理数据时还是各玩各的。

  • .map1()就是对第一条流中数据的map操作
  • .map2()则是针对第二条流

在这里插入图片描述

connect 就类比被逼相亲后结婚,两个人看似成一家了,但实际上各自玩各自的。往大了举例就相当于一国两制。

5、connect案例

和connect以后的map传CoMapFunction一样,process算子也不再传ProcessFunction,而是CoProcessFunction,实现两个方法:

  • processElement1():针对第一条流
  • processElement2():针对第二条流

connect合并后得到的ConnectedStreams也可以直接调用.keyBy()进行按键分区,分区后返回的还是一个ConnectedStreams

connectedStreams.keyBy(keySelector1, keySelector2);
//keySelector1和keySelector2,是两条流中各自的键选择器

ConnectedStreams进行keyBy操作,其实就是把两条流中key相同的数据放到了一起,然后针对来源的流再做各自处理

案例需求:连接两条流,输出能根据id匹配上的数据,即两个流里元组f0相同的数据(类似inner join效果)
public class ConnectKeybyDemo {
    public static void main(String[] args) throws Exception {
    
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        
        env.setParallelism(2);
		
		//二元组流
        DataStreamSource<Tuple2<Integer, String>> source1 = env.fromElements(
                Tuple2.of(1, "a1"),
                Tuple2.of(1, "a2"),
                Tuple2.of(2, "b"),
                Tuple2.of(3, "c")
        );
        //三元组流
        DataStreamSource<Tuple3<Integer, String, Integer>> source2 = env.fromElements(
                Tuple3.of(1, "aa1", 1),
                Tuple3.of(1, "aa2", 2),
                Tuple3.of(2, "bb", 1),
                Tuple3.of(3, "cc", 1)
        );

        ConnectedStreams<Tuple2<Integer, String>, Tuple3<Integer, String, Integer>> connect = source1.connect(source2);

        // 多并行度下,需要根据 关联条件 进行keyby,才能保证key相同的数据到一起去,才能匹配上
        ConnectedStreams<Tuple2<Integer, String>, Tuple3<Integer, String, Integer>> connectKey = connect.keyBy(s1 -> s1.f0, s2 -> s2.f0);

        SingleOutputStreamOperator<String> result = connectKey.process(
                new CoProcessFunction<Tuple2<Integer, String>, Tuple3<Integer, String, Integer>, String>() {
                    // 定义 HashMap,缓存来过的数据,key=id,value=list<数据>
                    Map<Integer, List<Tuple2<Integer, String>>> s1Cache = new HashMap<>();
                    Map<Integer, List<Tuple3<Integer, String, Integer>>> s2Cache = new HashMap<>();

                    @Override
                    public void processElement1(Tuple2<Integer, String> value, Context ctx, Collector<String> out) throws Exception {
                        Integer id = value.f0;
                        // TODO 1.来过的s1数据,都存起来
                        if (!s1Cache.containsKey(id)) {
                            // 1.1 第一条数据,初始化 value的list,放入 hashmap
                            List<Tuple2<Integer, String>> s1Values = new ArrayList<>();
                            s1Values.add(value);
                            s1Cache.put(id, s1Values);
                        } else {
                            // 1.2 不是第一条,直接添加到 list中
                            s1Cache.get(id).add(value);
                        }

                        //TODO 2.根据id,查找s2的数据,只输出 匹配上 的数据
                        if (s2Cache.containsKey(id)) {
                            for (Tuple3<Integer, String, Integer> s2Element : s2Cache.get(id)) {
                                out.collect("s1:" + value + "<--------->s2:" + s2Element);
                            }
                        }
                    }

                    @Override
                    public void processElement2(Tuple3<Integer, String, Integer> value, Context ctx, Collector<String> out) throws Exception {
                        Integer id = value.f0;
                        // TODO 1.来过的s2数据,都存起来
                        if (!s2Cache.containsKey(id)) {
                            // 1.1 第一条数据,初始化 value的list,放入 hashmap
                            List<Tuple3<Integer, String, Integer>> s2Values = new ArrayList<>();
                            s2Values.add(value);
                            s2Cache.put(id, s2Values);
                        } else {
                            // 1.2 不是第一条,直接添加到 list中
                            s2Cache.get(id).add(value);
                        }

                        //TODO 2.根据id,查找s1的数据,只输出 匹配上 的数据
                        if (s1Cache.containsKey(id)) {
                            for (Tuple2<Integer, String> s1Element : s1Cache.get(id)) {
                                out.collect("s1:" + s1Element + "<--------->s2:" + value);
                            }
                        }
                    }
                });

        result.print();

        env.execute();
    }
}

运行效果:

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1106912.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

基于白冠鸡优化的BP神经网络(分类应用) - 附代码

基于白冠鸡优化的BP神经网络&#xff08;分类应用&#xff09; - 附代码 文章目录 基于白冠鸡优化的BP神经网络&#xff08;分类应用&#xff09; - 附代码1.鸢尾花iris数据介绍2.数据集整理3.白冠鸡优化BP神经网络3.1 BP神经网络参数设置3.2 白冠鸡算法应用 4.测试结果&#x…

小型企业团队的理想项目管理软件解决方案

中小型企业对于项目管理软件的需求是什么&#xff1f;中小型企业在选择项目管理软件时有什么特别需要注意的吗&#xff1f;市面上哪些项目管理软件更适合中小型企业团队&#xff1f;本文为您解惑答疑&#xff01; 中小型企业的项目管理需求 在项目管理过程中&#xff0c;每个…

Godot 官方2D C#重构(1):雪花碰撞

前言 Godot 官方 教程 Godot 2d 官方案例C#重构 专栏 Godot 2d 重构 github地址 实现效果 难点介绍 Godot GDScript和C# 对应关系大部分靠猜 文件导入 资源地址&#xff1a;默认为res://开头2D贴图导入类型&#xff1a;Texture2D public Texture2D Bullet_Image new Textu…

【数据结构】排序算法的稳定性分析

&#x1f490; &#x1f338; &#x1f337; &#x1f340; &#x1f339; &#x1f33b; &#x1f33a; &#x1f341; &#x1f343; &#x1f342; &#x1f33f; &#x1f344;&#x1f35d; &#x1f35b; &#x1f364; &#x1f4c3;个人主页 &#xff1a;阿然成长日记 …

学会Docker之---应用场景和基本操作

实体机、VM和容器 实体机&#xff08;Physical Machine&#xff09;是指实际的物理设备&#xff0c;例如我们常见的计算机主机、服务器等。它们是由硬件组成&#xff0c;可以直接运行操作系统和应用程序。 虚拟机&#xff08;Virtual Machine&#xff09;是在一台物理机上通过…

Spring Boot 3.0 已经就绪,您准备好了么?

Java 微服务开发框架王者 Spring 2014 年的 4 月&#xff0c;Spring Boot 1.0.0 正式发布。距离 1.0 版本的发布已经过去了 9 年多的时间&#xff0c;如今 Spring Boot 已经被 Java 开发者广泛使用&#xff0c;正如 JRebel 的 2022 年开发者生产力报告中提到的那样&#xff0c…

apk反编译工具下载

1、jadx https://github.com/skylot/jadx 2、APK签名 https://developer.android.google.cn/studio/command-line/apksigner?hlzh-cn

jdbc设置StatementTimeout后还需要设置socket timeout参数吗

背景 我们设置JDBC参数时&#xff0c;不管有没有在Statement中配置超时时间StatementTimeout&#xff0c;我们都需要配置jdbc的socket timeout参数&#xff0c;那么为什么这个socket timeout参数如此必要&#xff0c;不设置又会怎么样&#xff1f; 问题真相 首先设置了State…

如何使用 MiniGPT-v2

MiniGPT-v2 是一个基于视觉语言模型&#xff08;LLM&#xff09;的多任务学习系统。它可以用于各种视觉语言任务&#xff0c;包括图像描述、图像识别、图像-文本对话等。 本文将介绍如何使用 MiniGPT-v2。 MiniGPT-v2 提供了一个简单的在线演示&#xff0c;可以用于测试模型。…

【AI视野·今日Robot 机器人论文速览 第五十六期】Tue, 17 Oct 2023

AI视野今日CS.Robotics 机器人学论文速览 Tue, 17 Oct 2023 Totally 60 papers &#x1f449;上期速览✈更多精彩请移步主页 Daily Robotics Papers Interactive Task Planning with Language Models Authors Boyi Li, Philipp Wu, Pieter Abbeel, Jitendra Malik交互式机器人…

51单片机仿真软件 Proteus 8 Pro 安装步骤

51单片机仿真软件 Proteus 8 Pro 安装步骤 学习 51 单片机的时候&#xff0c;如果手头没有开发板&#xff0c;可以使用仿真软件 Proteus。Proteus 可以仿真 51 单片机及周边元器件&#xff08;例&#xff1a; LED&#xff09; 的运行情况。 可以简单认为&#xff1a;Proteus …

C#字符串操作:拼接、截取、分割等高效处理方法

目录 1.前言2. 字符串拼接 (String Concatenation)3. 字符串截取 (String Substring)4. 字符串分割 (String Split)5. 字符串替换 (String Replace)6. 字符串大小写转换 (String Case Conversion)7. 结论 1.前言 在C#编程中&#xff0c;字符串操作是不可避免的一部分。无论是拼…

React之受控组件和非受控组件以及高阶组件

一、受控组件 受控组件&#xff0c;简单来讲&#xff0c;就是受我们控制的组件&#xff0c;组件的状态全程响应外部数据 举个简单的例子&#xff1a; class TestComponent extends React.Component {constructor (props) {super(props);this.state { username: lindaidai }…

最新百度统计配置图文教程,获取siteId、百度统计AccessToken、百度统计代码教程

一、前言 很多网友开发者都不知道百度统计siteId、百度统计token怎么获取&#xff0c;在网上找的教程都是几年前老的教程&#xff0c;因此给大家出一期详细百度统计siteId、百度统计token、百度统计代码获取详细步骤教程。 二、登录到百度统计 1.1 登录到百度统计官网 使用…

零基础学习HTML5(列表、表格、表单)

01-列表 作用&#xff1a;布局内容排列整齐的区域。 列表分类&#xff1a;无序列表、有序列表、定义列表。 无序列表 作用&#xff1a;布局排列整齐的不需要规定顺序的区域。 标签&#xff1a;ul 嵌套 li&#xff0c;ul 是无序列表&#xff0c;li 是列表条目。 <ul>…

2506. 统计相似字符串对的数目

2506. 统计相似字符串对的数目 js代码&#xff1a; /*** param {string[]} words* return {number}*/ var similarPairs function (words) {// 将字符串数组的每一项去重排序words words.map(item > [...new Set(item)].sort().join())let res 0for (let i 0; i < w…

lvgl模拟器添加图片,编译提示无法解析的外部符号

目录 一、1. v_img_set_src(obj, &img_cogwheel_argb);2. 二、1.2. 一、 1. v_img_set_src(obj, &img_cogwheel_argb); 编译一下&#xff0c;报以下错误 错误原因是img_cogwheel_argb.c 文件中的变量img_cogwheel_argb定义按C编译 const lv_img_dsc_t img_cogwhee…

docker入门加实战—部署Java和前端项目

docker入门加实战—部署Java和前端项目 部署之前&#xff0c;先删除nginx&#xff0c;和自己创建的dd两个容器&#xff1a; docker rm -f nginx dd部署Java项目 作为演示&#xff0c;我们的Java项目比较简单&#xff0c;提供了一个接口&#xff1a; 配置文件连接docker里的m…

自动切割短视频的软件推荐,一键生成1000条短视频,支持六大主流平台矩阵分发,快来免费试用

经过小编的多方测评&#xff0c;今天给大家推荐一款性价比、好评率、专业性全都超高的软件——超级编导批量剪辑软件&#xff0c;更重要的是这款软件支持免费试用&#xff0c;一起来看看超级编导如何帮助大家自动分割视频的吧。 复制视频链接&#xff0c;一键上传视频素材后&am…

JAVA基础(JAVA SE)学习笔记(四)IDEA安装、使用、设置、断点、乱码汇总

前言 1. 学习视频&#xff1a; 尚硅谷Java零基础全套视频教程(宋红康2023版&#xff0c;java入门自学必备)_哔哩哔哩_bilibili 2023最新Java学习路线 - 哔哩哔哩 正文 JAVA基础&#xff08;JAVA SE&#xff09;学习笔记&#xff08;一&#xff09;JAVA学习路线、行业了解…