【动态读取配置文件】ParameterTool读取带环境的配置信息

news2025/1/17 15:39:39

不同环境Flink配置信息是不同的,为了区分不同环境的配置文件,使用ParameterTool工具读取带有环境的配置文件信息

区分环境的配置文件

image-20231217202925250

三个配置文件:

flink.properties:决定那个配置文件生效

flink-dev.properties:测试环境配置文件

flink-prod.properties:生产环境配置文件

flink.properties配置文件中只配置一项flink.env.active=dev,读取该配置项然后组装出生效的配置文件名

工具类实现

public class ParameterUtil {

    /**
     * 默认配置文件 flink.properties
     */
    private static final String DEFAULT_CONFIG = ParameterConstants.FLINK_ROOT_FILE;
    /**
     * 带环境配置文件 flink-%s.properties
     */
    private static final String FLINK_ENV_FILE = ParameterConstants.FLINK_ENV_FILE;
    /**
     * 环境变量 flink.env.active
     */
    private static final String ENV_ACTIVE = ParameterConstants.FLINK_ENV_ACTIVE;

    /**
     * 配置文件+启动参数+系统环境变量 生成ParameterTool
     */
    public static ParameterTool getParameters(final String[] args) {

        /* **********************
         * Java读取资源的方式:
         *
         * a. Class.getResourceAsStream(Path): Path 必须以 “/”,表示从ClassPath的根路径读取资源
         * b. Class.getClassLoader().getResourceAsStream(Path):Path 无须以 “/”, 默认从ClassPath的根路径读取资源
         *
         * 推荐使用第2种,也就是类加载器的方式获取静态资源文件, 不要通过ClassPath的相对路径查找
         * *********************/

        InputStream inputStream = ParameterUtil.class.getClassLoader().getResourceAsStream(DEFAULT_CONFIG);

        try {
            // 读取根配置文件
            ParameterTool defaultPropertiesFile =
                    ParameterTool.fromPropertiesFile(inputStream);

            // 获取环境参数
            String envActive = getEnvActiveValue(defaultPropertiesFile);

            // 读取真正的配置环境 (推荐使用 Thread.currentThread() 读取配置文件)

            return ParameterTool
                    // ParameterTool读取变量优先级 系统环境变量 > 启动参数变量 > 配置文件变量

                    // 从配置文件获取配置
                    .fromPropertiesFile(
                            //当前线程
                            Thread.currentThread()
                                    //返回该线程的上下文信息, 获取类加载器
                                    .getContextClassLoader()
                                    .getResourceAsStream(envActive))
                    // 从启动参数中获取配置
                    .mergeWith(ParameterTool.fromArgs(args))
                    // 从系统环境变量获取配置
                    .mergeWith(ParameterTool.fromSystemProperties());
        } catch (IOException e) {
            throw new RuntimeException("");
        }

    }


    /**
     * 配置文件+系统环境变量 生成ParameterTool
     */
    public static ParameterTool getParameters() {

        InputStream inputStream = ParameterUtil.class.getClassLoader().getResourceAsStream(DEFAULT_CONFIG);

        /* **********************
         *
         * 注意:
         *
         * ParameterTool 读取配置文件需要抛出 IOException,
         * IOException 的捕捉就在这里 catch
         *
         * 以前代码是直接抛出,没有进行catch,要注意对以前代码的修改
         *
         * *********************/

        try {
            ParameterTool defaultPropertiesFile =
                    ParameterTool.fromPropertiesFile(inputStream);

            //获取环境参数
            String envActive = getEnvActiveValue(defaultPropertiesFile);

            //读取真正的配置环境 (推荐使用 Thread.currentThread() 读取配置文件)
            return ParameterTool
                    // ParameterTool读取变量优先级 系统环境变量>启动参数变量>配置文件变量

                    // 从配置文件获取配置
                    .fromPropertiesFile(
                            //当前线程
                            Thread.currentThread()
                                    //返回该线程的上下文信息, 获取类加载器
                                    .getContextClassLoader()
                                    .getResourceAsStream(envActive))
                    // 从系统环境变量获取配置
                    .mergeWith(ParameterTool.fromSystemProperties());
        } catch (Exception e) {
            throw new FlinkPropertiesException(FlinkPropertiesExceptionInfo.PROPERTIES_NULL);
        }
    }

    /**
     * 获取环境配置变量
     */
    private static String getEnvActiveValue(ParameterTool defaultPropertiesFile) {
        // 选择参数环境
        String envActive = null;
        if (defaultPropertiesFile.has(ENV_ACTIVE)) {
            envActive = String.format(FLINK_ENV_FILE, defaultPropertiesFile.get(ENV_ACTIVE));
        }

        return envActive;
    }

    /**
     * 从配置文件参数配置流式计算的上下文环境
     */
    public static void envWithConfig(
            StreamExecutionEnvironment env,
            ParameterTool parameterTool
    ) {

        /* **********************
         *
         * checkpoint 设置
         *
         * 1.
         * 若checkpoint 时间不要设置太短,
         * 这里的时间包括了超时时间
         *
         * 2.
         * 设置了周期性checkpoint,
         * 若上一个周期的checkpoint没完成,
         * 下一个周期的checkpoint不会开始的.
         *
         * 3.
         * 若checkpoint的持续时间超过了超时时间,
         * 会出现排队,
         * 过多的checkpoint排队会耗费资源
         *
         * 4.
         * 为了解决checkpoint排队堆积,
         * 需要优化checkpoint的完成效率
         *
         * *********************/
        // 每60秒触发checkpoint
        env.enableCheckpointing(parameterTool.getInt(ParameterConstants.FLINK_CHECKPOINT_INTERVAL));
        CheckpointConfig ck = env.getCheckpointConfig();
        // checkpoint 必须在60秒内结束,否则被丢弃
        ck.setCheckpointTimeout(parameterTool.getInt(ParameterConstants.FLINK_CHECKPOINT_TIMEOUT));
        // checkpoint间最小间隔 30秒 (指定了这个值, setMaxConcurrentCheckpoints自动默认为1)
        ck.setMinPauseBetweenCheckpoints(parameterTool.getInt(ParameterConstants.FLINK_CHECKPOINT_MINPAUSE));
        // checkpoint 语义设置为 精确一致( EXACTLY_ONCE )
        ck.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        // 最多允许 checkpoint 失败 3 次
        ck.setTolerableCheckpointFailureNumber(parameterTool.getInt(ParameterConstants.FLINK_CHECKPOINT_FAILURENUMBER));
        // 同一时间只允许一个 checkpoint 进行
        ck.setMaxConcurrentCheckpoints(parameterTool.getInt(ParameterConstants.FLINK_CHECKPOINT_MAXCONCURRENT));

        // 设置 State 存储
        env.setStateBackend(new HashMapStateBackend());
        // 并行度设置
        env.setParallelism(parameterTool.getInt(ParameterConstants.FLINK_PARALLELISM));

    }

}

读取环境信息

该方法会读取 flink.properties 配置的生效的配置文件,组装成要读取的配置文件

		// flink.properties	
		InputStream inputStream = ParameterUtil.class.getClassLoader().getResourceAsStream(DEFAULT_CONFIG);
        ParameterTool defaultPropertiesFile =
            ParameterTool.fromPropertiesFile(inputStream);  

	private static String getEnvActiveValue(ParameterTool defaultPropertiesFile) {
        // 选择参数环境
        String envActive = null;
        
        // 配置文件中是否有该属性 flink.env.active
        if (defaultPropertiesFile.has(ENV_ACTIVE)) {
            // 有的话,直接拼装 flink-%s.properties -> flink-dev.properties
            envActive = String.format(FLINK_ENV_FILE, defaultPropertiesFile.get(ENV_ACTIVE));
        }

        return envActive;
    }

ParameterTool 获取参数的3种方式

  1. fromPropertiesFile 配置文件

  2. fromArgs 程序启动参数

    - 或者 -- 开头 空格分隔, 如:-name likelong --age 21

  3. fromSystemProperties 系统环境变量, 包括程序 -D启动的变量

    内部调用的是 Java提供的 System.getProperties()

ParameterTool 获取参数优先级, 可通过 mergeWith() 设置优先级, 但 mergeWith() 会覆盖前面的同名变量

因此,上述ParameterTool读取变量优先级 系统环境变量 > 启动参数变量 > 配置文件变量

ParameterTool 注册 global 变量

ParameterTool 注册为 global 变量:env.getConfig().setGlobalJobParameter()

这样, 在上下文中就能获取 ParameterTool

(ParameterTool) getRuntimeContext().getExecutionConfig().getGlobalJobParameters()

【该方法可以在富函数生命周期方法中调用】

如下:

    private static void initEnv(String[] args) {

        // ParameterTool 注册为 global
        parameterTool = ParameterUtil.getParameters();
        env.getConfig().setGlobalJobParameters(parameterTool);
        // 配置上下文环境
        ParameterUtil.envWithConfig(env, parameterTool);
    }

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1318246.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【计算机网络】—— 详解码元,传输速率的计算|网络奇缘系列|计算机网络

🌈个人主页: Aileen_0v0🔥系列专栏: 一见倾心,再见倾城 --- 计算机网络~💫个人格言:"没有罗马,那就自己创造罗马~" 目录 码元 速率和波特 思考1 思考2 思考3 带宽(Bandwidth) 📝总结 码元…

MapReduce序列化实例代码

1 )需求:统计每个学号该月的超市消费、食堂消费、总消费 2 )输入数据格式 序号 学号 超市消费 食堂消费 18 202200153105 8.78 12 3 )期望输出格式 key (学号) value ( bean 对象&#xf…

用Rust刷LeetCode之58 最后一个单词的长度

58. 最后一个单词的长度[1] 难度: 简单 原描述: 思路 使用标准库: package mainimport ( "fmt" "strings")func lengthOfLastWord(s string) int { s strings.TrimSpace(s) // 删除 首尾 的空格 arr : strings.Split(s, " ") // 字符串转为切片…

FL Studio 21.1.0.3713中文版最新安装激活图文教程及系统配置要求

FL Studio 21.1.0.3713中文版是一款功能强大的编曲软件,它也能够剪辑、混音、录音,它的矢量界面,能更好用在4K、5K甚至8K显示器上。完全重新设计混音器、动态缩放、具有 6 种布局风格、外加 3个用户自定义面板管理音轨、多推子选择和调整、混…

Python 反编译Il2Cpp APK

引入 https://github.com/Perfare/Il2CppDumper/ 实现 开源的Ii2Cpp Dumper可以帮助我们将So和globalmetadata.dat文件反编译出 Assembly-CSharp.dll 本博客教程可以帮助我们直接拖入APK反编译出来 调用方式 两种 第一种 拖入后回车运行 第二种 放入运行的根目录下 源码 i…

Pandas-DataFtame的索引与切片(第3讲)

Pandas-DataFtame的索引与切片(第3讲)         🍹博主 侯小啾 感谢您的支持与信赖。☀️ 🌹꧔ꦿ🌹꧔ꦿ🌹꧔ꦿ🌹꧔ꦿ🌹꧔ꦿ🌹꧔ꦿ🌹꧔ꦿ🌹꧔ꦿ🌹꧔ꦿ🌹꧔ꦿ🌹꧔ꦿ🌹꧔ꦿ🌹꧔ꦿ🌹꧔ꦿ🌹꧔ꦿ🌹꧔ꦿ🌹꧔ꦿ🌹꧔ꦿ🌹꧔ꦿ🌹꧔ꦿ�…

数据解密战:.mallox勒索病毒攻击下的数据保护

引言: 近期,网络安全领域再度掀起一场不安的浪潮,.Mallox勒索病毒作为最新一位悍匪登场,以其毒辣的加密技术,将用户的数据转瞬间变成了无解之谜。这股数字黑暗力量的出现引起了广泛关注,让用户和企业重新审…

事件监听的艺术:掌握`addEventListener`的魅力

🤍 前端开发工程师(主业)、技术博主(副业)、已过CET6 🍨 阿珊和她的猫_CSDN个人主页 🕠 牛客高级专题作者、在牛客打造高质量专栏《前端面试必备》 🍚 蓝桥云课签约作者、已在蓝桥云…

前端实现一个时间区间内,再次单选功能,使用Antd组件库内日历组件Calendar

需求:需要先让用户选择一个时间区间,然后再这个时间区间中,让用户再次去单选其种特殊日期。 思路: 1.先用Antd组件库中日期选择DatePicker.RangePicker实现让用户选择时间区间 2.在选择完时间区间后,用这个时间区间…

Java之Clonable接口和深浅拷贝

Clonable接口 我们船舰了一个人的对象,想要克隆一个一模一样的对象,可以用到object类里面的克隆方法 object不是所有类的父类吗?那为什么用person1点不出这个方法呢?可以看一下源码 这是Object类里面的clone方法的声明&#xff0…

随机游走Python中的实现

随机游走是一个数学对象,称为随机或随机过程,它描述了一条路径,该路径由一些数学空间(如整数)上的一系列随机步骤组成。随机游走的一个基本例子是整数线上的随机游走,它从0开始,每一步以相等的概…

docker小白第五天

docker小白第五天 docker的私有库 有些涉密的信息代码不能放在阿里云的镜像仓库,因此需要构建一个个人内网专属的私有库,将镜像或者容器代码进行推送保存。 下载镜像docker registry 执行代码docker pull registry,用于搭建私服前的准备。…

为什么选择计算机?大数据时代学习计算机的价值探讨

还记得当初自己为什么选择计算机? 计算机是在90年代兴起的专业,那时候的年轻人有驾照、懂外语、懂计算机是很时髦的事情! 当初你问我为什么选择计算机,我笑着回答:“因为我梦想成为神奇的码农!我想像编织魔法一样编写程序,创造出炫酷的虚拟世界!”谁知道,我刚入门的…

力扣刷题-二叉树-平衡二叉树

110 平衡二叉树 给定一个二叉树,判断它是否是高度平衡的二叉树。 本题中,一棵高度平衡二叉树定义为:一个二叉树每个节点 的左右两个子树的高度差的绝对值不超过1。 示例 1: 给定二叉树 [3,9,20,null,null,15,7] 返回 true 。 给定二叉树 [1…

diffuser为pipeline设置不用的scheduler

查看默认的schedulers: 使用默认的schedulers生成数据 查看默认scheduler的默认配置,定义了采样器中的相关参数,网上关于DDPM和DDIM的文章较多,可以先去看看这两种schedulers: 修改scheduler,可以用于…

【TB作品】51单片机,具有报时报温功能的电子钟

2.具有报时报温功能的电子钟 一、功能要求: 1.显示室温。 2.具有实时时间显示。 3.具有实时年月日显示和校对功能。 4.具有整点语音播报时间和温度功能。 5.定闹功能,闹钟音乐可选。 6.操作简单、界面友好。 二、设计建议: 1.单片机自选(C51、STM32或其他单片机)。 2.时钟日历芯…

【已解决】每次点击Windows 10 任务栏会闪退

一台装有 Windows 10 系统的电脑,最近通过“Windows 易升”升级到22H2版之后出现一个兼容问题:每次鼠标点击任务栏切换其他程序,任务栏都会闪退,桌面图标消失,然后又恢复显示桌面图标以及任务栏程序。 这样每次切换其他…

社交网络分析3:社交网络隐私攻击、保护的基本概念和方法 + 去匿名化技术 + 推理攻击技术 + k-匿名 + 基于聚类的隐私保护算法

社交网络分析3:社交网络隐私攻击、保护的基本概念和方法 去匿名化技术 推理攻击技术 k-匿名 基于聚类的隐私保护算法 写在最前面社交网络隐私泄露用户数据暴露的途径复杂行为的隐私风险技术发展带来的隐私挑战经济利益与数据售卖防范措施 社交网络 用户数据隐私…

【气候模式降尺度】分位数增量映射(QDM)原理及MATLAB代码实现

分位数增量映射(quantile delta mapping, QDM) 1 QDM偏差订正原理2 MATLAB实现代码3 案例参考气候模式的模拟结果与观测数据往往存在着一定的系统偏差,若将气候模式结果直接应用于作物模型或者水文模型中,其偏差会对模拟产生很大的影响,因此需要对气候模拟结果进行误差订正…

C++初阶-queue的使用与模拟实现

queue的使用与模拟实现 一、queue的介绍和使用二、queue的使用三、queue的模拟实现3.1 成员变量3.2 成员函数3.2.1 push入队列3.2.2 pop出队列3.2.3 返回队头数据3.2.4 返回队尾数据3.2.5 返回队列的大小3.2.6 判断队列是否为空 四、完整代码4.1 queue.h4.2 test.h 五、deque的…