Flink多流处理之connect拼接流

news2025/1/20 3:38:19

Flink中的拼接流connect的使用其实非常简单,就是leftStream.connect(rightStream)的方式,但是有一点我们需要清楚,使用connect后并不是将两个流给串联起来了,而是将左流和右流建立一个联系,作为一个大的流,并且这个大的流可以使用相同的逻辑处理leftStreamrightStream,也可以使用不同的逻辑处理leftStreamrightStream.
如下图:
在这里插入图片描述

下面的演示代码也可以通过这个图结合来看,其实connect算子最主要的作用就是共享状态,如常用的广播状态.

  • 代码
import org.apache.flink.streaming.api.datastream.ConnectedStreams;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoMapFunction;

import java.util.Arrays;

/**
 * @Author: J
 * @Version: 1.0
 * @CreateTime: 2023/8/7
 * @Description: 多流操作-流连接
 **/
public class FlinkConnect {
    public static void main(String[] args) throws Exception {
        // 构建流环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 设置并行度
        env.setParallelism(3);
        // 添加数据源1
        DataStreamSource<String> sourceStream1 = env.fromCollection(Arrays.asList("a", "b", "c", "d"));
        // 添加数据源2
        DataStreamSource<Double> sourceStream2 = env.fromCollection(Arrays.asList(22.2, 11.0, 6.0, 98.0, 100.0));

        // 拼接数据流
        ConnectedStreams<String, Double> connectedStream = sourceStream1.connect(sourceStream2);

        // 这里使用map算子作为演示
        SingleOutputStreamOperator<String> resultStream = connectedStream.map(new CoMapFunction<String, Double, String>() {
            /**
             * map1作为左流
             **/
            @Override
            public String map1(String value) throws Exception {
                return "字符串: " + value;
            }

            /**
             * map2作为右流
             **/
            @Override
            public String map2(Double value) throws Exception {
                return "数字: " + (value * 100);
            }
        });

        // 打印结果
        resultStream.print();

        env.execute("Connect Operator");
    }
}

  • 结果
3> 字符串: b
1> 数字: 600.0
2> 字符串: a
3> 数字: 1100.0
2> 数字: 2220.0
2> 字符串: d
2> 数字: 9800.0
3> 数字: 10000.0
1> 字符串: c

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/842974.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【Leetcode】(自食用)删除链表中倒数第k个结点

step by step. 题目&#xff1a; 给你一个链表&#xff0c;删除链表的倒数第 n 个结点&#xff0c;并且返回链表的头结点。 示例 1&#xff1a; 输入&#xff1a;head [1,2,3,4,5], n 2 输出&#xff1a;[1,2,3,5]示例 2&#xff1a; 输入&#xff1a;head [1], n 1 输出&a…

Golang之路---04 并发编程——互斥锁和读写锁

互斥锁和读写锁 面对并发问题&#xff0c;我们始终应该优先考虑使用信道&#xff0c;如果通过信道解决不了的&#xff0c;不得不使用共享内存来实现并发编程的&#xff0c;那 Golang 中的锁机制&#xff0c;就是你绕不过的知识点了。 在 Golang 里有专门的方法来实现锁&#x…

【C++】哈希闭散列

一.哈希的概念 在前面学习了二叉搜索树、AVL树、红黑树之后&#xff0c;我们得知顺序结构以及平衡树中&#xff0c;元素关键码与其存储位置之间没有对应的关系&#xff0c;因此在查找一个元素时&#xff0c;必须经过关键码的多次比较。顺序查找的时间复杂度为 O(N)&#xff0c…

0.CLIP

目录 前言背景缘起/摘要数据集拟解决问题 精读IntroductionModel2.1自然语言监督2.2 创建一个有效的大数据集选择一个有效的预训练方法2.4 选择模型&#xff08;选择Encoder&#xff09;2.5训练小结 实验 复现&#xff08;略&#xff09; 前言 本课程来自深度之眼《多模态》训…

找免费商用的图片素材就上这6个网站。

分享6个免费商用的高清图片素材库&#xff0c;你想要找到这里都能找到&#xff0c;赶紧收藏起来吧~ 菜鸟图库 https://www.sucai999.com/pic.html?vNTYwNDUx 网站主要是为新手设计师提供免费素材的&#xff0c;素材的质量都很高&#xff0c;类别也很多&#xff0c;像平面、UI…

Zhang-Suen骨架提取算法

前言 本专栏针对的目标物体为物体裂缝量化&#xff0c;提取裂缝的骨架有助于裂缝长度的求解&#xff0c;故这一篇也是本专栏的开篇。 细化算法选择与分析 裂缝骨架的提取是十分有必要&#xff0c;如果我们能够得到裂缝的骨架图那么就很容易获得整条裂缝的长度。在当前经典的…

机器学习深度学习——序列模型(NLP启动!)

&#x1f468;‍&#x1f393;作者简介&#xff1a;一位即将上大四&#xff0c;正专攻机器学习的保研er &#x1f30c;上期文章&#xff1a;机器学习&&深度学习——卷积神经网络&#xff08;LeNet&#xff09; &#x1f4da;订阅专栏&#xff1a;机器学习&&深度…

地理信息系统空间分析实验教程 第三版 第八章示例与练习 学校选址

学校选址 背景 合理的学校空间位置布局有利于学生的上课与生活。学校的选址问题需要考虑地理 E八位置、学生娱乐场所配套设施、与现有学校的距离等因素&#xff0c;从总体上把握这些国素能够确定出适宜性比较好的学校选址区 目的 通过练习&#xff0c;熟悉 ArcGIS 栅格数据…

无涯教程-Perl - endnetent函数

描述 此功能告诉系统您不再希望使用getnetent从网络列表中读取条目。 语法 以下是此函数的简单语法- endnetent返回值 此函数不返回任何值。 例 以下是显示其基本用法的示例代码- #!/usr/bin/perluse Socket;while ( ($name, $aliases, $addrtype, $net) getnetent() )…

VUE框架:vue2转vue3全面细节总结(3)路由组件传参

大家好&#xff0c;我是csdn的博主&#xff1a;lqj_本人 这是我的个人博客主页&#xff1a; lqj_本人_python人工智能视觉&#xff08;opencv&#xff09;从入门到实战,前端,微信小程序-CSDN博客 最新的uniapp毕业设计专栏也放在下方了&#xff1a; https://blog.csdn.net/lbcy…

element表格+表单+表单验证结合运用

目录​​​​​​​ 一、结果展示 二、实现代码 一、结果展示 1、图片 2、描述 table中放form表单&#xff0c;放输入框或下拉框或多选框等&#xff1b; 点击添加按钮&#xff0c;首先验证表单&#xff0c;如果存在没填的就验证提醒&#xff0c;都填了就向下添加一行表单表…

Redis中BigKey、HotKey的发现与处理

Redis中BigKey、HotKey的发现与处理 内容详情&#xff1a; 阿里云开发者社区(点击跳转) 参考自&#xff1a; https://developer.aliyun.com/article/788271?utm_contentm_1000291945#slide-1

【数据结构OJ题】删除有序数组中的重复项

原题链接&#xff1a;https://leetcode.cn/problems/remove-duplicates-from-sorted-array/ 目录 1. 题目描述 2. 思路分析 3. 代码实现 1. 题目描述 2. 思路分析 用双指针算法&#xff0c;定义两个变量src和dst&#xff0c;一开始让src和dst指向num[ ]数组的第一个元素&a…

Cadence学习

Cadence学习 Cadence内容涵盖Cadence主要功能Cadence功能模块Allegro Design Entry CIS 和 OrCAD Capture CIS 的区别Cadence 公司简介Allegro Design Entry CISOrCAD Capture CIS OrCAD中part和database part区别OrCAD中不同页面的连接关系应该怎么处理&#xff08;1&#xff…

Matlab之利用MarkerFaceColor来填充marker

matlab画图在加一些marker的时候, 有实心的圆圈, 比如: plot(x,y,.r,MarkerSize,20)但是如果想要一个很大的marker, 就需要把这个markersize调得很大, 比如MarkerSize20 但是也可以用空心的圆圈然后把中间涂上颜色, 这样调整起来更方便. 比如: plot(x,y,or,MarkerSize,5,Mar…

拆分PDBQT文件并将其转换为PDB格式

拆分PDBQT文件转为PDB格式 1. vina_split拆分PDBQT文件 假设你用AutoDock Vina做了对接&#xff0c;那么所有预测的结合构象都被放入一个多构象 PDBQT 文件中&#xff0c;如果需要拆分后进行可视化分析&#xff0c;那么Vina官方自带了vina_split来进行拆分。下面是vina_split…

TS协议之PES(ES数据包)

TS协议之PAT&#xff08;节目关联表&#xff09;TS协议之PMT&#xff08;节目映射表&#xff09;TS协议之PES&#xff08;ES数据包&#xff09; 该文档已上传&#xff1a;下载地址 1. 概要 1.1 TS数据包&#xff08;PES&#xff09;协议数据组成 TSTS头PES头ES。TS&#xf…

在 Ubuntu 上安装 Docker 桌面

Ubuntu 22.04 (LTS) 安装 Docker 桌面 要成功安装 Docker Desktop&#xff0c;您必须&#xff1a; 满足系统要求拥有 64 位版本的 Ubuntu Jammy Jellyfish 22.04 (LTS) 或 Ubuntu Impish Indri 21.10。对于非 Gnome 桌面环境&#xff0c;必须安装 gnome-terminal&#xff1a;…

springsecurity初稿

springsecurity 课程 课程目标 权限管理简介【了解】权限管理解决方案【掌握】初识Spring Security【了解】Spring Security 认证配置【掌握】Spring Security 鉴权配置【掌握】Spring Security 底层原理【掌握】Spring Security 退出操作【重点】Spring Security整合JWT【重…

参考RabbitMQ实现一个消息队列

文章目录 前言小小消息管家1.项目介绍2. 需求分析2.1 API2.2 消息应答2.3 网络通信协议设计 3. 开发环境4. 项目结构介绍4.1 配置信息 5. 项目演示 前言 消息队列的本质就是阻塞队列&#xff0c;它的最大用途就是用来实现生产者消费者模型&#xff0c;从而实现解耦合以及削峰填…