Flink多流处理之coGroup(协同分组)

news2025/1/11 21:52:10

这篇文章主要介绍协同分组coGroup的使用,先讲解API代码模板,后面会结图解介绍coGroup是如何将流中数据进行分组的.

1 API介绍

  • 数据源
    # 左流数据
    ➜  ~ nc -lk 6666
    101,Tom
    102,小明
    103,小黑
    104,张强
    105,Ken
    106,GG小日子
    107,小花
    108,赵宣艺
    109,明亮
    
    
    # 右流数据
    ➜  ~ nc -lk 7777
    101,,本科,程序员
    102,,本科,程序员
    103,,本科,会计
    104,,大专,安全工程师
    105,,硕士,律师
    106,未知,小本,挖粪使者
    108,,本科,人事
    110,,本科,算法工程师
    
    
  • 代码
    import org.apache.flink.api.common.functions.CoGroupFunction;
    import org.apache.flink.api.common.typeinfo.TypeHint;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.api.java.tuple.Tuple4;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
    import org.apache.flink.streaming.api.windowing.time.Time;
    import org.apache.flink.util.Collector;
    
    /**
     * @Author: J
     * @Version: 1.0
     * @CreateTime: 2023/8/10
     * @Description: 协同分组
     **/
    public class FlinkCoGroup {
        public static void main(String[] args) throws Exception {
            // 构建流环境
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // 设置并行度
            env.setParallelism(2);
            // 数据源1(socket数据源),为了方便测试,根据实际情况自行选择
            DataStreamSource<String> sourceStream1 = env.socketTextStream("localhost", 6666);
            // 将数据进行切分返回Tuple2(id,name)
            SingleOutputStreamOperator<Tuple2<String, String>> mapStream1 = sourceStream1.map(value -> {
                String[] split = value.split(",");
                return Tuple2.of(split[0], split[1]);
            }).returns(new TypeHint<Tuple2<String, String>>() {
            });
            // 数据源2(socket数据源),为了方便测试,根据实际情况自行选择
            DataStreamSource<String> sourceStream2 = env.socketTextStream("localhost", 7777);
            // 将数据进行切分返回Tuple4(id,gender,education,job)
            SingleOutputStreamOperator<Tuple4<String, String, String, String>> mapStream2 = sourceStream2.map(value -> {
                String[] split = value.split(",");
                return Tuple4.of(split[0], split[1], split[2], split[3]);
            }).returns(new TypeHint<Tuple4<String, String, String, String>>() {});
            // 数据流协同
            DataStream<Tuple4<String, String, String, String>> coGrouped = mapStream1.coGroup(mapStream2)
                    .where(tup -> tup.f0) // 左流协同分组字段(mapStream1)
                    .equalTo(tup -> tup.f0) // 右流协同分组字段(mapStream2)
                    .window(TumblingProcessingTimeWindows.of(Time.seconds(20))) // 开窗口,以处理时间划分(每20秒一个窗口)
                    .apply(new CoGroupFunction<Tuple2<String, String>, Tuple4<String, String, String, String>, Tuple4<String, String, String, String>>() {
                        @Override
                        public void coGroup(Iterable<Tuple2<String, String>> first, Iterable<Tuple4<String, String, String, String>> second, Collector<Tuple4<String, String, String, String>> out) throws Exception {
                            /**
                             *first 代表左流的迭代器
                             * second 代表右流的迭代器
                             * out 则是返回的数据形式
                             * 具体方法中两个迭代器存数据的原理后续会通过图结合进行解析
                             **/
                            // 这里的逻辑模拟sql中left join
                            // 遍历左流数据(first)
                            for (Tuple2<String, String> left : first) {
                                // 定义右流是否为NULL判断标识
                                boolean flag = false;
                                // 遍历右流数据(second)
                                for (Tuple4<String, String, String, String> right : second) {
                                    // 返回left(id, name) + right(gender, education)
                                    Tuple4<String, String, String, String> tup4 = Tuple4.of(left.f0, left.f1, right.f1, right.f2);
                                    // 输出
                                    out.collect(tup4);
                                    // 修改判断标识
                                    flag = true;
                                }
                                // 如果右流为NULL,则输出左流的数据
                                if (!flag) {
                                    // 这里用字符串"NULL"代替null值,方便观察
                                    Tuple4<String, String, String, String> tup4 = Tuple4.of(left.f0, left.f1, "NULL", "NULL");
                                    // 输出
                                    out.collect(tup4);
                                }
                            }
                        }
                    });
            // 打印结果
            coGrouped.print();
    
            env.execute("Flink CoGroup");
    
        }
    }
    
  • 结果
    2> (102,小明,男,本科)
    1> (106,GG小日子,未知,小本)
    2> (109,明亮,NULL,NULL)
    1> (107,小花,NULL,NULL)
    2> (105,Ken,男,硕士)
    2> (103,小黑,女,本科)
    2> (101,Tom,男,本科)
    2> (108,赵宣艺,女,本科)
    2> (104,张强,男,大专)
    
    从数据源和结果数据可以看到和代码逻辑是完全吻合的.

2 原理解析

我这我们先看一下图解,如下

在这里插入图片描述

  • 无界转有界
    在代码中我们开启window,这也是使用coGroup的必要条件,开启window后实际上就是将我们原本的无界数据流转变成一个以20S为界限的有界数据流.
  • 迭代器分组
    将数据进入到窗口内后,就会根据经我们前面设定的条件也就是.where.equalTo中的内容将mapStream1mapStream2中的数据根据key进行分组存储到不同的iterator中.
  • 逻辑计算
    上面已经将数据根据key都存储到iterator中了,这里就会根据我们在new CoGroupFunction<...>(){...}中的写的逻辑将mapStream1mapStream2中具有相同keyiterator进行计算.
  • 输出
    当一个window结束后,就会将数据按照计算后的结果(在代码中就是Tuple4<String, String, String, String>)输出到下游.

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/859037.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Discuz论坛网站防复制代码分享

代码一 <script language"JavaScript">document.onselectstartnew Function("event.returnValuefalse;");</script>或是 代码二 <body onmousemove\HideMenu()\ oncontextmenu"return false" ondragstart"return false&q…

springboot整合JMH做优化实战

这段时间接手项目出现各种问题&#xff0c;令人不胜烦扰。吐槽下公司做项目完全靠人堆&#xff0c;大上快上风格注定留下一地鸡毛&#xff0c;修修补补不如想如何提升同事代码水准免得背锅。偶然看到关于JMH对于优化java代码的直观性&#xff0c;于是有了这篇文章&#xff0c;希…

浮动路由解决单点链路故障问题(第三十三课)

浮动路由解决单点链路故障问题(第三十三课) 理论来源于实践 1 路由的分类 2 直连路由: PC>ping 192.168.5.11Ping 192.168.5.11: 32 data bytes, Press Ctrl_C to break Request timeout! Request timeout! Request timeout! Request timeout! Request timeout!--- 192.16…

电脑ip地址怎么改 ip地址怎么改到别的城市

一、ip地址怎么改到别的城市 1.ip地址怎么改到别的城市&#xff0c;1、重启WIFI路由设备 一般手机或电脑在家或公司上网时都是接入到路由器的WIFI网络,再由路由器分配上网IP地址,如果要更换上网IP那么重启路由器设备后,路由器会向网络运营商进行宽带的重新拨号,此时手机或电脑设…

小程序自动化测试的示例代码

目录 背景 自动化 SDK 还原用户行为 总结 背景 上述描述看似简单&#xff0c;但是中间还是有些难点的&#xff0c;第一个难点就是如何在业务人员操作小程序的时候记录操作路径&#xff0c;第二个难点就是如何将记录的操作路径进行还原。 自动化 SDK 如何将操作路径还原这…

直接在html中引入Vue.js的cdn来实现一个简单的上传图片组件

摘要 当使用 Vue.js 的 CDN 来实现一个简单的上传图片组件时&#xff0c;你可以利用 Vue 的数据绑定和事件处理能力&#xff0c;结合 HTML 和 CSS&#xff0c;轻松地创建一个交互式的图片上传界面。以下是一个示例&#xff1a; 代码结构 index.html <!DOCTYPE html> &…

springcloud3 springcloud stream的学习以及案例

一 springcloud stream的作用 1.1 springcloud stream作用 stream屏蔽底层消息中间件的差异&#xff0c;降低切换成本&#xff0c;统一消息的编程模型。 stream中的消息通信模式遵循了“发布-订阅”模式。 1.2 Binder作用 通过定义绑定器Binder作为中间层&#xff0c;实现…

tomcat7.exe 启动闪退解决

标题tomcat7.exe 启动闪退解决 双击tomcat7.exe启动&#xff0c;但是出现闪退问题&#xff0c;无法启动tomcat 解决&#xff1a; 1.解决 tomcat7.exe 启动闪退解决 第一步&#xff1a;双击打开tomcat7w.exe 文件 如果出现 “指定的服务未安装。 Unable to open the service ‘…

编译iOS系统可用的FFmpeg

在进行编译之前&#xff0c;需要做一些准备工作安装必备文件&#xff1a; 1 安装 gas-preprocessor FFmpeg-iOS-build-script 自动编译脚本需要使用到 gas-preprocessor . 执行 sudo git clone https://github.com/bigsen/gas-preprocessor.git /usr/local/bin/gas sudo c…

工作每天都在用的 DNS 协议,你真的了解么?(文末有惊喜,别错过)

♥ 前 言 我们经常访问一些网址的时候&#xff0c;浏览器里输入类似于 www.baidu.com 这样的地址&#xff0c;那么在浏览器里输入这个地址---> 百度服务器给我们返回这个百度的页面&#xff0c;中间的过程是什么样的呢&#xff1f; 带着这个问题&#xff0c;我们一起来解…

HECI-Securtiy 防火墙路由技术

目录 一、防火墙路由基本原理 1.路由分类 2.路由优先级 3.路由查询先后顺序 4.静态路由基本原理 &#xff08;1&#xff09;指定出接口场景 &#xff08;2&#xff09;指定下一跳地址场景 5.静态路由与多出口 &#xff08;1&#xff09;主备备份 &#xff08;2&#…

FastSAM初体验,比SAM快50倍

一、FastSAM介绍 1.简介 由美国Meta公司提出的能够“分割一切”的视觉基础大模型SAM引起了较大影响&#xff0c;为探索通用视觉大模型提供了一个新的方向。 2023年6月22日&#xff0c;中科院自动化所的研究团队针对“分割一切”任务&#xff0c;提出了FastSAM方法。中科院自动…

Boost开发指南-4.3optional

optional 在实际的软件开发过程中我们经常会遇到“无效值”的情况&#xff0c;例如函数并不是总能返回有效值&#xff0c;很多时候函数正确执行了&#xff0c;但结果却不是合理的值。如果用数学语言来解释&#xff0c;就是返回值位于函数解空间之外。 求一个数的倒数&#xf…

Twitter霸屏:掌握社交流量密码

Twitter群推王发现&#xff0c;Twitter以其简短而有力的信息传递方式而著名&#xff0c;其中字符限制仅有280个。这意味着在Twitter的世界中&#xff0c;迅速高效的沟通至关重要。拥有约321亿月活跃用户的Twitter&#xff0c;成为塑造资源品牌知名度的强大平台。如今&#xff0…

PHP智能人才招聘网站mysql数据库web结构apache计算机软件工程网页wamp

一、源码特点 PHP智能人才招聘网站 是一套完善的web设计系统&#xff0c;对理解php编程开发语言有帮助&#xff0c;系统具有完整的源代码和数据库&#xff0c;系统主要采用B/S模式开发。 下载地址 https://download.csdn.net/download/qq_41221322/88199392 视频演示 PH…

Alpine Ridge控制器使其具备多种使用模式 - 英特尔发布雷电3接口:竟和USB Type-C统一了

同时又因为这建立在Type-C的基础上&#xff0c;雷电3也将利用现有的标准Type-C线缆引入有源支持。当使用Type-C的线缆时&#xff0c;雷电的速度就降到了20Gbps全双工——这与普通的Type-C的带宽相同——这是为了成本牺牲了一些带宽。可以比较一下&#xff0c;Type-C线的成本只有…

基于Kubeadm部署k8s集群:下篇

继续上篇内容 目录 7、安装flannel 8、节点管理命令 三、安装Dashboard UI 1、部署Dashboard 2、开放端口设置 3、权限配置 7、安装flannel Master 节点NotReady 的原因就是因为没有使用任何的网络插件&#xff0c;此时Node 和Master的连接还不正常。目前最流行的Kuber…

【内网穿透】实现无公网IP远程连接Linux服务器并安装部署MongoDB数据库

【内网穿透】实现无公网IP远程连接Linux服务器并安装部署MongoDB数据库 ​ 憧憬Blog主页 在强者的眼中&#xff0c;没有最好&#xff0c;只有更好。 全栈开发领域优质创作者&#xff0c;阿里云专家博主 文章目录 【内网穿透】实现无公网IP远程连接Linux服务器并安装部署MongoDB…

智慧工地源码,Spring Cloud+ Vue+UniApp开发,微服务架构

智慧工地源码&#xff0c;智慧工地云平台源码 智慧工地APP源码 智慧工地的核心是数字化&#xff0c;它通过传感器、监控设备、智能终端等技术手段&#xff0c;实现对工地各个环节的实时数据采集和传输&#xff0c;如环境温度、湿度、噪音等数据信息&#xff0c;将数据汇集到云…

扎实推动产学研深入合作,携手共谋高质量发展新篇

近日&#xff0c;华南理工大学自动化学院胡跃明教授等一行4人莅临科东软件&#xff0c;双方就“产学研融合”进行会谈交流。在科东软件总经理余世清等人的陪同下&#xff0c;华工胡教授一行参观了科东软件展厅&#xff0c;对科东软件自主研发的Intewell工业实时操作系统及其在智…