Flink Flink中的分流

news2024/12/30 3:22:47

一、什么是分流

所谓“分流”,就是将一条数据流拆分成完全独立的两条、甚至多条流。也就是基于一个DataStream,定义一些筛选条件,将符合条件的数据拣选出来放到对应的流里。
在这里插入图片描述

二、基于filter算子的简单实现分流

其实根据条件筛选数据的需求,本身非常容易实现:只要针对同一条流多次独立调用.filter()方法进行筛选,就可以得到拆分之后的流了。
案例需求:读取一个整数数字流,将数据流划分为奇数流和偶数流。

package com.flink.DataStream.SplitStream;

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class FlinkSplitStreamByFilter {
    public static void main(String[] args) throws Exception {
        //TODO 创建Flink上下文执行环境
        StreamExecutionEnvironment streamExecutionEnvironment = StreamExecutionEnvironment
                .createLocalEnvironmentWithWebUI(new Configuration().set(RestOptions.BIND_PORT, "8081"));
        //.getExecutionEnvironment();
        //TODO 设置全局并行度为2
        streamExecutionEnvironment.setParallelism(2);
        DataStreamSource<String> dataStreamSource = streamExecutionEnvironment.socketTextStream("localhost", 8888);
        //TODO 先将输入流转为Integer类型
        SingleOutputStreamOperator<Integer> mapResult = dataStreamSource.map((input) -> {
            int i = Integer.parseInt(input);
            return i;
        });
        //TODO 使用匿名函数分流偶数流
        SingleOutputStreamOperator<Integer> ds1 = mapResult.filter(new FilterFunction<Integer>() {
            @Override
            public boolean filter(Integer a) throws Exception {
                return a % 2 == 0;
            }
        });
        //TODO 使用lamda表达式分流奇数流
        SingleOutputStreamOperator<Integer> ds2 = mapResult.filter((a) -> a % 2 == 1);
        ds1.print("偶数流");
        ds2.print("奇数流");
        streamExecutionEnvironment.execute();
    }
}

执行结果

奇数流:1> 1
偶数流:2> 2
偶数流:1> 2
偶数流:2> 4
奇数流:1> 3
奇数流:2> 1

Process finished with exit code 130 (interrupted by signal 2: SIGINT)

这种实现非常简单,但代码显得有些冗余——我们的处理逻辑对拆分出的三条流其实是一样的,却重复写了三次。而且这段代码背后的含义,是将原始数据流 stream 复制三份,然后对每一份分别做筛选;这明显是不够高效的。我们自然想到,能不能不用复制流,直接用一个算子就把它们都拆分开呢?

三、使用测输出流

关于处理函数中侧输出流的用法,我们已经在 7.5 节做了详细介绍。简单来说,只需要调用上下文 ctx 的.output()方法,就可以输出任意类型的数据了。而侧输出流的标记和提取,都离不开一个“输出标签”(OutputTag),指定了侧输出流的 id 和类型。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1247551.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Windows核心编程 进程

目录 一、进程概述 二、创建进程相关API Winexec ShellExecute CreateProcess 三、进程退出相关API ExitProcess TerminateProcess GetCurrentProcess GetExitCodeProcess 四、如何理解虚拟内存空间 五、关于UAC 一、进程概述 进程&#xff1a;正在运行的程序 程…

虾皮插件:优化Shopee商家店铺运营的利器

在如今竞争激烈的电商市场中&#xff0c;如何提升Shopee商家店铺的运营效率和销售业绩成为了摆在每个商家面前的一道难题。然而&#xff0c;幸运的是&#xff0c;虾皮插件-知虾的出现为商家们带来了一种全新的解决方案。本文将介绍虾皮插件的用途和优势&#xff0c;并详细介绍其…

Unity Android FireBase bugly报错查询

报错如下图&#xff0c;注意&#xff0c;标红的三处 使用的il2cpp和架构是arm64-v8a 那我们就可以根据这些去找对应的符号表&#xff0c;在unity安装目录下 Unity2020.3.33f1\Editor\Data\PlaybackEngines\AndroidPlayer\Variations\il2cpp\Release\Symbols\arm64-v8a 找到l…

NVMe-oF E-JBOF设计解析:WD RapidFlex网卡、OpenFlex Data24

OpenFlex Data24 NVMe-oF Storage Platform WD的SN840 NVMeSSD新品并没有太吸引我注意&#xff0c;因为它还是PCIe 3.0接口的&#xff0c;要知道Intel的PCIe 4.0 SSD都已经推出了。 但上面这个NVMe-oF&#xff08;NVMe over Fabric&#xff09;EBOF&#xff08;区别于普通JBO…

评测|PolarDB MySQL 版 Serverless

评测&#xff5c;PolarDB MySQL 版 Serverless 目录 一、测试背景 1.1、云原生数据库 PolarDB Serverless新架构概念 1.2、Serverless资源弹性扩缩触发条件 二、PolarDB的Serverless能力与同类型产品进行对比 三、动态弹性升降资源的能力测试 3.1、测试资源 3.2、测试一…

Linux时间命令—— 显示时间,日历等

目录 1.date显示时间 1.1 常用的标记列表&#xff1a; 1.2 设定时间&#xff1a; 2.cal显示日历 3.时间戳 1.date显示时间 date 用法&#xff1a;date [OPTION] ... [FORMAT] 1.1 常用的标记列表&#xff1a; %H : 小时 (00..23) %M : 分钟 (00..59) %S : 秒 (00..61…

Java【XML 配置文件解析】

前言 最近考试周忙得要死&#xff0c;但我却不紧不慢&#xff0c;还有三天复习时间&#xff0c;考试科目几乎都还没学呢。今天更新一个算是工具类-XML文件的解析&#xff0c;感觉还是挺有用的&#xff0c;之后可以融进自己的项目里。 XML 配置文件解析 0、导入依赖 有点像我…

Vue3 设置点击后滚动条移动到固定的位置

需求&#xff1a; 点击不通过按钮&#xff0c;显示红框中表单&#xff0c;且滚动条滚动到底部 &#xff08;显示红框中表单默认不显示&#xff09; <el-button click"onApprovalPass">不通过</el-button> <div class"item" v-if"app…

计算机组成原理-磁盘存储器

文章目录 总览外存储器磁盘存储器磁盘的性能指标磁盘地址磁盘的工作过程磁盘阵列 总结 总览 外存储器 磁盘存储器 写是利用电流产生磁场从而写磁盘 读是利用载磁体移动时产生的电场从而得到数据 磁性材质易受外界磁场干扰 下图中 载磁体上N S的前后顺序代表对应存储二进制的比…

pwn:[SWPUCTF 2021 新生赛]nc签到

题目 linux环境下显示为 配合题目的下载附件&#xff0c;发现过滤了一些&#xff0c;一旦输入这些会自动关闭程序 ls被过滤了&#xff0c;可以使用l\s cat和空格都被过滤了&#xff0c;cat可以换成c\at ,空格可以换成$IFS$9

HarmonyOS ArkTS 给应用添加动画(十一)

1 概述 属性动画&#xff0c;是最为基础的动画&#xff0c;其功能强大、使用场景多&#xff0c;应用范围较广。常用于如下场景中&#xff1a; 页面布局发生变化。例如添加、删除部分组件元素。页面元素的可见性和位置发生变化。例如显示或者隐藏部分元素&#xff0c;或者将部…

什么是合封芯片工艺,合封芯片工艺工作原理、应用场景、技术要点

芯片封装技术不断进步&#xff0c;其中合封芯片工艺作为一种先进的芯片封装技术&#xff0c;“超”广泛应用于各类电子设备中。 本文将从合封芯片工艺的工作原理、应用场景、技术要点等方面进行深入解读。 一、合封芯片工艺 合封芯片工艺是一种将多个芯片或不同的功能的电子模…

Stable Video Diffusion重磅发布,快来看看哪些功能

本周&#xff0c;有关 OpenAI 宫斗的报道占据了Ai圈版面的主导地位&#xff0c;吃够了奥特曼的大瓜。我们来看看Stability AI刚发布的Stable Video Diffusion&#xff0c;这是一种通过对现有图像进行动画处理来生成视频的 AI 模型。基于 Stability 现有的Stable Diffusion文本到…

HCIA-RS基础:动态路由协议基础

摘要&#xff1a;本文介绍动态路由协议的基本概念&#xff0c;为后续动态路由协议原理课程提供基础和引入。主要讲解常见的动态路由协议、动态路由协议的分类&#xff0c;以及路由协议的功能和自治系统的概念。文章旨在优化标题吸引力&#xff0c;并通过详细的内容夯实读者对动…

可视化工作流管理流程及工具

Leangoo领歌是一款永久免费的专业的敏捷开发管理工具&#xff0c;提供端到端敏捷研发管理解决方案&#xff0c;涵盖敏捷需求管理、任务协同、进展跟踪、统计度量等。 Leangoo领歌上手快、实施成本低&#xff0c;可帮助企业快速落地敏捷&#xff0c;提质增效、缩短周期、加速创新…

爆款软文写作三大技巧,请查收

好的软文可以吸引目标受众&#xff0c;提升品牌知名度&#xff0c;但对于缺乏软文写作经验的人来说&#xff0c;往往是头快想秃了文案都写不出一句话&#xff0c;今天媒介盒子就来分享&#xff1a;爆款软文写作技巧。 一、 了解受众 做营销不管是线上营销还是线下营销&#x…

深眸科技聚焦AI机器视觉检测,驱动3C电子行业集成创新实现新需求

随着消费的升级及国家政策的助推&#xff0c;国内3C电子市场不断扩大&#xff0c;行业实现高速发展。近年来&#xff0c;3C电子产品持续迭代&#xff0c;生产工艺也逐渐复杂化&#xff0c;相关生产线定位组装、零部件检测、整机产品检测等环节&#xff0c;亟需使用具备较强适应…

HDX读卡器牛羊管理RFID设备品牌

半双工HDX&#xff08;Half Duplex&#xff09;技术是ISO11784/5中规定的另一种标签与读写器之间的通讯方式&#xff0c;与全双工工&#xff08;FDX&#xff09;相比&#xff0c;HDX通常识别能力更强&#xff0c;有更大的识别距离。在HDX读写器的射频场与HDX标签响应期间关闭&a…

友思特分享 | Neuro-T:零代码自动深度学习训练平台

来源&#xff1a;友思特 智能感知 友思特分享 | Neuro-T&#xff1a;零代码自动深度学习训练平台 欢迎关注虹科&#xff0c;为您提供最新资讯&#xff01; 工业自动化、智能化浪潮涌进&#xff0c;视觉技术在其中扮演了至关重要的角色。在汽车、制造业、医药、芯片、食品等行业…