Flink Flink中的合流

news2025/1/11 9:49:30

一、Flink中的基本合流操作

在实际应用中,我们经常会遇到来源不同的多条流,需要将它们的数据进行联合处理。所以 Flink 中合流的操作会更加普遍,对应的 API 也更加丰富。

二、联合(Union)

最简单的合流操作,就是直接将多条流合在一起,叫作流的“联合”(union)。联合操作要求必须流中的数据类型必须相同,合并之后的新流会包括所有流中的元素,数据类型不变。
在这里插入图片描述
在代码中,我们只要基于 DataStream 直接调用.union()方法,传入其他 DataStream 作为参数,就可以实现流的联合了;得到的依然是一个 DataStream:

stream1.union(stream2, stream3, ...)

注意:union()的参数可以是多个 DataStream,所以联合操作可以实现多条流的合并。

代码实现:我们可以用下面的代码做一个简单测试:

package com.flink.DataStream.UnionStream;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class FlinkUnionStream {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment streamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        streamExecutionEnvironment.setParallelism(1);
        SingleOutputStreamOperator<Integer> source1 = streamExecutionEnvironment
                .socketTextStream("localhost", 1111)
                .map(a -> Integer.parseInt(a));
        SingleOutputStreamOperator<Integer> source2 = streamExecutionEnvironment
                .socketTextStream("localhost", 2222)
                .map(a -> Integer.parseInt(a));
        DataStreamSource<String> source3 = streamExecutionEnvironment.fromElements("3", "4", "5");
        DataStream<Integer> unionResult = source1.union(source2, source3.map(Integer::valueOf));
        unionResult.print();
        streamExecutionEnvironment.execute();
    }
}

在这里插入图片描述
在这里插入图片描述

三、连接(Connect)

为了处理更加灵活,连接操作允许流的数据类型不同。但我们知道一个DataStream中的数据只能有唯一的类型,所以连接得到的结果并不是DataStream,而是一个“连接流”。连接流可以看成是两条流形式上的“统一”,被放在了一个同一个流中;事实上内部仍保持各自的数据形式不变,彼此之间是相互独立的。要想得到新的DataStream,还需要进一步定义一个“同处理”(co-process)转换操作,用来说明对于不同来源、不同类型的数据,怎样分别进行处理转换、得到统一的输出类型。所以整体上来,两条流的连接就像是“一国两制”,两条流可以保持各自的数据类型、处理方式也可以不同,不过最终还是会统一到同一个DataStream中。
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1265050.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

EZDML基本介绍

一、表结构设计器(EZDML) 这是一个数据库建表的小软件&#xff0c;可快速的进行数据库表结构设计&#xff0c;建立数据模型。类似大家常用的数据库建模工具如PowerDesigner、ERWIN、ER-Studio和Rational-Rose等的超级精简版。 官方下载地址&#xff1a;http://www.ezdml.com/d…

解析javascript数组方法 find 和 filter 有何区别

首先用一个案例可以很直观的看到 find 和 filter 的区别&#xff1b; 相同点&#xff1a; 两者分别可以接受三个参数&#xff1a;当前元素、当前索引、整个数组&#xff1b;两者都可以用来查找数组中符合条件的元素&#xff1b; 不同点&#xff1a; find&#xff1a; 用于查…

docker镜像管理命令

镜像管理命令 docker build : 命令用于使用 Dockerfile 创建镜像 docker build [OPTIONS] PATH | URL | - OPTIONS说明&#xff1a; --add-host :向hosts文件中添加自定义 host:ip 映射 --build-arg[] :设置镜像创建时的变量&#xff1b; --cache-from :指定镜像用作当前构建…

华为ospf和isis双点双向路由重分布的次优路径和环路终极解决方案

r5上直接导入直连路由 r3和r2进行双点双向路由重分布 查看R3去往R5产生了次优路径&#xff1a; 因为是R2先互相引入的isis和ospf&#xff0c;所以R3会产生次优路径&#xff0c;如果是R3先相互引入ospf和isis&#xff0c;那就是R2去R5会产生次优路径&#xff0c;而R3本身不会。…

华为设备使用python实现文件自动保存下载

实验目的&#xff1a; 公司有一台CE12800的设备&#xff0c;管理地址为172.16.1.2&#xff0c;现在需要编写自动化脚本&#xff0c;STELNET实现设备的自动保存配置文件&#xff0c;使用SFTP实现设备的文件下载。 实验拓扑&#xff1a; 实验步骤&#xff1a; 步骤1&#xff1…

Spark---资源、任务调度

一、Spark资源调度源码 1、Spark资源调度源码过程 Spark资源调度源码是在Driver启动之后注册Application完成后开始的。Spark资源调度主要就是Spark集群如何给当前提交的Spark application在Worker资源节点上划分资源。Spark资源调度源码在Master.scala类中的schedule()中进行…

揭开 BFC 的神秘面纱:前端开发必知必会

&#x1f90d; 前端开发工程师&#xff08;主业&#xff09;、技术博主&#xff08;副业&#xff09;、已过CET6 &#x1f368; 阿珊和她的猫_CSDN个人主页 &#x1f560; 牛客高级专题作者、在牛客打造高质量专栏《前端面试必备》 &#x1f35a; 蓝桥云课签约作者、已在蓝桥云…

【多线程】-- 05 Lambda表达式

多线程 4 Lambda表达式 λ是希腊字母表中排序第十一位的字母&#xff0c;英语名称为Lambda是为了避免匿名内部类定义过多实质属于函数式编程的概念 为什么要使用Lambda表达式&#xff1f; 避免匿名内部类定义过多可以让代码看起来很简洁去掉了一堆没有意义的代码&#xff0…

【LeetCode】128. 最长连续序列——哈希的应用(3)

文章目录 1、思路2、解题方法3、复杂度时间复杂度:空间复杂度: 4、Code Problem: 128. 最长连续序列 1、思路 我会用一种做题者的思路来去看待这道题。 我们在乍一看到这道题的时候&#xff0c;看到它的时间复杂度要求为O(N)&#xff0c;然后又要求去找序列(就是让你判断这个…

高频Latex公式速查表,写论文技术博客不愁了

常见上下标X_{2}X^{2}\hat{X}\bar{X}\frac{1}{X}常见希腊字母\alpha \beta \gamma \delta \varepsilon \eta \theta \rho \sigma \phi \varphi \omega常见数学符号\leq \geq \neq\approx 其他\sum \prod \int \bigoplus \forall \exists \times \setminus \bigotimes \bigodot …

五分钟 k8s 实战-应用探针

Probe.png 今天进入 kubernetes 的运维部分&#xff08;并不是运维 kubernetes&#xff0c;而是运维应用&#xff09;&#xff0c;其实日常我们大部分使用 kubernetes 的功能就是以往运维的工作&#xff0c;现在云原生将运维和研发关系变得更紧密了。 今天主要讲解 Probe 探针相…

集成IDE开发环境,Java开发工具IntelliJ IDEA 2023中文

IntelliJ IDEA 2023是一款功能强大的软件&#xff0c;其为程序员提供了一款先进的集成开发环境。它以智能、高效和人性化为主要特点&#xff0c;致力于提高开发人员的生产力&#xff0c;帮助程序员更快、更好地编写代码。IntelliJ IDEA 2023支持多种语言和框架&#xff0c;包括…

iOS 通用链接的配置(Universal Links)

一、打开Associated Domains 1.首先登录 苹果开发者网站 2.Certificates, Identifiers & Profiles 下的Identifiers 找到要配追的Identifiers 点进去 3.打开Associated Domains然后保存 二、更新Profile文件 如果我们使用自动的&#xff0c;可以忽略这一步&#xff0c;…

泛微E-Office SQL注入漏洞复现

0x01 产品简介 泛微E-Office是一款标准化的协同 OA 办公软件&#xff0c;泛微协同办公产品系列成员之一,实行通用化产品设计&#xff0c;充分贴合企业管理需求&#xff0c;本着简洁易用、高效智能的原则&#xff0c;为企业快速打造移动化、无纸化、数字化的办公平台。 0x02 漏…

00.本地搭建 threejs 文档网站(网页版是外网比较慢)

three官网 https://threejs.org/ 下载代码 进入官网 可以选择github去下载 或者 下载压缩包 github 下载https链接地址 https://github.com/mrdoob/three.js.git git clone https://github.com/mrdoob/three.js.git安装依赖启动程序 安装依赖 npm i 或者 pnpm i 或者 …

通过git上传文件到github仓库

一、新建github仓库 访问github官网&#xff1a;GitHub: Let’s build from here GitHub 点击个人头像&#xff0c;在右侧栏选择Your repositories。 点击New&#xff0c;新建一个github仓库。 创建Repository name仓库名&#xff0c;如果这个仓库名已经创建过的话&#xff…

开始使用Spring Boot Admin吧-使用Nacos注册SBA

什么是 Spring Boot Admin&#xff08;SBA&#xff09;? Spring Boot Admin 是 codecentric 公司开发的一款开源社区项目&#xff0c;目标是让用户更方便的管理以及监控 Spring Boot 应用。 应用可以通过我们的Spring Boot Admin客户端&#xff08;通过HTTP的方式&#xff0…

Vue项目的创建、运行与端口号修改

前言&#xff1a;Vue-cli是Vue官方提供的一个脚手架&#xff0c;用于快速生成一个Vue的项目模板&#xff0c;依赖于NodeJS环境 NodeJS下载&#xff1a;NodeJS安装下载 Vue-cli下载&#xff1a;Vue-cli下载 一.Vue图形化创建项目 1.建立一个文件夹&#xff0c;保存Vue项目 2.在该…

华为P40无法链接adb的解决记录

真的很讨厌华为的设备&#xff0c;很多东西啥设备都能跑得好好的&#xff0c;就华为会出问题&#xff0c;简直就是手机界的IE。 情况&#xff1a;突然无法链接adb到P40&#xff0c;拔插无效&#xff0c;关闭开发人员选项再打开也无效&#xff0c;撤销USB调试授权也无效&#x…

xcode opencv

1、导入报错 Undefined symbols: linker command failed with exit code 1 (use -v to see invocation) 直接添加如下图内容即可