Flink DataStream之Connect合并流

news2025/1/8 1:33:51
  • 新建类
package test01;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.ConnectedStreams;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoMapFunction;

public class TestConnection {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment(new Configuration());
        executionEnvironment.setParallelism(1);

        //Connect可以将不同数据类型的流进行合并,但形成的是ConnectedStream,并不是DataStream,也就是说对外是一个整体的合并后的流,但其实内部是各自处理各自的数据。

        //创建流1:数字流,但是由于我们输入时是字符串,所以这里需要将字符串进行类型转换,转换为数值类型的.
        SingleOutputStreamOperator<Integer> dataSource = executionEnvironment
                .socketTextStream("localhost", 7777)
                .map(value -> Integer.parseInt(value));

        //创建流2:字符串流
        DataStreamSource<String> stringSource = executionEnvironment.socketTextStream("localhost", 8888);

        //合并流,与union不同的是,union可以在一个source的后面多次调用union()合并多个stream,但是在connect中只能单次调用connect()进行合并
        ConnectedStreams<Integer, String> connect = dataSource.connect(stringSource);

        /**
         * 注意ConnectedStreams中没有print(),有map()、process()等方法用来对合并后的流中得到不同类型流进行分别处理.
         * 这里使用map(),CoMapFunction的参数一指的是调用connect()方法的数据流类型,参数二指的是被调用的数据流类型,
         * 也就是connect()括号中的数据流类型,参数三是最终合并后的数据流类型,可以看到参数一和参数二已经根据前面我们调用connect时的两个数据流类型
         * 自动帮我们获取到了数据类型,参数三初始是Object类型,这里我们想要使合并后的数据流类型变成String类型,所以参数三设置为String。
         */
        SingleOutputStreamOperator<String> outputStream = connect.map(new CoMapFunction<Integer, String, String>() {
            //重写map1()和map2(),map1()指的就是参数一对应的数据流,map2()指的是参数二对应的数据流
            @Override
            public String map1(Integer integer) throws Exception {
                //在map1()方法中对数据进行处理,使之返回值为String
                return "原始的数值流:" + integer.toString();
            }

            @Override
            public String map2(String s) throws Exception {
                return "原始的字符串流:" + s;
            }
        });

        outputStream.print();

        executionEnvironment.execute();

    }
}
  • 启动两个窗口

  •  启动程序

此时在窗口中输入数据,注意在7777端要输入数字,8888端输入字符串,然后观察控制台输出。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/736673.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

flutter开发实战-指纹、面容ID验证插件实现

flutter开发实战-指纹、面容ID验证插件实现 在iOS开发中&#xff0c;经常出现需要指纹、面容ID验证的功能。 指纹、面容ID是一种基于用生物识别技术&#xff0c;通过扫描用户的面部特征来验证用户身份。 一、效果图 二、iOS指纹、面容ID验证 在iOS中实现指纹、面容ID验证功能…

一同感受C++模版的所带来的魅力

文章目录 一、泛型编程思想二、函数模版1、函数模板概念2、函数模板格式3、函数模板的原理4、函数模板的实例化5、模板参数的匹配原则 三、类模版1、类模板的定义格式2、类模板的实例化 四、总结与提炼 一、泛型编程思想 首先我们来看一下下面这三个函数&#xff0c;如果学习过…

磁盘与文件系统管理

磁盘结构及分区表示 硬盘 Hard Disk Drive &#xff0c;简称 HDD 是计算机常用的存储设备之一。 1 磁盘基础 1.1 硬盘的结构 1.1.1 数据结构 ①扇区&#xff1a;磁盘上的每个磁道被等分为若干个弧段,这些弧段便是硬盘的扇区(Sector)。硬盘的第一个扇区&#xff0c;叫做引导…

【mysql】-【锁】

文章目录 概述MySQL并发事务访问相同记录读-读情况写-写情况读-写或写-读情况并发问题的解决方案 锁的不同角度分类 概述 事务的隔离性由这章讲述的锁来实现。 MySQL并发事务访问相同记录 并发事务访问相同记录的情况大致可以划分为3种: 读-读情况 读-读情况&#xff0c;…

使用vite创建vue3的Cesium基础项目

使用vite创建vue3的Cesium基础项目 使用vite创建vue3项目&#xff1a;可以参考官方文档Vite官方中文文档 1.1 在指定文件夹路径下使用npm&#xff08;前提是已经安装好了node&#xff09;&#xff1a; bash npm create vitelatest 1.2 cd到创建的项目文件夹&#xff1a; bash c…

符号化的正确姿势

GUI方式 将 .ips crash report 文件拖放到 Xcode > Window > Devices and Simulators > View Device Logs中, 然后导出 .crash 符号化文件. 使用条件: crash report 对应的 Archive 包是在本机构建的 symbolicatecrash symbolicatecrash 是一个 exec (可执行文件), …

常见的BUG---1、虚拟机启动之后,突然发现没有ens33网卡

1、问题描述 今天一开启虚拟机&#xff0c;发现用XShell连接不上我的一台虚拟机&#xff0c;其他虚拟机是正常可以连接的&#xff0c;我稍微看了一下XShell的配置和Windows中的映射文件&#xff08;hosts&#xff09;&#xff0c;都没有啥问题&#xff0c;然后我就知道应该是虚…

javap反编译字节码文件

javap -v main.class{public static void main(String[] args) {int a10;int b10;int cab;return;} } Classfile /F:/myCode/java/jvm/0710_demo01/untitled/target/classes/org/example/main.classLast modified 2023年7月10日; size 447 bytesMD5 checksum 675a0d673d66326b…

基于SpringBoot+Vue的影城管理系统设计与实现

博主介绍&#xff1a; 大家好&#xff0c;我是一名在Java圈混迹十余年的程序员&#xff0c;精通Java编程语言&#xff0c;同时也熟练掌握微信小程序、Python和Android等技术&#xff0c;能够为大家提供全方位的技术支持和交流。 我擅长在JavaWeb、SSH、SSM、SpringBoot等框架…

服务器中了malox勒索病毒的解决办法流程与解密方案

随着网络科技技术的不断发展&#xff0c;越来越多的企业开始重视数据&#xff0c;数字化办公已经成为众多企业工作的常态&#xff0c;因此数据的安全性受到了额外重视。但网络科技技术的发展不仅方便了我们的工作&#xff0c;也给企业的数据安全带来了很大威胁。近期&#xff0…

基于SpringBoot+Vue的疫情网课管理系统设计与实现

博主介绍&#xff1a; 大家好&#xff0c;我是一名在Java圈混迹十余年的程序员&#xff0c;精通Java编程语言&#xff0c;同时也熟练掌握微信小程序、Python和Android等技术&#xff0c;能够为大家提供全方位的技术支持和交流。 我擅长在JavaWeb、SSH、SSM、SpringBoot等框架…

【H5】Promise的用法

系列文章 C#底层库–记录日志帮助类 本文链接&#xff1a;https://blog.csdn.net/youcheng_ge/article/details/124187709 文章目录 系列文章前言一、技术介绍二、项目源码2.1 Promise的状态2.2 Promise的结果2.3 Promise的then方法参数2.4 Promise的then方法获取数据 三、效果…

5G时代的材料新宠——液晶高分子聚合物

液晶高分子聚合物时80年代初期发展起来的一种新型高性能工程塑料&#xff0c;英文名为&#xff1a;Liquid Crystal Polymer 简称为LCP。 聚合方法以熔融缩聚为主&#xff0c;全芳香族LCP多辅以固相缩聚以制得高分子量产品。非全芳香族LCP常采用一步或二步熔融聚合制取产品。近年…

jwt介绍与使用

0.介绍 JWT(JSON Web Token)是一种开放标准&#xff0c;用于在双方之间安全地传输编码为 JSON 对象的信息。它是一个紧凑和自包含的方式&#xff0c;用于作为 JSON 对象在各方之间安全地传输信息。此信息可以进行验证和信任&#xff0c;因为它是经过数字签名的。JWT 可以使用机…

Python零基础入门(七)——Python中的选择和循环语句

系列文章目录 个人简介&#xff1a;机电专业在读研究生&#xff0c;CSDN内容合伙人&#xff0c;博主个人首页 Python入门专栏&#xff1a;《Python入门》欢迎阅读&#xff0c;一起进步&#xff01;&#x1f31f;&#x1f31f;&#x1f31f; 码字不易&#xff0c;如果觉得文章不…

大学英语四新视野 课后习题+答案翻译 Unit1~Unit8

Unit 1 Text A: Words in use 2022年6月16日 20:57 1 As the gender barriers crumbled, the number of women working as lawyers, doctors, or bankers began to increase significantly from the mid-20th century. 随着性别障碍的消除&#xff0c;从20世纪中期开始&am…

C语言陷阱——无符号数和有符号数的大小比较

C语言易错知识点——无符号数和有符号数的大小比较 我们来看两串代码 代码一&#xff1a; #include<stdio.h>int main() {int a -1;if (a > sizeof(int)){printf(">\n");}else{printf("<\n");}return 0; }代码二&#xff1a; #include…

学生用啥台灯最好?适合暑假学习的台灯推荐

孩子们终于迎来了他们的暑假&#xff0c;肯定不少孩子都已经计划好每天该玩什么游戏&#xff0c;该看什么电视了吧。这也是最让家长们头疼的一段时间&#xff0c;不仅每天要监督他们不要玩太久电子产品&#xff0c;花时间学习之外&#xff0c;还要担心他们视力健康。说到学习&a…

C++图形开发(11):小球碰到方块的判定

文章目录 1.有哪些情况&#xff1f;1.1 小球在方块左侧1.2 小球在方块上面1.3 小球在方块右侧 2.解决 1.有哪些情况&#xff1f; 今天来实现下小球碰到方块的判定 那么我们首先要明确的就是在什么时候&#xff0c;小球会碰到方块&#xff1f; 1.1 小球在方块左侧 第一个就是…