Flink多流处理之Broadcast(广播变量)

news2024/12/22 22:20:17

写过Spark批处理的应该都知道,有一个广播变量broadcast这样的一个算子,可以优化我们计算的过程,有效的提高效率;同样在Flink中也有broadcast,简单来说和Spark中的类似,但是有所区别,首先Spark中的broadcast是静态的数据,而Flink中的broadcast是动态的,也就是源源不断的数据流.在Flink中会将广播的数据存到state中.
在这里插入图片描述
在Flink中主流数据可以获取state中的所有状态数据,使用过window的应该都清楚,当两个streamData中的数据到达窗口的时间刚好错过时就会发生关联不上的情况,如window2S,sreamData1到达窗口的时间刚好卡在这个2S窗口的尾端,而streamData到达窗口时,这个窗口已经结束了,这种情况就算这两条数据有相同id也无法进行关联了.
但是broadcast会将到达的数据都存储在state中,这样主流到达的每一条数据都可以和state中的广播流数据进行关联比较.
在这里插入图片描述
流程图内容可能不够准确,只是为了看起来方便理解.

  • 数据源
    # 主流数据
    ➜  ~ nc -lk 1234
    101,浏览商品,2023-08-02
    102,浏览商品,2023-08-02
    103,查看商品价格,2023-08-04
    101,商品加入购物车,2023-08-03
    101,从购物车删除商品,2023-08-03
    102,下单,2023-08-02
    102,申请延期发货,2023-08-03
    103,点击商品详情页,2023-08-04
    104,点击收藏,2023-08-05
    104,下单,2023-08-05
    104,付款,2023-08-06
    105,浏览商品,2023-08-07
    106,浏览商品,2023-08-07
    106,加入购物车,2023-08-08
    107,浏览商品,2023-08-10
    
    # 广播流数据
    ➜  ~ nc -lk 5678
    101,小明
    102,张丽
    103,公孙飞天
    104,王二虎
    106,李四
    108,赵屋面
    
  • 代码
    import org.apache.flink.api.common.state.BroadcastState;
    import org.apache.flink.api.common.state.MapStateDescriptor;
    import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
    import org.apache.flink.api.common.typeinfo.TypeHint;
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.api.java.tuple.Tuple3;
    import org.apache.flink.streaming.api.datastream.*;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
    import org.apache.flink.util.Collector;
    
    /**
     * @Author: J
     * @Version: 1.0
     * @CreateTime: 2023/8/11
     * @Description: 多流操作-广播流
     **/
    public class FlinkBroadcast {
        public static void main(String[] args) throws Exception {
            // 构建流环境
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // 设置并行度
            env.setParallelism(3);
            // 数据集源1作为主流数据(用户行为日志[id,behavior,date])
            DataStreamSource<String> sourceStream1 = env.socketTextStream("localhost", 1234);
            // 将字符串切割处理
            SingleOutputStreamOperator<Tuple3<String, String, String>> mainSourceStream = sourceStream1.map(str -> Tuple3.of(str.split(",")[0], str.split(",")[1], str.split(",")[2])).returns(new TypeHint<Tuple3<String, String, String>>() {
            });
            // 数据源2作为广播流数据(用户信息(id,name))
            DataStreamSource<String> sourceStream2 = env.socketTextStream("localhost", 5678);
            // 将字符串切割处理
            SingleOutputStreamOperator<Tuple2<String, String>> mapStream2 = sourceStream2.map(str -> Tuple2.of(str.split(",")[0], str.split(",")[1])).returns(new TypeHint<Tuple2<String, String>>() {
            });
            // 将广播流数据源进行广播
            /**
             *参数说明
             * 这里需要我们传入一个MapStateDescriptor,其实就是一个Map结构的数据<k,v>
             * <String, Tuple2<String, String>>,第一个String类型就是广播流和主流连接的字段,在这个代码中就是id,由实际业务决定
             * <String, Tuple2<String, String>>,第二个Tuple2<String, String>就是实际广播数据流的数据,由实际业务决定
             * "userInfo"就是给一个名字,这个自定义无强制要求
             **/
            // 先构建一个状态,后面也会使用
            MapStateDescriptor<String, Tuple2<String, String>> userInfoState = new MapStateDescriptor<>("userInfo", TypeInformation.of(String.class), TypeInformation.of(new TypeHint<Tuple2<String, String>>() {
            }));
            BroadcastStream<Tuple2<String, String>> userInfoBroadStream = mapStream2.broadcast(userInfoState);
    
            // 将主流数据和广播流数据使用connect连接
            /**
             * 我们将数据转变成广播流之后,在Flink中也不知哪个数据流需要使用这个广播流(userInfoBroadStream),
             * 这个时候就需要我们自己将主流数据和该广播流数据进行连接
             **/
            BroadcastConnectedStream<Tuple3<String, String, String>, Tuple2<String, String>> connectedStream = mainSourceStream.connect(userInfoBroadStream);
    
            /**
             * 在process()中有两类函数供我们选择,KeyedBroadcastProcessFunction和BroadcastProcessFunction,
             * 这里要注意当"connectedStream"是KeyedStream时选择KeyedBroadcastProcessFunction
             * 当"connectedStream"不是KeyedStream时选择BroadcastProcessFunction就可以.
             * 使用keyBy算子返回的就是KeyedStream
             **/
            SingleOutputStreamOperator<String> resultStream = connectedStream.process(new BroadcastProcessFunction<Tuple3<String, String, String>, Tuple2<String, String>, String>() {
    
                // 这个方法写主流数据处理逻辑
                @Override
                public void processElement(Tuple3<String, String, String> value, BroadcastProcessFunction<Tuple3<String, String, String>, Tuple2<String, String>, String>.ReadOnlyContext ctx, Collector<String> out) throws Exception {
                    /**
                     * 要注意,这里我们最好从ReadOnlyContext来获取广播状态数据,因为获取只读的状态数据可以保证数据的安全性,
                     * 如果是通过成员变量的方式获取可修改的状态数据,就会存在数据不安全的问题,如在代码逻辑中出现了对状态数据
                     * 修改的代码,那么共享此状态的并行算子可能看到的状态数据不一致,就会导致数据错误或者代码报错.
                     * 而使用ReadOnlyContext就可以保证processElement这个方法中我们只对状态数据进行读取.
                     **/
                    ReadOnlyBroadcastState<String, Tuple2<String, String>> broadcastState = ctx.getBroadcastState(userInfoState);
                    if (broadcastState != null) {
                        // 通过主流中的ID作为key获取广播变量中的用户信息
                        Tuple2<String, String> userInfo = broadcastState.get(value.f0);
                        // 输出数据的形式(id,behavior,date,name)
                        if (userInfo == null) {
                            out.collect(value.f0 + "," + value.f1 + "," + value.f2 + "," + "NULL");
                        } else {
                            out.collect(value.f0 + "," + value.f1 + "," + value.f2 + "," + userInfo.f1);
                        }
                    } else {
                        out.collect(value.f0 + "," + value.f1 + "," + value.f2 + "," + "NULL");
                    }
    
                }
    
                // 这个方法写广播流数据处理逻辑
                @Override
                public void processBroadcastElement(Tuple2<String, String> value, BroadcastProcessFunction<Tuple3<String, String, String>, Tuple2<String, String>, String>.Context ctx, Collector<String> out) throws Exception {
                    // 使用Context获取状态
                    BroadcastState<String, Tuple2<String, String>> broadcastState = ctx.getBroadcastState(userInfoState);
    
                    // 将数据存入到状态中
                    broadcastState.put(value.f0, value);
                }
            });
            // 打印结果
            resultStream.print();
    
            env.execute("Flink broadcast");
        }
    }
    
  • 结果
    3> 101,浏览商品,2023-08-02,小明
    3> 101,商品加入购物车,2023-08-03,小明
    3> 102,申请延期发货,2023-08-03,张丽
    3> 104,下单,2023-08-05,王二虎
    3> 106,浏览商品,2023-08-07,李四
    1> 102,浏览商品,2023-08-02,张丽
    1> 101,从购物车删除商品,2023-08-03,小明
    1> 103,点击商品详情页,2023-08-04,公孙飞天
    1> 104,付款,2023-08-06,王二虎
    1> 106,加入购物车,2023-08-08,李四
    2> 103,查看商品价格,2023-08-04,公孙飞天
    2> 102,下单,2023-08-02,张丽
    2> 104,点击收藏,2023-08-05,王二虎
    2> 105,浏览商品,2023-08-07,NULL
    2> 107,浏览商品,2023-08-10,NULL
    
    代码内容就不进行详细解释了,注释基本都写清楚了,如有疑问可评论提问,共同探讨.

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/866482.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

docker-compose redis 一直启动失败

环境&#xff1a; centos 8.x 背景 使用docker-compose 来启动redis docker-compose.yml 如下&#xff1a; version: 3.3 services:redis:image: redis:latestrestart: alwayscontainer_name: redisports:- 6379:6379volumes:- ./data:/redis/data- ./redis.conf:/redis/re…

JMeter 查看 TPS 数据,详细指南

TPS 是软件测试结果的测量单位。一个事务是指一个客户机向服务器发送请求然后服务器做出反应的过程。客户机在发送请求时开始计时&#xff0c;收到服务器响应后结束计时&#xff0c;以此来计算使用的时间和完成的事务个数。在 JMeter 中&#xff0c;我们可以使用以下方法查看 T…

AI一键生成数字人

AI一键生成数字人,不玩虚的 阅读时长&#xff1a;10分钟 本文内容&#xff1a; 结合开源AI&#xff0c;一键生成短视频发布到常见的某音&#xff0c;某手平台&#xff0c;狠狠赚一笔 前置知识&#xff1a; 基本的 python 编程知识Jupyter Notebook 使用过Linux 使用过 先上源码…

Linux Idea启动项目打印堆栈日志(JMX监控日志)

说明 Idea更新至新版本&#xff08;2023.1&#xff09;后&#xff0c; 在Linux环境下默认会开启JMX监控并输出日志。 关闭JMX监控 打开Configurations配置面板。打开Modify options(ALt M)选项面板。勾选Disable JMX endpoints。 修改Configurations模板 确定不需要打印…

【算法题】螺旋矩阵I (求解n阶螺旋矩阵问题)

一、问题的提出 螺旋矩阵是一种常见的矩阵形式&#xff0c;它的特点是按照螺旋的方式排列元素。n阶螺旋矩阵是指矩阵的大小为nn&#xff0c;其中n为正整数。 二、解决的思路 当N1时&#xff0c;矩阵为; 当N2时&#xff0c;矩阵为; 当N>2(N为偶数如N4)时&#xff0c;矩阵…

通达OA SQL注入漏洞【CVE-2023-4166】

通达OA SQL注入漏洞【CVE-2023-4166】 一、产品简介二、漏洞概述三、影响范围四、复现环境POC小龙POC检测工具: 五、修复建议 免责声明&#xff1a;请勿利用文章内的相关技术从事非法测试&#xff0c;由于传播、利用此文所提供的信息或者工具而造成的任何直接或者间接的后果及损…

简单网络-跨网段通信

跨网段通信 1.为PC1&#xff0c;PC2配置IP地址、子网掩码和网关后用PC1 ping PC2&#xff0c;并用wireshark抓包g0/0/2的数据包。结果发现不能ping通&#xff0c;而PC1在发送广播包&#xff0c;寻找10.0.1.254网关&#xff0c;说明PC1找不到网关 2.给路由器e0/0/0端口配置网关1…

git一次错误merge的回滚

场景&#xff1a;提交到sit的代码&#xff0c;结果解决冲突merge了DEV的代码&#xff0c;所以要回滚到合并之前的代码 &#xff08;原因是我再网页上处理了冲突&#xff0c;他就自动merge了,如图—所以还是idea处理冲突&#xff0c;可控&#xff09; 方式二&#xff1a; &…

【Java多线程】CompletableFuture 异步多线程

1. 回顾 Future 一些业务场景我们需要使用多线程异步执行任务&#xff0c;加快任务执行速度。 JDK5新增了Future接口&#xff0c;用于描述一个异步计算的结果。 虽然 Future 以及相关使用方法提供了异步执行任务的能力&#xff0c;但是对于结果的获取却是很不方便&#xff0…

docker安装Nacos的《小白专用》详细教程

1.CentOS安装docker 安装docker yum -y install docker 设置开机自启 systemctl enable docker 启动docker systemctl start docker 查看docker当前的版本 docker version做到这里呢基本上你的docker就安装了一大部分了&#xff0c;当然也有那些无法安装的人&#xff0c;那我建…

prometheus监控k8s服务并告警到钉钉

一、监控k8s集群 要监控k8s集群需要使用到以下服务用于收集监控的资源信息&#xff0c;node_exporter用于监控k8s集群节点的资源信息&#xff0c;kube-state-metrics用于监控k8s集群的deployment、statefulset、daemonset、pod等的状态&#xff0c;cadvisor用于监控k8s集群的p…

爆肝整理,Python自动化测试-Pytest参数化实战封装,一篇打通...

目录&#xff1a;导读 前言一、Python编程入门到精通二、接口自动化项目实战三、Web自动化项目实战四、App自动化项目实战五、一线大厂简历六、测试开发DevOps体系七、常用自动化测试工具八、JMeter性能测试九、总结&#xff08;尾部小惊喜&#xff09; 前言 参数化&#xff1…

Gradio:交互式Python数据应用程序的新前沿

一、说明 什么是Gradio以及如何使用Gradio在Python中创建DataApp或Web界面&#xff1f;使用 Gradio 将您的 Python 数据科学项目转换为交互式应用程序。 摄影&#xff1a;Elijah Merrell on Unsplash Gradio是一个Python库&#xff0c;允许我们快速为机器学习模型创建可定制的接…

工程英语翻译怎样做效果比较好

我们知道&#xff0c;高质量的工程翻译可以有效指导工程项目操作的执行&#xff0c;但市场上专业的工程英语翻译人才严重不足。那么&#xff0c;工程英语翻译难吗&#xff0c;怎样翻译工程英语比较好&#xff1f; 业内人士指出&#xff0c; 工程翻译具有用词专业、涉及领域广、…

Python(八十一)字符串的常用操作——字符串判断的相关方法

❤️ 专栏简介&#xff1a;本专栏记录了我个人从零开始学习Python编程的过程。在这个专栏中&#xff0c;我将分享我在学习Python的过程中的学习笔记、学习路线以及各个知识点。 ☀️ 专栏适用人群 &#xff1a;本专栏适用于希望学习Python编程的初学者和有一定编程基础的人。无…

Python3 安装、环境变量配置、PyCharm新建Python项目

一、安装包下载 Pyhton官网下载>>最新稳定版的安装包&#xff1a; 找到合适的版本进行下载&#xff1a; 如果下载较慢&#xff0c;此处提供一个3.10.11的稳定版本的安装包&#xff1a; 链接&#xff1a;https://pan.baidu.com/s/16GnWjkGFuSfWfaI9UVX8qA?pwd4u5o 提取…

案例13 Spring MVC参数传递案例

基于Spring MVC实现HttpServletRequest、基本数据类型、Java Bean、数组、List、Map、JSON方式的参数传递。 1. 创建项目 选择Maven快速构建web项目&#xff0c;项目名称为case13-springmvc02。 2. 配置Maven依赖 <?xml version"1.0" encoding"UTF-8&quo…

【JavaScript】怎么测试方法的兼容性

利用网站测试方法的兼容性 打开网站&#xff1a;https://caniuse.com在里面输入要检测的方法&#xff0c;红色代表不支持&#xff0c;绿色代码支持。

Linux:Shell编程之正则表达式

目录 绪论 1、正则表达式 1.1 通配符 1.2 正则表达式分类 1.3 基本正则 1.4 正则表达式中表示次数的表达式 1.5 位置锚定 1.5.1 词首锚定和词尾锚定 1.6 分组&#xff08;&#xff09; 1.7 逻辑或 1.8 扩展正则 绪论 正则表达式&#xff1a;有一类特殊字符以及文本…

10-1_Qt 5.9 C++开发指南_Data Visualization实现数据三维显示

Data Visualization 是 Qt 提供的用于数据三维显示的模块。在 Qt 5.7 以前只有商业版才有此模块&#xff0c;而从Qt5.7 开始此模块在社区版本里也可以免费使用了。Data Visualization 用于数据的三维显示&#xff0c;包括三维柱状图、三维空间散点、三维曲面等。Data Visualiza…