【极数系列】Flink配置参数如何获取?(06)

news2024/12/26 23:23:41

文章目录

  • gitee码云地址
  • 简介概述
  • 01 配置值来自.properties文件
    • 1.通过路径读取
    • 2.通过文件流读取
    • 3.通过IO流读取
  • 02 配置值来自命令行
  • 03 配置来自系统属性
  • 04 注册以及使用全局变量
  • 05 Flink获取参数值Demo
    • 1.项目结构
    • 2.pom.xml文件如下
    • 3.配置文件
    • 4.项目主类
    • 5.运行查看相关日志

gitee码云地址

直接下载解压可用 https://gitee.com/shawsongyue/aurora.git
模块:aurora_flink
主类:GetParamsStreamingJob

简介概述

​ 1.几乎所有的批和流的 Flink 应用程序,都依赖于外部配置参数。这些配置参数可以用于指定输入和输出源(如路径或地址)、系统参数(并行度,运行时配置)和特定的应用程序参数(通常使用在用户自定义函数)。

​ 2.为解决以上问题,Flink 提供一个名为 Parametertool 的简单公共类,其中包含了一些基本的工具。请注意,这里说的 Parametertool 并不是必须使用的。Commons CLI 和 argparse4j 等其他框架也可以非常好地兼容 Flink。

​ 3.**ParameterTool**定义了一组静态方法,用于读取配置信息。该工具类内部使用了 Map` 类型,这样使得它可以很容易地与你的配置集成在一起。

01 配置值来自.properties文件

1.通过路径读取

//定义文件路径
String propertiesFilePath = "E:\\project\\aurora_dev\\aurora_flink\\src\\main\\resources\\application.properties";

//方式一:直接使用内置工具类
ParameterTool parameter_01 = ParameterTool.fromPropertiesFile(propertiesFilePath);
String jobName_01 = parameter_01.get("jobName");
logger.info("方式一:读取配置文件中指定的key值={}",jobName_01);

2.通过文件流读取

//定义文件路径
String propertiesFilePath = "E:\\project\\aurora_dev\\aurora_flink\\src\\main\\resources\\application.properties";

//方式二:使用文件
File propertiesFile = new File(propertiesFilePath);
ParameterTool parameter_02 = ParameterTool.fromPropertiesFile(propertiesFile);
String jobName_02 = parameter_02.get("jobName");
logger.info("方式二:读取配置文件中指定的key值={}",jobName_02);

3.通过IO流读取

//定义文件路径
String propertiesFilePath = "E:\\project\\aurora_dev\\aurora_flink\\src\\main\\resources\\application.properties";

//方式三:使用IO流
InputStream propertiesFileInputStream = new FileInputStream(new File(propertiesFilePath));
ParameterTool parameter_03 = ParameterTool.fromPropertiesFile(propertiesFileInputStream);
String jobName_03 = parameter_03.get("jobName");
logger.info("方式三:读取配置文件中指定的key值={}",jobName_03);

02 配置值来自命令行

tips:在idea的命令行传参,格式:–jobName program_job_aurora

在这里插入图片描述

ParameterTool parameter_04 = ParameterTool.fromArgs(args);
String jobName_04 = parameter_04.get("jobName");
logger.info("方式四:命令行传参key值={}",jobName_04);

03 配置来自系统属性

tips:在idea的的jvm系统参数设置,格式:-Dinput=hdfs:///mydata

在这里插入图片描述

//方式五:获取jvm参数值
ParameterTool parameter_05 = ParameterTool.fromSystemProperties();
String jobName_05 = parameter_05.get("input");
logger.info("方式五:获取jvm参数key值={}",jobName_05);

04 注册以及使用全局变量

注意:Flink全局变量仅支持在富函数中使用,即Rich开头的类使用

//定义文件路径
String propertiesFilePath = "E:\\project\\aurora_dev\\aurora_flink\\src\\main\\resources\\application.properties";

//直接使用内置工具类获取参数
ParameterTool parameter_01 = ParameterTool.fromPropertiesFile(propertiesFilePath);

//方式六:注册全局参数
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.getConfig().setGlobalJobParameters(parameter_01);
        //在任意富函数中均可以获取,注意!注意!注意!只有富文本函数才可以使用
        //1.创建富函数
        RichFlatMapFunction<String, String> richFlatMap = new RichFlatMapFunction<>() {
            @Override
            public void flatMap(String s, Collector<String> collector) throws Exception {
                //获取运行环境
                ParameterTool parameters = (ParameterTool) getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
                //获取对应的值
                String jobName = parameters.getRequired("jobName");
                logger.info("方式六:获取全局注册参数key值={}",jobName_05);
            }
        };
        //2.创建数据集
        ArrayList<String> list = new ArrayList<>();
        list.add("001");
        list.add("002");
        list.add("003");
        //3.把有限数据集转换为数据源
        DataStreamSource<String> dataStreamSource = env.fromCollection(list).setParallelism(1);
        //4.执行富文本处理
        dataStreamSource.flatMap(richFlatMap);
        //5.启动程序
        env.execute();

05 Flink获取参数值Demo

1.项目结构

在这里插入图片描述

2.pom.xml文件如下

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.xsy</groupId>
    <artifactId>aurora_flink</artifactId>
    <version>1.0-SNAPSHOT</version>

    <!--属性设置-->
    <properties>
        <!--java_JDK版本-->
        <java.version>11</java.version>
        <!--maven打包插件-->
        <maven.plugin.version>3.8.1</maven.plugin.version>
        <!--编译编码UTF-8-->
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <!--输出报告编码UTF-8-->
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <!--json数据格式处理工具-->
        <fastjson.version>1.2.75</fastjson.version>
        <!--log4j版本-->
        <log4j.version>2.17.1</log4j.version>
        <!--flink版本-->
        <flink.version>1.18.0</flink.version>
        <!--scala版本-->
        <scala.binary.version>2.11</scala.binary.version>
        <!--log4j依赖-->
        <log4j.version>2.17.1</log4j.version>
    </properties>

    <!--通用依赖-->
    <dependencies>

        <!-- json -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>${fastjson.version}</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-java -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>${flink.version}</version>
        </dependency>


        <!--================================集成外部依赖==========================================-->
        <!--集成日志框架 start-->
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-slf4j-impl</artifactId>
            <version>${log4j.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-api</artifactId>
            <version>${log4j.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-core</artifactId>
            <version>${log4j.version}</version>
        </dependency>

        <!--集成日志框架 end-->
    </dependencies>

    <!--编译打包-->
    <build>
        <finalName>${project.name}</finalName>
        <!--资源文件打包-->
        <resources>
            <resource>
                <directory>src/main/resources</directory>
            </resource>
            <resource>
                <directory>src/main/java</directory>
                <includes>
                    <include>**/*.xml</include>
                </includes>
            </resource>
        </resources>

        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.1.1</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <artifactSet>
                                <excludes>
                                    <exclude>org.apache.flink:force-shading</exclude>
                                    <exclude>org.google.code.flindbugs:jar305</exclude>
                                    <exclude>org.slf4j:*</exclude>
                                    <excluder>org.apache.logging.log4j:*</excluder>
                                </excludes>
                            </artifactSet>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>org.xsy.sevenhee.flink.TestStreamJob</mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>

        <!--插件统一管理-->
        <pluginManagement>
            <plugins>
                <!--maven打包插件-->
                <plugin>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-maven-plugin</artifactId>
                    <version>${spring.boot.version}</version>
                    <configuration>
                        <fork>true</fork>
                        <finalName>${project.build.finalName}</finalName>
                    </configuration>
                    <executions>
                        <execution>
                            <goals>
                                <goal>repackage</goal>
                            </goals>
                        </execution>
                    </executions>
                </plugin>

                <!--编译打包插件-->
                <plugin>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <version>${maven.plugin.version}</version>
                    <configuration>
                        <source>${java.version}</source>
                        <target>${java.version}</target>
                        <encoding>UTF-8</encoding>
                        <compilerArgs>
                            <arg>-parameters</arg>
                        </compilerArgs>
                    </configuration>
                </plugin>
            </plugins>
        </pluginManagement>
    </build>

    <!--配置Maven项目中需要使用的远程仓库-->
    <repositories>
        <repository>
            <id>aliyun-repos</id>
            <url>https://maven.aliyun.com/nexus/content/groups/public/</url>
            <snapshots>
                <enabled>false</enabled>
            </snapshots>
        </repository>
    </repositories>

    <!--用来配置maven插件的远程仓库-->
    <pluginRepositories>
        <pluginRepository>
            <id>aliyun-plugin</id>
            <url>https://maven.aliyun.com/nexus/content/groups/public/</url>
            <snapshots>
                <enabled>false</enabled>
            </snapshots>
        </pluginRepository>
    </pluginRepositories>

</project>

3.配置文件

(1)application.properties

jobName=job_aurora
jobMemory=1024
taskName=task_aurora

(2)log4j2.properties

rootLogger.level=INFO
rootLogger.appenderRef.console.ref=ConsoleAppender
appender.console.name=ConsoleAppender
appender.console.type=CONSOLE
appender.console.layout.type=PatternLayout
appender.console.layout.pattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n
log.file=D:\\tmprootLogger.level=INFO
rootLogger.appenderRef.console.ref=ConsoleAppender
appender.console.name=ConsoleAppender
appender.console.type=CONSOLE
appender.console.layout.type=PatternLayout
appender.console.layout.pattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n
log.file=D:\\tmp

4.项目主类

package com.aurora;


import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;

/**
 * @description flink获取外部参数作业
 *
 * @author 浅夏的猫
 * @datetime 15:54 2024/1/28
*/
public class GetParamsStreamingJob {

    private static final Logger logger = LoggerFactory.getLogger(GetParamsStreamingJob.class);

    public static void main(String[] args) throws Exception {

        //定义文件路径
        String propertiesFilePath = "E:\\project\\aurora_dev\\aurora_flink\\src\\main\\resources\\application.properties";

        //方式一:直接使用内置工具类
        ParameterTool parameter_01 = ParameterTool.fromPropertiesFile(propertiesFilePath);
        String jobName_01 = parameter_01.get("jobName");
        logger.info("方式一:读取配置文件中指定的key值={}",jobName_01);

        //方式二:使用文件
        File propertiesFile = new File(propertiesFilePath);
        ParameterTool parameter_02 = ParameterTool.fromPropertiesFile(propertiesFile);
        String jobName_02 = parameter_02.get("jobName");
        logger.info("方式二:读取配置文件中指定的key值={}",jobName_02);

        //方式三:使用IO流
        InputStream propertiesFileInputStream = new FileInputStream(new File(propertiesFilePath));
        ParameterTool parameter_03 = ParameterTool.fromPropertiesFile(propertiesFileInputStream);
        String jobName_03 = parameter_03.get("jobName");
        logger.info("方式三:读取配置文件中指定的key值={}",jobName_03);

        //方式四:命令行传参格式:--jobName program_job_aurora
        ParameterTool parameter_04 = ParameterTool.fromArgs(args);
        String jobName_04 = parameter_04.get("jobName");
        logger.info("方式四:命令行传参key值={}",jobName_04);

        //方式五:获取jvm参数值
        ParameterTool parameter_05 = ParameterTool.fromSystemProperties();
        String jobName_05 = parameter_05.get("input");
        logger.info("方式五:获取jvm参数key值={}",jobName_05);

        //方式六:注册全局参数
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.getConfig().setGlobalJobParameters(parameter_01);
        //在任意富函数中均可以获取,注意!注意!注意!只有富文本函数才可以使用
        //1.创建富函数
        RichFlatMapFunction<String, String> richFlatMap = new RichFlatMapFunction<>() {
            @Override
            public void flatMap(String s, Collector<String> collector) throws Exception {
                //获取运行环境
                ParameterTool parameters = (ParameterTool) getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
                //获取对应的值
                String jobName = parameters.getRequired("jobName");
                logger.info("方式六:获取全局注册参数key值={}",jobName_05);
            }
        };
        //2.创建数据集
        ArrayList<String> list = new ArrayList<>();
        list.add("001");
        list.add("002");
        list.add("003");
        //3.把有限数据集转换为数据源
        DataStreamSource<String> dataStreamSource = env.fromCollection(list).setParallelism(1);
        //4.执行富文本处理
        dataStreamSource.flatMap(richFlatMap);
        //5.启动程序
        env.execute();
    }

}

5.运行查看相关日志

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1418381.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

D6212——由八路达林顿管 阵列和 H 桥驱动电路两个单元组成,可用于驱动步进电机等产品上

D6212是专为安防摄像头系统设计的驱动电路&#xff0c;电路由八路达林顿管 阵列和 H 桥驱动电路两个单元组成。八路达林顿管阵列均带有续流二极 管&#xff0c;可用于驱动步进电机&#xff1b;H 桥驱动电路单元可以直接驱动IR-CUT。单个 达林顿管在输入电压低至 1.8V 状态下支持…

SQL中实现行列转换

目录 方法一&#xff1a;sum case when 方法二&#xff1a;sum if 方法三&#xff1a;pivot 现在有一张表class_gender&#xff0c;内容如下&#xff1a; classgender一年级女一年级女一年级男一年级男二年级女二年级女二年级男 现在我们要根据上表&#xff0c;统计得到下…

spring boot学习第八篇:操作elastic search的索引和索引中的数据

前提参考&#xff1a;elastic search入门-CSDN博客 前提说明&#xff1a;已经安装好了elastic search 7.x版本&#xff0c;我的es版本是7.11.1 1、 pom.xml文件内容如下&#xff1a; <?xml version"1.0" encoding"UTF-8"?> <project xmlns&q…

深分页怎么导致索引失效了?提供6种优化的方案!

深分页怎么导致索引失效了&#xff1f;提供6种优化的方案&#xff01; 上篇文章说到索引失效的几种规则&#xff0c;其中就有包括 深分页回表太多导致索引失效 的场景 本篇文章来聊聊深分页场景中的问题并提供几种优化方案&#xff0c;以下是本篇文章的思维导图&#xff1a; …

go学习之air库的使用

首先下载air库 go install github.com/cosmtrek/air之后你需要去找到库下载的地方&#xff0c;若使用的是go mod可以使用命令 go env GOPATH找到下载库的位置 进入后&#xff0c;有bin&#xff0c;pkg目录&#xff0c;进入bin目录&#xff0c;你能看到air.exe文件 这时候将此…

数据结构(队列Queue)

文章目录 一、队列1、队列的定义2、队列的顺序实现2.1、初始化2.2、入队2.3、出队2.4、查找2.5、判断队列 满/空 3、队列的链式实现3.1、初始化3.2、入队3.3、出队 4、双端队列 一、队列 1、队列的定义 2、队列的顺序实现 2.1、初始化 //初始化 void InitQueue(SqQueue &Q…

微信小程序(二十一)css变量-定义页面主题色

注释很详细&#xff0c;直接上代码 上一篇 新增内容&#xff1a; 1.使用css变量 2.消除按钮白块影响 3.修改图标样式 源码&#xff1a; npmTest.json {"navigationStyle": "custom","usingComponents": {//引入vant组件"van-nav-bar"…

使用py-spy对python程序进行性能诊断学习

py-spy简介 py-spy是一个用Rust编写的轻量级Python分析工具&#xff0c;它能够监视正在运行的Python程序&#xff0c;而不需要修改代码或者重新启动程序。Py-spy可以在不影响程序运行的情况下&#xff0c;采集程序运行时的信息&#xff0c;生成火焰图&#xff08;flame graph&…

API:低代码平台的强大秘诀与无限可能

应用编程接口 (API) 是应用程序以可编程格式访问其关键能力和功能的一种方式&#xff0c;从而其他应用程序可以利用它们。API 本质上支持应用程序之间的无缝数据流&#xff0c;使开发人员能够在应用程序中添加更多功能&#xff0c;而无需依赖大量编码。 举一个简单的例子。 您…

55-工厂模式创建对象,instanceof检测,自定义构造函数

1.对象的概括JavaScript中所有事物都是对象:字符串,数值,数组,函数。对象就是带有属性和方法的特殊数据类型。当函数被保存为一个对象的属性时,它就可以称为这个对象的方法(例如方法模式调用 this指向调用对象) 2.用字面量/自定义对象的方式创建单个对象 <script>/…

华为三层交换机与防火墙对接配置上网示例

三层交换机与防火墙对接上网配置示例 组网图形 图1 三层交换机与防火墙对接上网组网图 三层交换机简介配置注意事项组网需求配置思路操作步骤配置文件 三层交换机简介 三层交换机是具有路由功能的交换机&#xff0c;由于路由属于OSI模型中第三层网络层的功能&#xff0c;所以…

别再做“背锅侠”!软件测试工程师被开发吐槽,如何应对?

作为一名软件测试工程师&#xff0c;我们的角色可以算是“战场上的后勤”&#xff0c;战役的胜败和所有团队人员都息息相关。但是难免碰到战役失败后&#xff0c;很多团队互相推脱的局面&#xff0c;而测试人员就是所有团队中的弱势群体&#xff0c;自然是首当其冲的背锅侠&…

【基础算法练习】单调队列与单调栈模板

文章目录 单调栈模板题代码模板算法思想 单调队列模板题代码模板算法思想 单调栈 模板题 题目链接&#xff1a;ACwing 830. 单调栈 代码模板 #include <iostream> #include <vector> #include <stack>using namespace std;const int N 100010;vector<…

群辉开启WebDav服务+cpolar内网穿透实现移动端ES文件浏览器远程访问本地NAS文件

文章目录 1. 安装启用WebDAV2. 安装cpolar3. 配置公网访问地址4. 公网测试连接5. 固定连接公网地址6. 使用固定地址测试连接 本文主要介绍如何在群辉中开启WebDav服务&#xff0c;并结合cpolar内网穿透工具生成的公网地址&#xff0c;通过移动客户端ES文件浏览器即可实现移动设…

Centos7 单机单网卡安装 OpenStack

本文实际环境 vmware 虚拟机&#xff1a; 网络采用的桥接方式&#xff0c;和我的物理网络在一个网段 CPU开启虚拟化 虚拟机安装系统后&#xff0c;配置上静态IP&#xff0c;能连接外网就行了&#xff0c;最好是把内核升级到5.19以上 1、初始化准备 1&#xff09;关闭防火墙 …

有趣的css - 第一个字符串自动生成文字图标

在设计 app 界面的时候&#xff0c;要展示一部分最新的资讯入口&#xff0c;然后出了一张下面的 UI 稿。 UI稿截图如下&#xff1a; 列表设计比较简单&#xff0c;就是列表前面的圆形图标这块&#xff0c;我个人觉得还是有点意思的。 一般的话&#xff0c;大概率都是用js限制…

ASP .NET Core Api 使用过滤器

过滤器说明 过滤器与中间件很相似&#xff0c;过滤器&#xff08;Filters&#xff09;可在管道&#xff08;pipeline&#xff09;特定阶段&#xff08;particular stage&#xff09;前后执行操作。可以将过滤器视为拦截器&#xff08;interceptors&#xff09;。 过滤器级别范围…

《SPSS统计学基础与实证研究应用精解》视频讲解:在线分析处理报告

《SPSS统计学基础与实证研究应用精解》5.1 视频讲解 视频为《SPSS统计学基础与实证研究应用精解》张甜 杨维忠著 清华大学出版社 一书的随书赠送视频讲解5.1节内容。本书已正式出版上市&#xff0c;当当、京东、淘宝等平台热销中&#xff0c;搜索书名即可。本书旨在手把手教会使…

RabbitMQ“延时队列“

1.RabbitMQ"延时队列" 延迟队列存储的对象是对应的延迟消息&#xff0c;所谓“延迟消息”是指当消息被发送以后&#xff0c;并不想让消费者立刻拿到消息&#xff0c;而是等待特定时间后&#xff0c;消费者才能拿到这个消息进行消费 注意RabbitMQ并没有延时队列慨念,…

图片中的水印怎么去掉?教你三个去水印方法

在拍摄照片时&#xff0c;有时候会遇到不期而遇的路人出现在镜头中&#xff0c;给照片带来不必要的干扰。这时候我们就需要把路人给去掉&#xff0c;让照片变的更加完美。下面我将给大家分享三个把照片中的路人去掉的小妙招。 一、水印云 水印云是一款非常实用的图片处理工具…