Spark/Flink广播实现作业配置动态更新

news2024/11/6 10:01:52

前言

在实时计算作业中,往往需要动态改变一些配置,举几个栗子:

  • 实时日志ETL服务,需要在日志的格式、字段发生变化时保证正常解析;
  • 实时NLP服务,需要及时识别新添加的领域词与停用词;
  • 实时风控服务,需要根据业务情况调整触发警告的规则。

那么问题来了:配置每次变化都得手动修改代码,再重启作业吗?答案显然是否定的,毕竟实时任务的终极目标就是7 x 24无间断运行。Spark Streaming和Flink的广播机制都能做到这点,本文分别来简单说明一下。

Spark Streaming的场合

Spark Core内部的广播机制: 广播变量(broadcast variable)的设计初衷是简单地作为只读缓存,在Driver与Executor间共享数据,Spark文档中的原话如下:

Broadcast variables allow the programmer to keep a read-only variable cached on each machine rather than shipping a copy of it with tasks. They can be used, for example, to give every node a copy of a large input dataset in an efficient manner.

也就是说原生并未支持广播变量的更新,所以我们得自己稍微hack一下。直接贴代码吧。

public class BroadcastStringPeriodicUpdater {
    private static final int PERIOD = 60 * 1000;
    private static volatile BroadcastStringPeriodicUpdater instance;
    private Broadcast < String > broadcast;
    private long lastUpdate = 0 L;
    private BroadcastStringPeriodicUpdater() {}
    public static BroadcastStringPeriodicUpdater getInstance() {
        if (instance == null) {
            synchronized(BroadcastStringPeriodicUpdater.class) {
                if (instance == null) {
                    instance = new BroadcastStringPeriodicUpdater();
                }
            }
        }
        return instance;
    }
    public String updateAndGet(SparkContext sc) {
        long now = System.currentTimeMillis();
        long offset = now - lastUpdate;
        if (offset > PERIOD || broadcast == null) {
            if (broadcast != null) {
                broadcast.unpersist();
            }
            lastUpdate = now;
            String value = fetchBroadcastValue();
            broadcast = JavaSparkContext.fromSparkContext(sc).broadcast(value);
        }
        return broadcast.getValue();
    }
    private String fetchBroadcastValue() {}
}

这段代码将字符串型广播变量的更新包装成了一个单例类,更新周期是60秒。在Streaming主程序中,就可以这样使用了:

dStream.transform(rdd - > {
    String broadcastValue = BroadcastStringPeriodicUpdater.getInstance().updateAndGet(rdd.context());
    rdd.mapPartitions(records - > {});
});

这种方法基本上解决了问题,但不是十全十美的,因为广播数据的更新始终是周期性的,并且周期不能太短(得考虑外部存储的压力),从根本上讲还是受Spark Streaming微批次的设计理念限制的。接下来看看Flink是怎样做的。

Flink的场合

Flink中也有与Spark类似的广播变量,用法也几乎相同。但是Flink在1.5版本引入了更加灵活的广播状态(broadcast state),可以视为operator state的一种特殊情况。它能够将一个流中的数据(通常是较少量的数据)广播到下游算子的所有并发实例中,实现真正的低延迟动态更新。

下图来自Data Artisans(被阿里收购了的Flink母公司)的PPT,其中流A是普通的数据流,流B就是含有配置信息的广播流(broadcast stream),也可以叫控制流(control stream)。流A的数据按照keyBy()算子的规则发往下游,而流B的数据会广播,最后再将这两个流的数据连接到一起进行处理。

既然它的名字叫“广播状态”,那么就一定要有与它对应的状态描述符StateDescriptor。Flink直接使用了MapStateDescriptor作为广播的状态描述符,方便存储多种不同的广播数据。示例:

MapStateDescriptor < String, String > broadcastStateDesc = new MapStateDescriptor < > ("broadcast-state-desc", String.class, String.class);

接下来在控制流controlStream上调用broadcast()方法,将它转换成广播流BroadcastStream。controlStream的产生方法与正常数据流没什么不同,一般是从消息队列的某个特定topic读取。

BroadcastStream<String> broadcastStream = controlStream .setParallelism(1) .broadcast(broadcastStateDesc);

然后在DataStream上调用connect()方法,将它与广播流连接起来,生成BroadcastConnectedStream。

BroadcastConnectedStream<String, String> connectedStream = sourceStream.connect(broadcastStream);

最后就要调用process()方法对连接起来的流进行处理了。如果DataStream是一个普通的流,

需要定义BroadcastProcessFunction,反之,如果该DataStream是一个KeyedStream,

就需要定义KeyedBroadcastProcessFunction。

并且与之前我们常见的ProcessFunction不同的是,它们都多了一个专门处理广播数据的方法

processBroadcastElement()。类图如下所示。

下面给出一个说明性的代码示例。

connectedStream.process(new BroadcastProcessFunction < String, String, String > () {
    private static final long serialVersionUID = 1 L;@
    Override
    public void processElement(String value, ReadOnlyContext ctx, Collector < String > out) throws Exception {
        ReadOnlyBroadcastState < String, String > state = ctx.getBroadcastState(broadcastStateDesc);
        for (Entry < String, String > entry: state.immutableEntries()) {
            String bKey = entry.getKey();
            String bValue = entry.getValue();
            // 根据广播数据进行原数据流的各种处理
        }
        out.collect(value);
    }@
    Override
    public void processBroadcastElement(String value, Context ctx, Collector < String > out) throws Exception {
        BroadcastState < String, String > state = ctx.getBroadcastState(broadcastStateDesc);
        // 如果需要的话,对广播数据进行转换,最后写入状态
        state.put("some_key", value);
    }
});

可见,BroadcastProcessFunction的行为与RichCoFlatMapFunction、CoProcessFunction非常相像。其基本思路是processBroadcastElement()方法从广播流中获取数据,进行必要的转换之后将其以键值对形式写入BroadcastState。而processElement()方法从BroadcastState获取广播数据,再将其与原流中的数据结合处理。也就是说,BroadcastState起到了两个流之间的桥梁作用。
最后还有一点需要注意,processElement()方法获取的Context实例是ReadOnlyContext,说明只有在广播流一侧才能修改BroadcastState,而数据流一侧只能读取BroadcastState。这提供了非常重要的一致性保证:假如数据流一侧也能修改BroadcastState的话,不同的operator实例有可能产生截然不同的结果,对下游处理造成困扰。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/544929.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

访问学者J1签证面签的七个问题

作为访问学者&#xff0c;申请J1签证面签时可能会遇到一些常见问题。下面知识人网小编将介绍七个访问学者面签可能遇到的问题&#xff0c;并提供相应的答案。 问题一&#xff1a;您将在美国进行何种类型的学术研究&#xff1f; 答案&#xff1a;我将在美国从事学术研究&#x…

普冉PY32L020单片机简介,主频最高48MHZ

PY32L020单片机是一颗32 位 ARM Cortex-M0内核&#xff0c;宽电压工作范围的 MCU。这颗MCU的价格跟八位单片机相差不大&#xff0c;性价比可以说是非常的高了。来看看PY32L020的配置吧。 PY32L020单片机产品特性&#xff1a; 内核&#xff1a; — 32 位 ARM Cortex - M0 — 最…

飞浆AI studio人工智能课程学习(2)-Prompt优化思路|十个技巧高效优化Prompt|迭代法|Trick法|通用法|工具辅助

文章目录 优化思路上节课的例子问题分析思路解析 Prompt优化技巧Prompt优化原理 十个技巧高效优化Prompt迭代法Trick法工具法通用技巧│定基础通用技巧│做强调需求强调怎么做&#xff1f; 通用技巧│提预设Trick法│戴高帽原理 Trick法│说好话以基础计算为例: Trick法│给提示…

小红书数据分析:如何用ChatGPT输出爆文笔记

ChatGPT的热度依旧不减&#xff0c;随着技术升级&#xff0c;越来越多更高级的玩法被发掘。今天我们就来聊聊&#xff0c;如何用ChatGPT写出小红书风格的文章。 首先&#xff0c;小红书笔记制作分为两个步骤&#xff1a; 1、找选题 2、写小红书风格的笔记 我们用例子说话&a…

全国自考本科通过率仅7%,为什么还有这么多人报考?

根据教育部官网公布的《2021年全国教育事业发展统计公报》得知&#xff1a;2021年&#xff0c;全国高等教育自学考试学历教育报考625.78万人次&#xff0c;取得毕业证书48.94万人。也就是说2021年我国自考平均通过率大概在7%左右。 自考通过率为什么这么低&#xff1f; ①自考…

Android平台外部编码数据(H264/H265/AAC/PCMA/PCMU)实时预览播放技术实现

开发背景 好多开发者可能疑惑&#xff0c;外部数据实时预览播放&#xff0c;到底有什么用&#xff1f; 是的&#xff0c;一般场景是用不到的&#xff0c;我们在开发这块前几年已经开发了非常稳定的RTMP、RTSP直播播放模块&#xff0c;不过也遇到这样的场景&#xff0c;部分设…

“ChatGPT之父”勇闯币圈!数十亿人的空投计划,只需交出你的虹膜?

最近&#xff0c;Worldcoin&#xff08;世界币&#xff09;热度持续提升&#xff0c;这个由OpenAI创始人SamAltman亲自操持的加密项目&#xff0c;让沉寂已久的币圈开始躁动起来。 虽然Worldcoin并非最新项目&#xff0c;但颇具乌托邦色彩的理念&#xff0c;独特的虹膜识别机制…

10几个国产免费ChatGPT网页版(内附体验网址)

文章目录 前言1. AI文本工具站效率工具自媒体创作工具代码工具 2.道和顺ChatIC3.星期五4.文心一言5.讯飞星火认知大模型6.通义千问7.商汤-日日新8.Moss9.ChatGLM10. 360智脑写在最后 前言 随着ChatGPT迅速走红,国内各大企业纷纷发力认知大模型领域。经过一段时间的酝酿&#x…

剩余参数、扩展运算符、将伪数组转换伪数组

剩余参数 剩余参数语法允许我们将一个不定数量的参数表示为一个数组 语法: array.forEach(function(currentValue,index,arr){}) currentValue:数组当前项的值index:数组当前项的索引arr:数组对象本身 示例 <!DOCTYPE html> <html lang"en"><h…

实力认证!开源网安入选《2023年中国网络安全市场全景图》

近日&#xff0c;数说安全正式发布《2023年中国网络安全市场全景图》&#xff08;以下简称全景图&#xff09;。开源网安凭借强大的技术能力与服务经验上榜本次全景图开发安全/DevSecOps、应用安全测试&#xff08;AST&#xff09;、软件成分分析&#xff08;SCA&#xff09;、…

野火STM32电机系列(三)Cubemx配置CAN通信

CAN接口: PI9 PB9 1.配置CAN 通信参数 由于F4的 CAN外设挂载在APB1上&#xff0c;时钟配置后APB1的时钟速率为42MHz&#xff0c;目标通信速率为1000KHz&#xff0c;由公式&#xff1a; BaudRate 1/NominalBitTime NominalBitTime 1tq tBS1 tBS2 设置参数如下&#xff1a;…

pip 安装 pytorch

一.使用pip安装 pytorch pytorch博客教程 最好先创建一个虚拟环境&#xff0c;因为如果在同一环境&#xff0c;升级某一个包可能会导致另一个包无法使用&#xff0c;比如 a的包是1.0&#xff0c;b的包是1.0依赖a的1.0&#xff0c;然后a升级为1.1&#xff0c;b可能无法使用 创…

Typora 含多图片笔记快速上传到 CSDN 上发表

Typora 含多图片笔记快速上传到 CSDN 上发表 适用场景解决方案具体步骤 适用场景 在 Typora 里面记笔记,上传的图片是本地保存的,如果要将笔记上传到 CSDN 上发表的话,图片得一张一张地拖拽非常麻烦 解决方案 Typora PicGo Gitee 具体步骤 先安装 PicGo 和 node.js, 创建…

开源之夏 2023 | 欢迎报名Rust相关项目

开源之夏是中国科学院软件研究所联合openEuler发起的开源软件供应链点亮计划系列暑期活动&#xff0c;旨在鼓励在校学生积极参与开源软件的开发维护&#xff0c;促进优秀开源软件社区的蓬勃发展。活动联合各大开源社区&#xff0c;针对重要开源软件的开发与维护提供项目&#x…

java+springboot 做日志链路追踪

一、 为什么要做日志链路追踪 日志链路追踪&#xff08;Log Path Tracing&#xff09;是Spring Boot项目的一项关键功能&#xff0c;它通过将日志消息的源头与其对应的请求或响应路径相关联&#xff0c;实现对日志数据的可视化跟踪。随着项目规模的扩大和复杂性的增加&#xf…

Padding, Spacer, Initializer 的使用

1. Padding 的使用 1.1 样式一 1) 实现 func testText1()-> some View{Text("Hello, World!").background(Color.yellow) // 背景颜色//.padding() // 默认间距.padding(.all, 10) // 所有的间距.padding(.leading, 20) // 开始的间距.ba…

真题详解(数字签名算法)-软件设计(七十八)

真题详解(有限自动机)-软件设计&#xff08;七十七)https://blog.csdn.net/ke1ying/article/details/130748759 可用于数字签名算法的是_____。 答案&#xff1a;非对称RSA 移植性&#xff1a;易安装、易替换、适应性。 UML状态图转换不正确的是______。 活动可以在转换时执…

药包材国家标准ybb2020-电子版在线阅读

国家药包材标准对于药品的质量和安全至关重要&#xff0c;因此需要查阅国家药包材标准来确保药品的质量和安全。 对于一些医药生产企业、药品检验机构、药品注册申请人、医疗机构来说他们查阅相关国家药包材标准可以说是轻车熟路&#xff0c;但对于部分新入行或普通人群想要了…

要想工作流程更简便,试试开源web表单设计器

繁杂的工作流程&#xff0c;让您头疼不已&#xff1f;传统的表单制作效率低&#xff1f;内部数据迟迟得不到有效管理&#xff1f;…作为职场人的你&#xff0c;是否经常遇到上述问题。别着急&#xff0c;在如今的快节奏发展时代&#xff0c;传统的表单制作已经满足不了行业和市…

python3 爬虫相关学习5: python相关工具:anaconda,sublime_text等等

前言 1 作为一个中国人坚决不用notepad 2 sublimeText 3 anaconda 1 sublime Text 下载地址 Sublime Text - Text Editing, Done Righthttp://www.sublimetext.com/ 下载是个绿色包解压缩即可用快捷方式需要自己剪切 2 导航器/浏览器 /平台 Anaconda 下载地址 Anaconda…