spark 事件总线listenerBus

news2025/1/22 12:57:20

事件总线基本流程

图片来源:https://blog.csdn.net/sinat_26781639/article/details/105012302

LiveListenerBus创建

在sparkContext初始化中创建LiveListenerBus对象。

主要变量有两个

  • queues:事件队列,里面存放四个队列,每个队列中有对应的listener注册
  • queuedEvents:发生的事件


注册listener

内部listener注册

举例DynamicAllocation,在SpackContext初始化的时候创建了ExecutorAllocationManager对象,再调用start方法。

在ExecutorAllocationManager的start方法中,调用listenerBus相关方法完成注册

LiveListenerBus中queues分成四个队列,分别对应不同的注册方法,最终都是调用addToQueue方法

addToQueue比较简单,就是从queues中获取,有AsyncEventQueue就加入listener,没有就创建再加入。加入是调用AsyncEventQueue的addListener方法。

AsyncEventQueue的addListener方法是父类ListenerBus的addListener方法。
将listener加入到listenersPlusTimers中


内部listener是放入了对应队列的listenersPlusTimers中。

外部listener注册

在内部listener注册完成后调用setupAndStartListenerBus注册外部listener

在setupAndStartListenerBus方法中,读取配置spark.extraListeners获取要注册的外部listener集合,使用反射创建listener对象,遍历调用addToSharedQueue加入share queue。最后调用listenerBus的start方法启动listenerBus。

事件总线启动

如上图,在listener全部完成注册后,调用listenerBus的start方法启动。
将started变量修改为true,标记listenerBus启动。遍历queues,启动有注册的队列AsyncEventQueue(总共四个队列,但不一定全部都启用)。遍历queuedEvents处理在listenerBus没有启动期间产生的event。最后不再需要缓存消息了,将queuedEvents置为空。

AsyncEventQueue启动
将started变量变成true,标记AsyncEventQueue启动。启动dispatchThread线程。

dispatchThread线程是调用dispatch方法。
在dispatch方法中,可以看到是循环读取eventQueue,从其中读取event,调用postToAll发送给全部的listener。
eventQueue是一个阻塞队列LinkedBlockingQueue


到此,listenerBus启动完成,其中的列队AsyncEventQueue也启动完成。AsyncEventQueue循环从eventQueue中获取event来处理(这里是阻塞的)

发送消息

发送消息的入口是调用LiveListenerBus的post方法。
如果还没有启动,就将消息先缓存到queueEvents中。
如果启动了,就调用postToQueues将消息发送给全部队列。
在postToQueues中是遍历queue,调用post方法。

AsyncEventQueue的post方法中,就是将消息放入eventQueue即可。
但是eventQueue是有容量大小的,超过的消息就会丢弃。

至此,发送消息完成,将消息放入到AsyncEventQueue的eventQueue中。

处理消息

在启动的时候,dispatch线程已经完成了启动,从eventQueue获取event来处理。
处理消息是调用父类postToAll方法

postToAll方法中是遍历该队列全部listener,调用doPostEvent方法。

doPostEvent对应是SparkListenerBus的doPostEvent方法,根据event的类型,调用listener的不同的方法。

listener是要实现的SparkListenerInterface的方法,可以看到方法很多。。。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1933785.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

python gradio 的输出展示组件

HTML:展示HTML内容,适用于富文本或网页布局。JSON:以JSON格式展示数据,便于查看结构化数据。KeyValues:以键值对形式展示数据。Label:展示文本标签,适用于简单的文本输出。Markdown:…

独立游戏《星尘异变》UE5 C++程序开发日志6——实现存档和基础设置系统

目录 一、存档类 1.创建一个SaveGame类 2.存储关卡内数据 3.加载关卡数据 4.关于定时器 5.存储全局数据 6.加载全局数据 二、存档栏 1.存档栏的数据结构 2.创建新存档 3.覆盖已有存档 4.删除存档 三、游戏的基础设置 1.存储游戏设置的数据结构 2.初始化设置 3.…

JavaScript基础 第四弹 学习笔记

函数 1、为什么需要函数?可以实现代码复用,提高开发效率。 函数的定义 :函数function,是被设计为执行特定任务的代码块。 函数可以把具有相同或相似逻辑的代码‘包裹’起来,通过函数调用执行这些被“包裹”的代码逻…

羊大师:羊奶精华,补钙免疫,养颜促消化

在浩瀚的自然馈赠中,羊奶以其独特的精华,成为了现代人追求健康生活的优选。它不仅仅是一种饮品,更是大自然赋予我们的宝贵滋养圣品。 补钙免疫,守护健康基石 羊奶中富含的钙质,是构建强健骨骼的基石。其高吸收率的特性…

免杀笔记 ----> 动态调用

前一段时间不是说要进行IAT表的隐藏吗,终于给我逮到时间来写了,今天就来先将最简单的一种方式 ----> 动态调用!!! 1.静态查杀 这里还是说一下我们为什么要对他进行隐藏呢??&#xff1…

HBuilderX打包流程(H5)?HBuilder如何发布前端H5应用?前端开发怎样打包发布uniapp项目为h5?

打包步骤: 1、打开hbuilder x》发行》网站-PC Web或手机H5(仅适用于uni-app)(H) 2、面板里的所有信息都可以不填,也不用勾选》直接点击【发行】即可 3、打包成功: 4、部署 按照打包后的路径,找到打包好的文件夹,把文…

【前端数据层高可用架构】

前端数据层高可用架构 前后端架构模式如下图 在这个架构下,客端数据可用率计算方式: 因此整体数据可用性分析表如下: 只有在客端和 BFF 都正常的情况下数据才能可用,而这种情况占比不是很高,因此整体的用户体验就不是很好。 本次建设目标 本文的设计方案就是要解决…

【前端】表单密码格式—校验。

如图:实现表单输入密码和确认密码的时候进行表单校验。 实现方式: 1.在代码的data里面定义,函数验证的方法。如图所示,代码如下 【代码】如下: const validatePassword (rule, value, callback) > {if (value ) {callback(n…

Java SpringBoot 若依 后端实现评论“盖楼“,“楼中楼“功能 递归查询递归组装评论结构

效果图 数据库设计 还可以使用路径模块 一级评论id,二级评论id, 用like最左匹配原则查询子评论 因为接手遗留代码&#xff0c;需要添加字段&#xff0c;改动数据库&#xff0c;我就不改动了&#xff0c;导致我下面递归查询子评论不是很好。 业务代码 Overridepublic List<S…

C++类与对象(补)

感谢大佬的光临各位&#xff0c;希望和大家一起进步&#xff0c;望得到你的三连&#xff0c;互三支持&#xff0c;一起进步 个人主页&#xff1a;LaNzikinh-CSDN博客 文章目录 前言一.默认成员函数二.static三.友元四.匿名对象总结 前言 类的默认成员函数&#xff0c;默认成员…

充电宝选哪个好?选充电宝主要看什么?充电宝攻略请收下!

当我们的手机、平板等设备电量告急时&#xff0c;充电宝就如同一位救星&#xff0c;为我们解决燃眉之急。然而&#xff0c;面对市场上琳琅满目的充电宝产品&#xff0c;“充电宝选哪个好&#xff1f;”这一问题常常让我们感到困惑。选择一款合适的充电宝并非易事&#xff0c;需…

Qt支持LG高级汽车内容平台

Qt Group与LG 电子&#xff08;简称LG&#xff09;正携手合作&#xff0c;将Qt软件框架嵌入其基于 webOS的ACPLG车载娱乐平台&#xff0c;用于应用程序开发。该合作旨在让原始设备制造商&#xff08;OEM&#xff09;的开发者和设计师能为汽车创建更具创新性的沉浸式汽车内容流媒…

ClickHouse集成LDAP实现简单的用户认证

1.这里我的ldap安装的是docker版的 docker安装的化就yum就好了 sudo yum install docker-ce docker-ce-cli containerd.io docker-buildx-plugin docker-compose-plugin sudo systemctl start docker 使用下面的命令验证sudo docker run hello-world docker pull osixia/openl…

SQL Server Query Store Settings (查询存储设置)

参考&#xff1a;Query Store Settings - Erin Stellato 在 SQL Server 2017 中&#xff0c;有九 (9) 个设置与查询存储相关。虽然这些设置记录在sys.database_query_store_options中&#xff0c;但我经常被问到每个设置的值“应该”是多少。我在下面列出了每个设置&am…

Puppeteer动态代理实战:提升数据抓取效率

引言 Puppeteer是由Google Chrome团队开发的一个Node.js库&#xff0c;用于控制Chrome或Chromium浏览器。它提供了高级API&#xff0c;可以进行网页自动化操作&#xff0c;包括导航、屏幕截图、生成PDF、捕获网络活动等。在本文中&#xff0c;我们将重点介绍如何使用Puppeteer…

移动打车项目

1.技术栈&#xff1a; AndroidJNIHTTPSlibeventmysql/redis高德地图 2.概要流程设计 1.注册 2.登录 3.司机地理位置上传更新 4.乘客地理位置上传更新 5.乘客下单流程 6.司机完成订单流程

[AWS]EKS启动HPA,HPA指标<unknown>,报错:error: Metrics API not available

背景&#xff1a;在AWS上创建的EKS集群&#xff0c;想要对于deployment部署HPA&#xff0c;来autoscling副本数。 1.HPA一般基于CPU或者内存对副本数进行控制&#xff0c;所以必须需要Metrics Server。 &#xff08;Metrics Server 是 Kubernetes 集群的一个关键组件&#xff0…

7、自定义管理站点

目录 1、自定义后台表单2、添加关联的对象3、自定义后台更改列表4、自定义后台界面和风格&#xff08;1&#xff09;自定义你的工程的模板&#xff08;2&#xff09;自定义你应用的模板 1、自定义后台表单 通过admin.site.register(Question)注册Question模型&#xff0c;Djan…

AI自动生成PPT哪个软件好?高效制作PPT优选这4个

7.15初伏的到来&#xff0c;也宣告三伏天的酷热正式拉开序幕~在这个传统的节气里&#xff0c;人们以各种方式避暑纳凉&#xff0c;享受夏日的悠闲时光。 而除了传统的避暑活动&#xff0c;我们还可以用一种新颖的方式记录和分享这份夏日的清凉——那就是通过PPT的方式将这一传…

抖音视频素材去哪里找啊?视频素材网站库分享

在这个视觉盛宴的抖音平台上&#xff0c;高质量和有趣的视频素材常常是吸引观众的重要钥匙。如果你也正在寻找那些能让你的视频作品更加出色的资源&#xff0c;那么恭喜你&#xff0c;今天我将为你介绍10个超实用的视频素材网站&#xff0c;让你的抖音视频创作充满创意和效率。…