Flink-使用filter和SideOutPut进行分流操作

news2025/1/12 3:57:14

文章目录

    • 1.什么是分流?
    • 2. 过滤器(filter)
    • 3. 使用侧输出流(SideOutput)

💎💎💎💎💎

更多资源链接,欢迎访问作者gitee仓库:https://gitee.com/fanggaolei/learning-notes-warehouse/tree/master

1.什么是分流?

  所谓“分流”,就是将一条数据流拆分成完全独立的两条、甚至多条流。也就是基于一个DataStream,得到完全平等的多个子 DataStream。一般来说,我们会定义一些筛选条件,将符合条件的数据拣选出来放到对应的流里

image-20221127142725094

2. 过滤器(filter)

  其实根据条件筛选数据的需求,本身非常容易实现:只要针对同一条流多次独立调用.filter()方法进行筛选,就可以得到拆分之后的流了。

import com.atguigu.chapter05.ClickSource;
import com.atguigu.chapter05.Event;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SplitStreamByFilter {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        SingleOutputStreamOperator<Event> stream = env
                .addSource(new ClickSource());
        
        // 筛选Mary的浏览行为放入MaryStream流中
        DataStream<Event> MaryStream = stream.filter(new FilterFunction<Event>() {
            @Override
            public boolean filter(Event value) throws Exception {
                return value.user.equals("Mary");
            }
        });
        // 筛选Bob的购买行为放入BobStream流中
        DataStream<Event> BobStream = stream.filter(new FilterFunction<Event>() {
            @Override
            public boolean filter(Event value) throws Exception {
                return value.user.equals("Bob");
            }
        });
        // 筛选其他人的浏览行为放入elseStream流中
        DataStream<Event> elseStream = stream.filter(new FilterFunction<Event>() {
            @Override
            public boolean filter(Event value) throws Exception {
                return !value.user.equals("Mary") && !value.user.equals("Bob") ;
            }
        });

        MaryStream.print("Mary pv");
        BobStream.print("Bob pv");
        elseStream.print("else pv");

        env.execute();

    }
}

image-20221127143748458
  这种实现非常简单,但代码显得有些冗余——我们的处理逻辑对拆分出的三条流其实是一样的,却重复写了三次。而且这段代码背后的含义,是将原始数据流 stream 复制三份,然后对每一份分别做筛选;这明显是不够高效的。

3. 使用侧输出流(SideOutput)

  在 Flink 1.13 版本中,已经弃用了.split()方法,取而代之的是直接用处理函数(process function)的侧输出流(side output)。
  我们知道,处理函数本身可以认为是一个转换算子,它的输出类型是单一的,处理之后得到的仍然是一个 DataStream; 而侧输出流则不受限制,可以任意自定义输出数据,它们就像从“主流”上分叉出的“支流”。尽管看起来主流和支流有所区别,不过实际上它们都是某种类型的 DataStream,所以本质上还是平等的。 利用侧输出流就可以很方便地实现分流操作,而且得到的多条 DataStream 类型可以不同,这就给我们的应用带来了极大的便利

import com.atguigu.chapter05.ClickSource;
import com.atguigu.chapter05.Event;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;


public class SplitStreamByOutputTag {
    // 定义输出标签,侧输出流的数据类型为三元组(user, url, timestamp)
    private static OutputTag<Tuple3<String, String, Long>> MaryTag = new OutputTag<Tuple3<String, String, Long>>("Mary-pv"){};
    
    private static OutputTag<Tuple3<String, String, Long>> BobTag = new OutputTag<Tuple3<String, String, Long>>("Bob-pv"){};
    
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        SingleOutputStreamOperator<Event> stream = env
                .addSource(new ClickSource());

        SingleOutputStreamOperator<Event> processedStream = stream.process(new ProcessFunction<Event, Event>() {
            
            @Override
            public void processElement(Event value, Context ctx, Collector<Event> out) throws Exception {
                if (value.user.equals("Mary")){
                    ctx.output(MaryTag, new Tuple3<>(value.user, value.url, value.timestamp));
                    
                } else if (value.user.equals("Bob")){
                    ctx.output(BobTag, new Tuple3<>(value.user, value.url, value.timestamp));
                    
                } else {
                    out.collect(value);
                }
            }
        });

        processedStream.getSideOutput(MaryTag).print("Mary pv");
        processedStream.getSideOutput(BobTag).print("Bob pv");
        processedStream.print("else");

        env.execute();
    }
}

image-20221127144037993

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/110602.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

四、网络层(七)网络层设备

目录 7.1 路由器的组成和功能 7.2 路由表与路由转发 7.1 路由器的组成和功能 路由器是一种具有多个输入/输出端口的专用计算机&#xff0c;其任务是连接不同的网络&#xff08;可以是异构的&#xff09;并完成路由转发。在多个逻辑网络&#xff08;即多个广播域&#xff…

Vulnhub靶机:HACKADEMIC_ RTB1

目录介绍信息收集主机发现主机信息探测网站探测Sql注入挂马提权介绍 系列&#xff1a;Hackademic&#xff08;此系列共2台&#xff09; 发布日期&#xff1a;2011年9月6日 难度&#xff1a;初级 运行环境&#xff1a;VMware Workstation 目标&#xff1a;取得 root 权限 flag…

5W2H分析法

什么是5W2H 5W2H分析法又叫七何分析法&#xff0c;是二战中美国陆军兵器修理部首创。简单、方便&#xff0c;易于理解、使用&#xff0c;富有启发意义&#xff0c;广泛用于企业管理和技术活动&#xff0c;对于决策和执行性的活动措施也非常有帮助&#xff0c;也有助于弥补考虑…

【UE4 第一人称射击游戏】07-添加“AK47”武器

素材资料地址&#xff1a; 链接&#xff1a;https://pan.baidu.com/s/1epyD62jpOZg-o4NjWEjiyg 密码&#xff1a;jlhr 效果&#xff1a; 步骤&#xff1a; 1.打开“WalkRun_BS”&#xff0c;将内插时间改为1 2.创建一个文件夹&#xff0c;命名为“Weapons” 进入“Weapons”…

可视化数据图表-FineReportJS实现清空控件内容

1. 概述 1.1 问题描述 在使用查询控件时&#xff0c;有时我们希望能够快捷重置控件的内容&#xff0c;或者重置所有控件的内容。效果如下图所示&#xff1a; 重置某个控件的内容&#xff1a;1.2 实现思路 在使用查询控件时&#xff0c;有时我们希望能够快捷重置控件的内容&a…

H3C 二层链路聚合

简介&#xff1a; 它通过将多条以太网物理链路捆绑在一起成为一条逻辑链路&#xff0c;从而实现增加链路带宽的目的。 成员端口&#xff1a; 选中&#xff08;Selected&#xff09;状态&#xff1a;此状态下的成员端口可以参与用户数据的转发&#xff0c;处于此状态的成员端口…

绝!OpenAI 年底上新,单卡 1 分钟生成 3D 点云,text-to 3D 告别高算力消耗时代

内容一览&#xff1a;继 DALL-E、ChatGPT 之后&#xff0c;OpenAI 再发力&#xff0c;于近日发布 PointE&#xff0c;可以依据文本提示直接生成 3D 点云。 关键词&#xff1a;OpenAI 3D 点云 PointE OpenAI 年底冲业绩&#xff0c;半个多月前发布的 ChatGPT 广大网友还没…

政务行业势能厂商 |美创科技入选《嘶吼2022中国网络安全产业势能榜》

近日&#xff0c;网络安全垂直媒体嘶吼网络安全产业研究院正式发布《嘶吼2022中国网络安全产业势能榜》评选结果。凭借在政务数据安全领域的服务深耕以及广泛的市场认可&#xff0c;美创科技入选势能榜“政务篇”&#xff0c;获评政务行业“专精型”安全厂商。 嘶吼安全产业研究…

Apache 之执行 CGI 脚本(Python 实现)

目录前言1 查看并挑选 Python 版本2 用 Python 实现一个简单的 CGI 脚本3 查看 CGI 环境变量总结前言 本文记录了一个搭建 CGI 环境的示例。前文推荐&#xff1a;《Apache 2.4.54 x64 安装及配置》。 【系统环境】 Win10-64bit Apache 2.4.54 x64 Python 3.11.1 1 查看并挑选…

PyInstaller的常用打包命令

学习了pyqt后&#xff0c;设计了界面&#xff0c;并且需要打包为exe程序。 每次打包时&#xff0c;都要查好久资料&#xff0c;故此记录一下常用的命令。 PyInstaller 是一个 Python 应用程序打包工具&#xff0c;它可以将 Python 程序打包为单个独立可执行文件。 要使用 P…

2022星空创造营应用创新大赛圆满落幕,获奖名单出炉!

​12月22日&#xff0c;2022星空创造营应用创新大赛在2022手机创新周暨第十届手机设计大赛颁奖典礼上作为特别专场正式公布获奖名单。2022星空创造营应用创新大赛由联通在线、手机设计大赛天鹅奖组委会联合主办&#xff0c;联通在线音乐公司及工信部赛迪研究院共同承办&#xf…

Vulnhub靶机:HOLYNIX_ V1

目录介绍信息收集主机发现主机信息探测网站探测万能密码文件包含漏洞文件上传提权补充&#xff1a;ip问题介绍 系列&#xff1a;Holynix&#xff08;此系列共2台&#xff09; 发布日期&#xff1a;2010年11月27日 难度&#xff1a;中 运行环境&#xff1a;VMware Workstation …

F9P使用说明

1.介绍 ZED-F9P简易使用说明&#xff0c;只是简单使用无需点击具体的链接。 使用硬件&#xff1a;F9P 软件&#xff1a;ucenter 22.07 2.数据类型 ublox接收机接收到的数据 NEMA数据:https://baike.baidu.com/item/NMEA-0183/1810482UBX数据&#xff1a;二进制的GNSS观测值…

12月更新!EasyOps全平台产品能力再升级,新增22+功能亮点解读~

哈喽伙伴们 又到了优维EasyOps全平台产品每月功能上新时间 转眼就到了2022年的最后一个月份 12月有些事情结束了 有些事情才刚刚开始 闲言少叙 咱们来看看12月上线了哪些新功能吧 HyperInsight 超融合监控 「APM」 支持通过日志采集接口数据和指标数据 丰富APM数据接…

哪里能够找到完整的信息安全标准

写在前面 早年刚参加信息安全工作更多的学点皮毛技术&#xff0c;到处找安全工具&#xff0c;跟踪poc&#xff0c;拿到一个就全网扫一遍&#xff0c;从来没有想过&#xff0c;系统化的安全工作应该怎样搞?我做的工作在安全体系中处于哪个阶段? 后来有机会做企业安全建设&…

刷爆力扣之仅含 1 的子串数

刷爆力扣之仅含 1 的子串数 HELLO&#xff0c;各位看官大大好&#xff0c;我是阿呆 &#x1f648;&#x1f648;&#x1f648; 今天阿呆继续记录下力扣刷题过程&#xff0c;收录在专栏算法中 &#x1f61c;&#x1f61c;&#x1f61c; 该专栏按照不同类别标签进行刷题&#x…

Chrome谷歌浏览器清空缓存并强制刷新页面

在项目开发过程中&#xff0c;用浏览器测试时很多时候会被浏览器自带的缓存搞得很头疼&#xff0c;那么下面会介绍几种方法实现Chrome浏览器清空缓存并强制刷新页面 1.通过快捷键方式 Windows和Linux操作系统&#xff1a;CtrlShiftR Mac OS: CmdShftR 这样就可以使用硬性重…

第十六章 强化学习

16.1 任务与奖赏 上图给出了强化学习的一个简单图示。强化学习任务通常用马尔可夫决策过程&#xff08;MDP&#xff09;来描述&#xff1a; 机器处于环境E中&#xff0c;状态空间为X&#xff0c;其中每个状态x∈X是机器感知到的环境的描述&#xff0c;如在种瓜任务上这就是当前…

java中的反射

反射 类加载的过程 当程序主动使用某个类时&#xff0c;如果该类还未被加载到内存中&#xff0c;则JVM会通过加载、连接、初始化3个步骤来对该类进行初始化。如果没有意外&#xff0c;JVM将会连续完成3个步骤&#xff0c;所以有时也把这个3个步骤统称为类加载或类初始化。加载…

MySQL面试常问问题(索引) —— 赶快收藏

目录 1.能简单说一下索引的分类吗&#xff1f; 2.为什么使用索引会加快查询&#xff1f; 3.创建索引有哪些注意点&#xff1f; 4.索引哪些情况下会失效呢&#xff1f; 5.索引不适合哪些场景呢&#xff1f; 6.索引是不是建的越多越好呢&#xff1f; 7.MySQL索引用的什么数…