案例1.spark和flink分别实现作业配置动态更新案例

news2025/2/12 13:44:51

目录

 目录

一、背景

二、解决

1.方法1:spark broadcast广播变量

a. 思路

b. 案例

① 需求

② 数据

③ 代码

2.方法2:flink RichSourceFunction

a. 思路

b. 案例

① 需求

② 数据

③ 代码

④ 测试验证

测试1

测试2

测试3


一、背景

         在实时作业(如 Spark Streaming、Flink 等流处理作业)中,通过外部配置管理系统动态修改配置,有以下优点:

         1. 无需重启作业,实现配置热更新
好处:实时作业通常需要长时间运行,重启会导致数据丢失或处理延迟。通过外部配置动态更新,可以在作业运行时修改配置(如并行度、窗口大小、超时时间等),而无需重启作业。

         2. 灵活应对业务需求变化
好处:实时作业通常需要快速响应业务需求的变化(如规则调整、参数优化等)。通过外部配置管理,可以快速更新业务逻辑或参数,而无需重新部署代码。

        3. 集中化管理配置
好处:将配置集中存储在外部的配置管理系统(如 ZooKeeper、Consul、Nacos、数据库等),便于统一管理和维护。多个作业可以共享同一份配置,避免配置分散和重复。

        通常初始化sparkConf 作为 Spark 应用程序的配置对象,在运行时设置配置参数,但是大多数配置在 SparkContext 初始化后无法更改了。

        如果想让程序在运行时做参数的动态调整,以下有两种思路可供参考,通过代码案例可以更深一步理解。

二、解决

1.方法1:spark broadcast广播变量

a. 思路

        可以将数据字典或者规则放入文件当中,spark读取加载,并将这些字典配置广播发送到各个节点。

b. 案例

① 需求

        根据ip地址查找其所属,实现根据ip对应所属的规则,将该规则广播出去,为数据中的ip附加所属。将ip与所属地址的规则broadcast广播到各个executor中。


② 数据

access.log
样例:字符分隔\t 分别是 时间\tip\t网址

20161127172603 12.197.80.128 http://scala.bjut.com.cn/scala/course/8.html
20161127172605 128.44.80.128 http://java.bjut.com.cn/java/course/31.html
20161127172605 12.18.80.188 http://java.bjut.com.cn/java/course/22.html
20161127172610 197.160.85.168 http://java.bjut.com.cn/java/course/18.html

实现根据ip对应所属的规则,将该规则广播出去,为数据中的ip附加所属。实现代码如下:

③ 代码

IpLocation.scala

package com.spark.demo.broadcast

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext

/**
 * 根据ip地址查找其所属
 * 将ip与所属地址的规则broadcast广播到各个executor中
 */
object IpLocation {

    def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setAppName("ip location").setMaster("local[2]")
        val sc = new SparkContext(conf)

        val ipRulesRdd = sc.textFile("src\\com\\spark\\demo\\broadcast\\ip-by-country.csv").map { line => 
            val fields = line.split(",")
            val startIpNum = fields(2)
            val endIpNum = fields(3)
            val cityName = fields(5)
            (startIpNum, endIpNum, cityName)
        }
        //全部的ip映射规则
        val ipRulesArray = ipRulesRdd.collect //将数据收集到driver,为后面广播做准备。该变量值在driver中有,worker中不存在
        val bIpRules = sc.broadcast(ipRulesArray)

        //加载要处理的数据
        val ipsRdd = sc.textFile("src\\com\\spark\\demo\\broadcast\\access.log").map { line =>  
            val fields = line.split("\t")
            val ip = fields(1)
            ip
        }

        val result = ipsRdd.map { ip => 
            val ipNum = IpUtil.ip2Long(ip)
            val info = IpUtil.searchContentByKey(bIpRules.value, ipNum)
            val countryName = info.split(",")(2)
            (ip, countryName)
        }

        result.collect().foreach(println)
        sc.stop()
    }

}

IpUtil.scala

package com.spark.demo.broadcast

import scala.collection.mutable.ArrayBuffer
import scala.collection.mutable.HashMap

object IpUtil {
  /*
     * 将ip地址转换成数字long
     */
    def ip2Long(ip:String):Long = {
        val fragments = ip.split("[.]") 
        var ipNum = 0L
        for(i <- 0 until fragments.length) {
            ipNum = fragments(i).toLong | ipNum << 8L
        }
        ipNum
    }

    def ip2Long2(ip:String) = {
        val fragments = ip.split("[.]")
        val ipNum = 16777216L*fragments(0).toLong+65536L*fragments(1).toLong+
                256L*fragments(2).toLong + fragments(3).toLong
        ipNum
    }
    /**
     * 将数字转换成ip地址
     */
    def long2Ip(ipNum:Long) = {
        val mask = List(0x000000FF,0x0000FF00,0x00FF0000,0xFF000000)
        val ipInfo = new StringBuffer
        var num = 0L
        for(i <- 0 until 4) {
            num = (ipNum & mask(i)) >> (i*8)
            if(i>0) ipInfo.insert(0, ".")
    

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2296094.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

大数据学习之SparkSql

95.SPARKSQL_简介 网址&#xff1a; https://spark.apache.org/sql/ Spark SQL 是 Spark 的一个模块&#xff0c;用于处理 结构化的数据 。 SparkSQL 特点 1 易整合 无缝的整合了 SQL 查询和 Spark 编程&#xff0c;随时用 SQL 或 DataFrame API 处理结构化数据。并且支…

鸿蒙UI(ArkUI-方舟UI框架)- 使用文本

返回主章节 → 鸿蒙UI&#xff08;ArkUI-方舟UI框架&#xff09; 文本使用 文本显示 (Text/Span) Text是文本组件&#xff0c;通常用于展示用户视图&#xff0c;如显示文章的文字内容。Span则用于呈现显示行内文本。 创建文本 string字符串 Text("我是一段文本"…

Spider 数据集上实现nlp2sql训练任务

NLP2SQL&#xff08;自然语言处理到 SQL 查询的转换&#xff09;是一个重要的自然语言处理&#xff08;NLP&#xff09;任务&#xff0c;其目标是将用户的自然语言问题转换为相应的 SQL 查询。这一任务在许多场景下具有广泛的应用&#xff0c;尤其是在与数据库交互的场景中&…

【DeepSeek】DeepSeek概述 | 本地部署deepseek

目录 1 -> 概述 1.1 -> 技术特点 1.2 -> 模型发布 1.3 -> 应用领域 1.4 -> 优势与影响 2 -> 本地部署 2.1 -> 安装ollama 2.2 -> 部署deepseek-r1模型 1 -> 概述 DeepSeek是由中国的深度求索公司开发的一系列人工智能模型&#xff0c;以其…

ASP.NET Core 使用 WebClient 从 URL 下载

本文使用 ASP .NET Core 3.1&#xff0c;但它在.NET 5、 .NET 6和.NET 8上也同样适用。如果使用较旧的.NET Framework&#xff0c;请参阅本文&#xff0c;不过&#xff0c;变化不大。 如果想要从 URL 下载任何数据类型&#xff0c;请参阅本文&#xff1a;HttpClient 使用WebC…

【CubeMX-HAL库】STM32F407—无刷电机学习笔记

目录 简介&#xff1a; 学习资料&#xff1a; 跳转目录&#xff1a; 一、工程创建 二、板载LED 三、用户按键 四、蜂鸣器 1.完整IO控制代码 五、TFT彩屏驱动 六、ADC多通道 1.通道确认 2.CubeMX配置 ①开启对应的ADC通道 ②选择规则组通道 ③开启DMA ④开启ADC…

vue3 点击图标从相册选择二维码图片,并使用jsqr解析二维码(含crypto-js加密解密过程)

vue3 点击图标从相册选择二维码图片&#xff0c;并使用jsqr解析二维码&#xff08;含crypto-js加密解密过程&#xff09; 1.安装 jsqr 和 crypto-js npm install -d jsqr npm install crypto-js2.在util目录下新建encryptionHelper.js文件&#xff0c;写加密解密方法。 // e…

kafka 3.5.0 raft协议安装

前言 最近做项目&#xff0c;需要使用kafka进行通信&#xff0c;且只能使用kafka&#xff0c;笔者没有测试集群&#xff0c;就自己搭建了kafka集群&#xff0c;实际上笔者在很早之前就搭建了&#xff0c;因为当时还是zookeeper&#xff08;简称ZK&#xff09;注册元数据&#…

前后端服务配置

1、安装虚拟机&#xff08;VirtualBox或者vmware&#xff09;&#xff0c;在虚拟机上配置centos(选择你需要的Linux版本)&#xff0c;配置如nginx服务器等 1.1 VMware 下载路径Sign In注册下载 1.2 VirtualBox 下载路径https://www.virtualbox.org/wiki/Downloads 2、配置服…

在阿里云ECS上一键部署DeepSeek-R1

DeepSeek-R1 是一款开源模型&#xff0c;也提供了 API(接口)调用方式。据 DeepSeek介绍&#xff0c;DeepSeek-R1 后训练阶段大规模使用了强化学习技术&#xff0c;在只有极少标注数据的情况下提升了模型推理能力&#xff0c;该模型性能对标 OpenAl o1 正式版。DeepSeek-R1 推出…

git SourceTree 使用

Source Tree 使用原理 文件的状态 创建仓库和提交 验证 再克隆的时候发发现一个问题&#xff0c;就是有一个 这个验证&#xff0c;起始很简单 就是 gitee 的账号和密码&#xff0c;但是要搞清楚的是账号不是名称&#xff0c;我之前一直再使用名称登录老是出问题 这个很简单的…

游戏引擎学习第94天

仓库:https://gitee.com/mrxiao_com/2d_game_2 回顾上周的渲染器工作 完成一款游戏的开发&#xff0c;完全不依赖任何库和引擎&#xff0c;这样我们能够全面掌握游戏的开发过程&#xff0c;确保没有任何细节被隐藏。我们将深入探索每一个环节&#xff0c;犹如拿着手电筒翻看床…

win32汇编环境,结构体的使用示例二

;运行效果 ;win32汇编环境,结构体的使用示例二 ;举例说明结构体的定义&#xff0c;如何访问其中的成员&#xff0c;使用assume指令指向某个结构体&#xff0c;计算结构数组所需的偏移量得到某个成员值等 ;直接抄进RadAsm可编译运行。重要部分加备注。 ;下面为asm文件 ;>>…

DeepSeek从入门到精通教程PDF清华大学出版

DeepSeek爆火以来&#xff0c;各种应用方式层出不穷&#xff0c;对于很多人来说&#xff0c;还是特别模糊&#xff0c;有种雾里看花水中望月的感觉。 最近&#xff0c;清华大学新闻与传播学院新媒体研究中心&#xff0c;推出了一篇DeepSeek的使用教程&#xff0c;从最基础的是…

【PDF提取内容】如何批量提取PDF里面的文字内容,把内容到处表格或者批量给PDF文件改名,基于C++的实现方案和步骤

以下分别介绍基于 C 批量提取 PDF 里文字内容并导出到表格&#xff0c;以及批量给 PDF 文件改名的实现方案、步骤和应用场景。 批量提取 PDF 文字内容并导出到表格 应用场景 文档数据整理&#xff1a;在处理大量学术论文、报告等 PDF 文档时&#xff0c;需要提取其中的关键信…

SSA-TCN麻雀算法优化时间卷积神经网络时间序列预测未来Matlab实现

SSA-TCN麻雀算法优化时间卷积神经网络时间序列预测未来Matlab实现 目录 SSA-TCN麻雀算法优化时间卷积神经网络时间序列预测未来Matlab实现预测效果基本介绍程序设计参考资料 预测效果 基本介绍 1.Matlab实现SSA-TCN麻雀算法优化时间卷积神经网络时间序列预测未来&#xff08;优…

大模型推理——MLA实现方案

1.整体流程 先上一张图来整体理解下MLA的计算过程 2.实现代码 import math import torch import torch.nn as nn# rms归一化 class RMSNorm(nn.Module):""""""def __init__(self, hidden_size, eps1e-6):super().__init__()self.weight nn.Pa…

大数据项目2:基于hadoop的电影推荐和分析系统设计和实现

前言 大数据项目源码资料说明&#xff1a; 大数据项目资料来自我多年工作中的开发积累与沉淀。 我分享的每个项目都有完整代码、数据、文档、效果图、部署文档及讲解视频。 可用于毕设、课设、学习、工作或者二次开发等&#xff0c;极大提升效率&#xff01; 1、项目目标 本…

Windows逆向工程入门之汇编环境搭建

公开视频 -> 链接点击跳转公开课程博客首页 -> ​​​链接点击跳转博客主页 Visual Studio逆向工程配置 基础环境搭建 Visual Studio 官方下载地址安装配置选项(后期可随时通过VS调整) 使用C的桌面开发 拓展可选选项 MASM汇编框架 配置MASM汇编项目 创建新项目 选择空…

gc buffer busy acquire导致的重大数据库性能故障

&#x1f4e2;&#x1f4e2;&#x1f4e2;&#x1f4e3;&#x1f4e3;&#x1f4e3; 作者&#xff1a;IT邦德 中国DBA联盟(ACDU)成员&#xff0c;10余年DBA工作经验 Oracle、PostgreSQL ACE CSDN博客专家及B站知名UP主&#xff0c;全网粉丝10万 擅长主流Oracle、MySQL、PG、高斯…