不同环境Flink配置信息是不同的,为了区分不同环境的配置文件,使用ParameterTool工具读取带有环境的配置文件信息
区分环境的配置文件
三个配置文件:
flink.properties
:决定那个配置文件生效
flink-dev.properties
:测试环境配置文件
flink-prod.properties
:生产环境配置文件
flink.properties配置文件中只配置一项flink.env.active=dev
,读取该配置项然后组装出生效的配置文件名
工具类实现
public class ParameterUtil {
/**
* 默认配置文件 flink.properties
*/
private static final String DEFAULT_CONFIG = ParameterConstants.FLINK_ROOT_FILE;
/**
* 带环境配置文件 flink-%s.properties
*/
private static final String FLINK_ENV_FILE = ParameterConstants.FLINK_ENV_FILE;
/**
* 环境变量 flink.env.active
*/
private static final String ENV_ACTIVE = ParameterConstants.FLINK_ENV_ACTIVE;
/**
* 配置文件+启动参数+系统环境变量 生成ParameterTool
*/
public static ParameterTool getParameters(final String[] args) {
/* **********************
* Java读取资源的方式:
*
* a. Class.getResourceAsStream(Path): Path 必须以 “/”,表示从ClassPath的根路径读取资源
* b. Class.getClassLoader().getResourceAsStream(Path):Path 无须以 “/”, 默认从ClassPath的根路径读取资源
*
* 推荐使用第2种,也就是类加载器的方式获取静态资源文件, 不要通过ClassPath的相对路径查找
* *********************/
InputStream inputStream = ParameterUtil.class.getClassLoader().getResourceAsStream(DEFAULT_CONFIG);
try {
// 读取根配置文件
ParameterTool defaultPropertiesFile =
ParameterTool.fromPropertiesFile(inputStream);
// 获取环境参数
String envActive = getEnvActiveValue(defaultPropertiesFile);
// 读取真正的配置环境 (推荐使用 Thread.currentThread() 读取配置文件)
return ParameterTool
// ParameterTool读取变量优先级 系统环境变量 > 启动参数变量 > 配置文件变量
// 从配置文件获取配置
.fromPropertiesFile(
//当前线程
Thread.currentThread()
//返回该线程的上下文信息, 获取类加载器
.getContextClassLoader()
.getResourceAsStream(envActive))
// 从启动参数中获取配置
.mergeWith(ParameterTool.fromArgs(args))
// 从系统环境变量获取配置
.mergeWith(ParameterTool.fromSystemProperties());
} catch (IOException e) {
throw new RuntimeException("");
}
}
/**
* 配置文件+系统环境变量 生成ParameterTool
*/
public static ParameterTool getParameters() {
InputStream inputStream = ParameterUtil.class.getClassLoader().getResourceAsStream(DEFAULT_CONFIG);
/* **********************
*
* 注意:
*
* ParameterTool 读取配置文件需要抛出 IOException,
* IOException 的捕捉就在这里 catch
*
* 以前代码是直接抛出,没有进行catch,要注意对以前代码的修改
*
* *********************/
try {
ParameterTool defaultPropertiesFile =
ParameterTool.fromPropertiesFile(inputStream);
//获取环境参数
String envActive = getEnvActiveValue(defaultPropertiesFile);
//读取真正的配置环境 (推荐使用 Thread.currentThread() 读取配置文件)
return ParameterTool
// ParameterTool读取变量优先级 系统环境变量>启动参数变量>配置文件变量
// 从配置文件获取配置
.fromPropertiesFile(
//当前线程
Thread.currentThread()
//返回该线程的上下文信息, 获取类加载器
.getContextClassLoader()
.getResourceAsStream(envActive))
// 从系统环境变量获取配置
.mergeWith(ParameterTool.fromSystemProperties());
} catch (Exception e) {
throw new FlinkPropertiesException(FlinkPropertiesExceptionInfo.PROPERTIES_NULL);
}
}
/**
* 获取环境配置变量
*/
private static String getEnvActiveValue(ParameterTool defaultPropertiesFile) {
// 选择参数环境
String envActive = null;
if (defaultPropertiesFile.has(ENV_ACTIVE)) {
envActive = String.format(FLINK_ENV_FILE, defaultPropertiesFile.get(ENV_ACTIVE));
}
return envActive;
}
/**
* 从配置文件参数配置流式计算的上下文环境
*/
public static void envWithConfig(
StreamExecutionEnvironment env,
ParameterTool parameterTool
) {
/* **********************
*
* checkpoint 设置
*
* 1.
* 若checkpoint 时间不要设置太短,
* 这里的时间包括了超时时间
*
* 2.
* 设置了周期性checkpoint,
* 若上一个周期的checkpoint没完成,
* 下一个周期的checkpoint不会开始的.
*
* 3.
* 若checkpoint的持续时间超过了超时时间,
* 会出现排队,
* 过多的checkpoint排队会耗费资源
*
* 4.
* 为了解决checkpoint排队堆积,
* 需要优化checkpoint的完成效率
*
* *********************/
// 每60秒触发checkpoint
env.enableCheckpointing(parameterTool.getInt(ParameterConstants.FLINK_CHECKPOINT_INTERVAL));
CheckpointConfig ck = env.getCheckpointConfig();
// checkpoint 必须在60秒内结束,否则被丢弃
ck.setCheckpointTimeout(parameterTool.getInt(ParameterConstants.FLINK_CHECKPOINT_TIMEOUT));
// checkpoint间最小间隔 30秒 (指定了这个值, setMaxConcurrentCheckpoints自动默认为1)
ck.setMinPauseBetweenCheckpoints(parameterTool.getInt(ParameterConstants.FLINK_CHECKPOINT_MINPAUSE));
// checkpoint 语义设置为 精确一致( EXACTLY_ONCE )
ck.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// 最多允许 checkpoint 失败 3 次
ck.setTolerableCheckpointFailureNumber(parameterTool.getInt(ParameterConstants.FLINK_CHECKPOINT_FAILURENUMBER));
// 同一时间只允许一个 checkpoint 进行
ck.setMaxConcurrentCheckpoints(parameterTool.getInt(ParameterConstants.FLINK_CHECKPOINT_MAXCONCURRENT));
// 设置 State 存储
env.setStateBackend(new HashMapStateBackend());
// 并行度设置
env.setParallelism(parameterTool.getInt(ParameterConstants.FLINK_PARALLELISM));
}
}
读取环境信息
该方法会读取 flink.properties 配置的生效的配置文件,组装成要读取的配置文件
// flink.properties
InputStream inputStream = ParameterUtil.class.getClassLoader().getResourceAsStream(DEFAULT_CONFIG);
ParameterTool defaultPropertiesFile =
ParameterTool.fromPropertiesFile(inputStream);
private static String getEnvActiveValue(ParameterTool defaultPropertiesFile) {
// 选择参数环境
String envActive = null;
// 配置文件中是否有该属性 flink.env.active
if (defaultPropertiesFile.has(ENV_ACTIVE)) {
// 有的话,直接拼装 flink-%s.properties -> flink-dev.properties
envActive = String.format(FLINK_ENV_FILE, defaultPropertiesFile.get(ENV_ACTIVE));
}
return envActive;
}
ParameterTool 获取参数的3种方式
-
fromPropertiesFile 配置文件
-
fromArgs 程序启动参数
- 或者 -- 开头 空格分隔
, 如:-name likelong --age 21 -
fromSystemProperties 系统环境变量, 包括程序 -D启动的变量
内部调用的是 Java提供的 System.getProperties()
ParameterTool 获取参数优先级, 可通过 mergeWith() 设置优先级, 但 mergeWith() 会覆盖前面的同名变量
因此,上述ParameterTool读取变量优先级 系统环境变量 > 启动参数变量 > 配置文件变量
ParameterTool 注册 global 变量
ParameterTool 注册为 global 变量:
env.getConfig().setGlobalJobParameter()
这样, 在上下文中就能获取 ParameterTool
(ParameterTool) getRuntimeContext().getExecutionConfig().getGlobalJobParameters()
【该方法可以在富函数生命周期方法中调用】
如下:
private static void initEnv(String[] args) {
// ParameterTool 注册为 global
parameterTool = ParameterUtil.getParameters();
env.getConfig().setGlobalJobParameters(parameterTool);
// 配置上下文环境
ParameterUtil.envWithConfig(env, parameterTool);
}