FlinkSQL的Watermark机制和Checkpoint机制

news2024/10/6 6:02:05

Watermark机制

Watermark机制,就是水印机制,也叫做水位线机制。就是专门用来解决流式环境下数据迟到问题的。

MonotonousWatermark(单调递增水印)

package day05;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

/**
 * @desc: 需求:从socket获取数据,转换成水位传感器类,基于事件时间,每5秒生成一个滚动窗口,来计算传感器水位信息
 * 定义类 WaterSensor
 * 	String id  --id
 * 	Integer vc  --value count
 * 	Long ts  --timestamp
 * 	TODO 演示单调递增水印monotonousWatermark
 */
public class Demo01_MonotonousWatermark {
    public static void main(String[] args) throws Exception {
        //1.构建流式执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        //2.数据输入
        DataStreamSource<String> source = env.socketTextStream("node1", 9999);
        //3.数据处理
        //	//3.1 把数据转换成WaterSensor对象
        SingleOutputStreamOperator<WaterSensor> mapStream = source.map(new MapFunction<String, WaterSensor>() {
            @Override
            public WaterSensor map(String value) throws Exception {
                //lines就是转换后的数组类型,数组的长度为3,分别表示:
                // String id
                // Integer vc
                // Long ts
                String[] lines = value.split(",");
                return new WaterSensor(lines[0], Integer.parseInt(lines[1]), Long.parseLong(lines[2]));
            }
        });

        //	//3.2 分配watermark(演示常用的watermark水印)
        SingleOutputStreamOperator<WaterSensor> watermarks = mapStream
                //forMonotonousTimestamps:单调递增水印
                .assignTimestampsAndWatermarks(WatermarkStrategy.<WaterSensor>forMonotonousTimestamps()
                .withTimestampAssigner(new SerializableTimestampAssigner<WaterSensor>() {
                    @Override
                    public long extractTimestamp(WaterSensor element, long recordTimestamp) {
                        return element.getTs() * 1000L;
                    }
                }));

        // 3.3 基于id分组
        SingleOutputStreamOperator<String> result = watermarks.keyBy(value -> value.getId())
                // 3.4 指定5秒的滚动窗口
                .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                // 3.5 进行数据处理(统计次数)process方法。
                /**
                 * 参数1:输入的数据类型
                 * 参数2:输出的数据类型
                 * 参数3:分组的数据类型
                 * 参数4:时间窗口
                 */
                .process(new ProcessWindowFunction<WaterSensor, String, String, TimeWindow>() {
                    /**
                     *  process方法介绍:
                     * @param key 根据key来分组
                     * @param context 窗口计算的上下文对象(可以从上下文对象获取窗口的一些额外信息)
                     * @param elements 窗口内的数据
                     * @param out 收集窗口的计算结果
                     * @throws Exception 异常抛出
                     */
                    @Override
                    public void process(String key, Context context, Iterable<WaterSensor> elements, Collector<String> out) throws Exception {
                        out.collect("分组的key为:" + key +
                                "\n窗口内的数据为:" + elements +
                                "\n窗口内的数据量为:" + elements.spliterator().estimateSize() +
                                "\n窗口为:[" + context.window().getStart() + "," + context.window().getEnd() + ")\n");
                    }
                });


        //4.数据输出
        result.print();
        //5.启动流式任务
        env.execute();
    }

    /**
     * 创建水位传感器类:WaterSensor
     * @Data:可以用来构建getter和setter方法
     * 构造器:Java中有无参和有参的构造器(构造方法)
     * @AllArgsConstructor:有参构造
     * @NoArgsConstructor:无参构造
     */
    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    public static class WaterSensor {
        //用户id
        private String id;
        //水位信息
        private Integer vc;
        //时间戳
        private Long ts;

    }


}

运行结果如下:

结论:

MonotonousWatermark(单调递增水印)会有数据丢失的情况。  

BoundedOutofOrder(固定延迟水印)

package day05;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.time.Duration;

/**
 * @desc: 需求:从socket获取数据,转换成水位传感器类,基于事件时间,每5秒生成一个滚动窗口,来计算传感器水位信息
 *  * 定义类 WaterSensor
 *  * 	String id  --id
 *  * 	Integer vc  --value count
 *  * 	Long ts  --timestamp
 *  todo 演示固定延迟水印
 */
public class Demo02_BoundedOutofOrderWatermark {
    public static void main(String[] args) throws Exception {
        //1.构建流式执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        //2.数据输入
        DataStreamSource<String> source = env.socketTextStream("node1", 9999);

        //3.数据处理
        //3.1 把输入数据转换成WaterSensor对象
        SingleOutputStreamOperator<WaterSensor> mapStream = source.map(value -> {
            String[] lines = value.split(",");
            return new WaterSensor(lines[0], Integer.parseInt(lines[1]), Long.parseLong(lines[2]));
            //返回时需要使用自定义的类WaterSensor,写法如下:Types.GENERIC(WaterSensor.class)
        }).returns(Types.GENERIC(WaterSensor.class));
        /**
         * 3.2 给数据添加固定延迟水印
         * WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(2))
         * 如上代码,2秒的固定延迟,允许数据乱序2秒。(延迟2秒到达)
         */
        SingleOutputStreamOperator<WaterSensor> watermarks = mapStream
                .assignTimestampsAndWatermarks(WatermarkStrategy.<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(2))
                        .withTimestampAssigner(new SerializableTimestampAssigner<WaterSensor>() {
            @Override
            public long extractTimestamp(WaterSensor element, long recordTimestamp) {
                return element.getTs() * 1000L;
            }
        }));
        //3.3 把数据进行分组
        SingleOutputStreamOperator<String> result = watermarks.keyBy(value -> value.getId())
                //3.4 分配滚动事件时间窗口,并且制定窗口大小为5秒钟
                .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                /**
                 *  3.5 对窗口内的数据进行处理
                 *  @param <IN> The type of the input value. 输入数据类型
                 *  @param <OUT> The type of the output value. 输出数据类型
                 *  @param <KEY> The type of the key. key的类型
                 *  @param <W> The type of {@code Window} that this window function can be applied on.窗口
                 */
                .process(new ProcessWindowFunction<WaterSensor, String, String, TimeWindow>() {
                    /**
                     *
                     * @param s 分组的key
                     * @param context 上下文对象,可以从上下文对象中获取其他信息
                     * @param elements 窗口内的元素(数据)
                     * @param out out收集器,用于结果输出
                     * @throws Exception 异常
                     */
                    @Override
                    public void process(String s, Context context, Iterable<WaterSensor> elements, Collector<String> out) throws Exception {
                        out.collect("分组的key为:" + s +
                                "\n窗口内的数据:" + elements +
                                "\n窗口内的数据量为:" + elements.spliterator().estimateSize() +
                                "\n窗口起始时间为:[" + context.window().getStart() + "," + context.window().getEnd() + ")\n");
                    }
                });

        //4.数据输出
        result.print();

        //5.启动流式任务
        env.execute();
    }

    /**
     * 创建水位传感器类:WaterSensor
     * @Data:可以用来构建getter和setter方法
     * 构造器:Java中有无参和有参的构造器(构造方法)
     * @AllArgsConstructor:有参构造
     * @NoArgsConstructor:无参构造
     */
    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    public static class WaterSensor {
        //用户id
        private String id;
        //水位信息
        private Integer vc;
        //时间戳
        private Long ts;

    }
}

运行结果如下:

结论:

在固定延迟水印下,在一定范围内的数据迟到的情况下,可以正常统计。

AllowedLateness(在固定延迟水印下允许迟到)

package day05;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.time.Duration;

/**
 * @desc: 需求:从socket获取数据,转换成水位传感器类,基于事件时间,每5秒生成一个滚动窗口,来计算传感器水位信息
 *  * 定义类 WaterSensor
 *  * 	String id  --id
 *  * 	Integer vc  --value count
 *  * 	Long ts  --timestamp
 *  todo 演示AllowedLateness
 */
public class Demo03_AllowedLateness {
    public static void main(String[] args) throws Exception {
        //1.构建流式执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        //2.数据输入
        DataStreamSource<String> source = env.socketTextStream("node1", 9999);

        //3.数据处理
        //3.1 把输入数据转换成WaterSensor对象
        SingleOutputStreamOperator<WaterSensor> mapStream = source.map(value -> {
            String[] lines = value.split(",");
            return new WaterSensor(lines[0], Integer.parseInt(lines[1]), Long.parseLong(lines[2]));
            //返回时需要使用自定义的类WaterSensor,写法如下:Types.GENERIC(WaterSensor.class)
        }).returns(Types.GENERIC(WaterSensor.class));
        /**
         * 3.2 给数据添加固定延迟水印
         * WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(2))
         * 如上代码,2秒的固定延迟,允许数据乱序2秒。(延迟2秒到达)
         */
        SingleOutputStreamOperator<WaterSensor> watermarks = mapStream
                .assignTimestampsAndWatermarks(WatermarkStrategy.<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(2))
                        .withTimestampAssigner(new SerializableTimestampAssigner<WaterSensor>() {
            @Override
            public long extractTimestamp(WaterSensor element, long recordTimestamp) {
                return element.getTs() * 1000L;
            }
        }));
        //3.3 把数据进行分组
        SingleOutputStreamOperator<String> result = watermarks.keyBy(value -> value.getId())
                //3.4 分配滚动事件时间窗口,并且制定窗口大小为5秒钟
                .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                //allowedLateness,在固定延迟水印下,再允许你延迟的时间程度
                .allowedLateness(Time.seconds(1))
                /**
                 *  3.5 对窗口内的数据进行处理
                 *  @param <IN> The type of the input value. 输入数据类型
                 *  @param <OUT> The type of the output value. 输出数据类型
                 *  @param <KEY> The type of the key. key的类型
                 *  @param <W> The type of {@code Window} that this window function can be applied on.窗口
                 */
                .process(new ProcessWindowFunction<WaterSensor, String, String, TimeWindow>() {
                    /**
                     *
                     * @param s 分组的key
                     * @param context 上下文对象,可以从上下文对象中获取其他信息
                     * @param elements 窗口内的元素(数据)
                     * @param out out收集器,用于结果输出
                     * @throws Exception 异常
                     */
                    @Override
                    public void process(String s, Context context, Iterable<WaterSensor> elements, Collector<String> out) throws Exception {
                        out.collect("分组的key为:" + s +
                                "\n窗口内的数据:" + elements +
                                "\n窗口内的数据量为:" + elements.spliterator().estimateSize() +
                                "\n窗口起始时间为:[" + context.window().getStart() + "," + context.window().getEnd() + ")\n");
                    }
                });

        //4.数据输出
        result.print();

        //5.启动流式任务
        env.execute();
    }

    /**
     * 创建水位传感器类:WaterSensor
     * @Data:可以用来构建getter和setter方法
     * 构造器:Java中有无参和有参的构造器(构造方法)
     * @AllArgsConstructor:有参构造
     * @NoArgsConstructor:无参构造
     */
    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    public static class WaterSensor {
        //用户id
        private String id;
        //水位信息
        private Integer vc;
        //时间戳
        private Long ts;

    }
}

运行结果如下:

结论:

AllowedLateness,允许在固定延迟水印下,再次迟到的数据被捕获。

虽然Watermark会触发窗口计算,如果使用AllowedLateness,窗口就不会立刻销毁,

因此,数据的延迟时间在AllowedLateness的时间范围内,数据能够被正常处理。

窗口会在AllowedLateness设置的时间之后再销毁。  

SideOutput(侧道输出)

package day05;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

import java.time.Duration;

/**
 * @desc: 需求:从socket获取数据,转换成水位传感器类,基于事件时间,每5秒生成一个滚动窗口,来计算传感器水位信息
 *  * 定义类 WaterSensor
 *  * 	String id  --id
 *  * 	Integer vc  --value count
 *  * 	Long ts  --timestamp
 *  todo 演示SideoutputTag(侧道输出)
 */
public class Demo04_SideOutputTag {
    public static void main(String[] args) throws Exception {
        //1.构建流式执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        //2.数据输入
        DataStreamSource<String> source = env.socketTextStream("node1", 9999);
        //定义一个OutputTag对象,用于SideOutputLateData
        OutputTag<WaterSensor> lateData = new OutputTag<>("lateData", Types.GENERIC(WaterSensor.class));

        //3.数据处理
        //3.1 把输入数据转换成WaterSensor对象
        SingleOutputStreamOperator<WaterSensor> mapStream = source.map(value -> {
            String[] lines = value.split(",");
            return new WaterSensor(lines[0], Integer.parseInt(lines[1]), Long.parseLong(lines[2]));
            //返回时需要使用自定义的类WaterSensor,写法如下:Types.GENERIC(WaterSensor.class)
        }).returns(Types.GENERIC(WaterSensor.class));
        /**
         * 3.2 给数据添加固定延迟水印
         * WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(2))
         * 如上代码,2秒的固定延迟,允许数据乱序2秒。(延迟2秒到达)
         */
        SingleOutputStreamOperator<WaterSensor> watermarks = mapStream
                .assignTimestampsAndWatermarks(WatermarkStrategy.<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(2))
                        .withTimestampAssigner(new SerializableTimestampAssigner<WaterSensor>() {
            @Override
            public long extractTimestamp(WaterSensor element, long recordTimestamp) {
                return element.getTs() * 1000L;
            }
        }));
        //3.3 把数据进行分组
        SingleOutputStreamOperator<String> result = watermarks.keyBy(value -> value.getId())
                //3.4 分配滚动事件时间窗口,并且制定窗口大小为5秒钟
                .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                //allowedLateness,在固定延迟水印下,再允许你延迟的时间程度
                .allowedLateness(Time.seconds(1))
                //侧道输出:无论延迟多久的数据,都能够通过侧道输出来捕获
                .sideOutputLateData(lateData)
                /**
                 *  3.5 对窗口内的数据进行处理
                 *  @param <IN> The type of the input value. 输入数据类型
                 *  @param <OUT> The type of the output value. 输出数据类型
                 *  @param <KEY> The type of the key. key的类型
                 *  @param <W> The type of {@code Window} that this window function can be applied on.窗口
                 */
                .process(new ProcessWindowFunction<WaterSensor, String, String, TimeWindow>() {
                    /**
                     *
                     * @param s 分组的key
                     * @param context 上下文对象,可以从上下文对象中获取其他信息
                     * @param elements 窗口内的元素(数据)
                     * @param out out收集器,用于结果输出
                     * @throws Exception 异常
                     */
                    @Override
                    public void process(String s, Context context, Iterable<WaterSensor> elements, Collector<String> out) throws Exception {
                        out.collect("分组的key为:" + s +
                                "\n窗口内的数据:" + elements +
                                "\n窗口内的数据量为:" + elements.spliterator().estimateSize() +
                                "\n窗口起始时间为:[" + context.window().getStart() + "," + context.window().getEnd() + ")\n");
                    }
                });

        //4.数据输出
        result.print();
        //获取迟到的数据,并且打印输出
        result.getSideOutput(lateData).printToErr("超过AllowedLateness的数据:");
        //5.启动流式任务
        env.execute();
    }

    /**
     * 创建水位传感器类:WaterSensor
     * @Data:可以用来构建getter和setter方法
     * 构造器:Java中有无参和有参的构造器(构造方法)
     * @AllArgsConstructor:有参构造
     * @NoArgsConstructor:无参构造
     */
    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    public static class WaterSensor {
        //用户id
        private String id;
        //水位信息
        private Integer vc;
        //时间戳
        private Long ts;

    }
}

运行结果如下:

结论:

SideOutput侧道输出,可以允许数据在既超过了Watermark的时间,又超过了AllowedLateness的时间范围后,仍然被正常捕获。

也就是说,数据无论迟到多久,都不会丢失。

Checkpoint机制

Checkpoint机制,又叫容错机制,可以保证流式任务中,不会因为异常时等原因,造成任务异常退出。可以保证任务正常运行。  

机制运行流程

解释:

(1)主节点上的检查点协调器(CheckpointCoordinator)会周期性地发送一个个地Barrier(栅栏),Barrier会混在数据里,随着数据流,流向source算子

(2)source算子在摄入数据的时候,如果碰到Barrier栅栏,不会去处理,Barrier就会让算子去汇报当前的状态

(3)处理完之后,Barrier就会随着数据流,流向下一个算子

(4)下一个算子收到Barrier,同样会听下手里的工作,也会向检查点协调器汇报当前的状态,把状态往主节点传递一份(备份,防止算子出错,状态丢失) (5)上一步处理完之后,Barrier又会随着数据流向下一个算子,以此类推。 (6)等Barrier流经所有的算子之后,这一轮的快照就算制作完成。

状态后端

状态后端,StateBackend,就是Flink存储状态的介质。Flink提供了三种状态后端的存储方式:

  • MemoryStateBackend(内存)

内存,掉电易失。不安全。基本不用。

配置如下:

state.backend: hashmap
# 可选,当不指定 checkpoint 路径时,默认自动使用 JobManagerCheckpointStorage
state.checkpoint-storage: jobmanager

  • FsStateBackend(文件系统)

FsStateBackend,文件系统的状态后端,就是把状态保存在文件系统中,常用来保存状态的文件系统有HDFS。

配置如下:

state.backend: hashmap 
state.checkpoints.dir: file:///checkpoint-dir/ 

# 默认为FileSystemCheckpointStorage 
state.checkpoint-storage: filesystem

  • RocksDBStateBackend(RocksDB数据库)

RocksDBStateBackend,把状态保存在RocksDB数据库中。

RocksDB,是一个小型文件系统的数据库。

配置如下:

state.backend: rocksdb
state.checkpoints.dir: file:///checkpoint-dir/

# Optional, Flink will automatically default to FileSystemCheckpointStorage
# when a checkpoint directory is specified.
state.checkpoint-storage: filesystem

Flink的重启策略

Flink支持四种类型的重启策略:

  • none:没有重启。任务一旦遇到异常,就退出。

  • fixed-delay:固定延迟重启策略。也就是说,可以配置一个重启的次数。超过次数后,才会退出。

  • failure-rate:失败率重启策略。也就是说,任务的失败频率。超过该频率后才退出。在设定的频率之内,不会退出。

  • exponential-delay:指数延迟重启策略。也就是说,任务在失败后,下一次的延迟时间是随着指数增长的。

Checkpoint配置和重启策略的配置

execution.checkpointing.interval: 5000
#设置有且仅有一次模式 目前支持 EXACTLY_ONCE、AT_LEAST_ONCE        
execution.checkpointing.mode: EXACTLY_ONCE
state.backend: hashmap
#设置checkpoint的存储方式
state.checkpoint-storage: filesystem
#设置checkpoint的存储位置
state.checkpoints.dir: hdfs://node1:8020/checkpoints
#设置savepoint的存储位置
state.savepoints.dir: hdfs://node1:8020/checkpoints
#设置checkpoint的超时时间 即一次checkpoint必须在该时间内完成 不然就丢弃
execution.checkpointing.timeout: 600000
#设置两次checkpoint之间的最小时间间隔
execution.checkpointing.min-pause: 500
#设置并发checkpoint的数目
execution.checkpointing.max-concurrent-checkpoints: 1
#开启checkpoints的外部持久化这里设置了清除job时保留checkpoint,默认值时保留一个 假如要保留3个
state.checkpoints.num-retained: 3
#默认情况下,checkpoint不是持久化的,只用于从故障中恢复作业。当程序被取消时,它们会被删除。但是你可以配置checkpoint被周期性持久化到外部,类似于savepoints。这些外部的checkpoints将它们的元数据输出到外#部持久化存储并且当作业失败时不会自动
清除。这样,如果你的工作失败了,你就会有一个checkpoint来恢复。
#ExternalizedCheckpointCleanup模式配置当你取消作业时外部checkpoint会产生什么行为:
#RETAIN_ON_CANCELLATION: 当作业被取消时,保留外部的checkpoint。注意,在此情况下,您必须手动清理checkpoint状态。
#DELETE_ON_CANCELLATION: 当作业被取消时,删除外部化的checkpoint。只有当作业失败时,检查点状态才可用。
execution.checkpointing.externalized-checkpoint-retention: RETAIN_ON_CANCELLATION

# 重启策略选一个

# 设置无重启策略
restart-strategy: none



# 设置固定延迟策略
restart-strategy: fixed-delay
# 尝试重启次数
restart-strategy.fixed-delay.attempts: 3
# 两次连续重启的间隔时间
restart-strategy.fixed-delay.delay: 3 s



# 设置失败率重启
restart-strategy: failure-rate
# 两次连续重启的间隔时间
restart-strategy.failure-rate.delay: 3 s
# 计算失败率的统计时间跨度
restart-strategy.failure-rate.failure-rate-interval: 1 min
# 计算失败率的统计时间内的最大失败次数
restart-strategy.failure-rate.max-failures-per-interval: 3




# 设置指数延迟重启
restart-strategy: exponential-delay
# 初次失败后重启时间间隔(初始值)
restart-strategy.exponential-delay.initial-backoff: 1 s
# 以后每次失败,重启时间间隔为上一次重启时间间隔乘以这个值
restart-strategy.exponential-delay.backoff-multiplier: 2
# 每次重启间隔时间的最大抖动值(加或减去该配置项范围内的一个随机数),防止大量作业在同一时刻重启
restart-strategy.exponential-delay.jitter-factor: 0.1
# 最大重启时间间隔,超过这个最大值后,重启时间间隔不再增大
restart-strategy.exponential-delay.max-backoff: 1 min
# 多长时间作业运行无失败后,重启间隔时间会重置为初始值(第一个配置项的值)
restart-strategy.exponential-delay.reset-backoff-threshold: 1 h

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/426055.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【谷粒商城之JSR303数据校验和集中异常处理】

本笔记内容为尚硅谷谷粒商城JSR303数据校验和集中异常处理部分 目录 一、简介 二、SR303数据校验使用步骤 1、引入依赖 2、给参数对象添加校验注解 常见的注解 3、接口参数前增加Valid 开启校验 三、异常的统一处理 四、分组解决校验 1、创建Groups 2、添加分组 …

MySQL数据库之表的增删改查(进阶)

目录1. 数据库约束1.1 约束类型1.2 NULL约束1.3 UNIQUE&#xff1a;唯一约束1.4 DEFAULT&#xff1a;默认值约束1.5 PRIMARY KEY&#xff1a;主键约束1.6 FOREIGN KEY&#xff1a;外键约束1.7 CHECK约束2 表之间的关系2.1 一对一2.2 一对多2.3 多对多3 新增4 查询4.1 聚合查询4…

Redis一致性问题

&#xff08;1&#xff09;何为一致性&#xff1f; 1、定义&#xff1a; 指系统中各节点数据保持一致。 分布式系统中&#xff0c;可以理解为多个节点中的数据是一致的。 2、分类&#xff1a; 强一致性&#xff1a;写进去的数据是什么&#xff0c;读出来的数据就是什么。弱一…

DeepSpeed-Chat:最强ChatGPT训练框架,一键完成RLHF训练!

https://github.com/microsoft/DeepSpeedExamples/tree/master/applications/DeepSpeed-Chat 一个快速、负担得起、可扩展和开放的系统框架&#xff0c;用于实现端到端强化学习人类反馈 (RLHF) 培训体验&#xff0c;以生成各种规模的高质量 ChatGPT 样式模型。 目录 &#x…

计算机体系结构-体系结构基础与流水线原理

计算机体系结构&#xff1a;体系结构基础与流水线原理 ​ 计算机体系结构&#xff1a;量化设计与分析一书以RISC-V为例介绍计算机体系结构。本文为第一部分&#xff0c;介绍体系结构的基本知识和流水线原理。笔记内容为原书的第一章&#xff0c;附录A、B、C。 第一章 量化设计…

练习Tomcat

文章目录1. 简述静态网页和动态网页的区别。2. 简述 Webl.0 和 Web2.0 的区别。3. 安装tomcat8&#xff0c;配置服务启动脚本&#xff0c;部署jpress应用。1. 简述静态网页和动态网页的区别。 静态网页&#xff1a; &#xff08;1&#xff09;静态网页不能简单地理解成静止不…

SCADE Display(OpenGL)软件设计文档生成工具的设计考虑

SCADE Display&#xff08;OpenGL&#xff09;软件设计文档生成工具的设计考虑 2018年6月 1 引言 本文档描述在SCADE Display软件设计文档生成工具&#xff08;以下简称为SDYSDDGEN&#xff09;的设计过程中考虑到的一些问题及其解决方案。 2 目标 SDYSDDGEN的目标设定为&…

面向对象程序设计 C++总结笔记(1)

面向对象程序设计 学习方法 理解基本原理掌握程序设计方法加强动手实践 课程目标 理解面向对象程序设计的基本原理&#xff0c;掌握面向对象技术的基本概念和封装性、继承性和多态性&#xff0c;能够具有面向对象程序设计思想。掌握C语言面向对象的基本特性和C语言基础知识&…

就在20号!袋鼠云春季生长大会邀您共观数智生机,我们云上见

如今&#xff0c;数字经济正逐步走向深化应用、规范发展、普惠共享的新阶段&#xff0c;数字经济与实体经济深度融合、基础软件国产化替代成为数字时代主潮流。 「 2023 袋鼠云春季生长大会」乘风而起&#xff0c;带您走近大数据基础软件——数栈&#xff0c;低代码数字孪生世界…

Hadoop之Yarn篇

目录 ​编辑 Yarn的工作机制&#xff1a; 全流程作业&#xff1a; Yarn的调度器与调度算法&#xff1a; FIFO调度器&#xff08;先进先出&#xff09;&#xff1a; 容量调度器&#xff08;Capacity Scheduler&#xff09;&#xff1a; 容量调度器资源分配算法&#xff1…

【面试题】对 JSON.stringify()与JSON.parse() 理解

大厂面试题分享 面试题库 前后端面试题库 &#xff08;面试必备&#xff09; 推荐&#xff1a;★★★★★ 地址&#xff1a;前端面试题库 web前端面试题库 VS java后端面试题库大全 重新学习这两个API的起因 在本周五有线上的项目&#xff0c;16:30开始验证线上环境。 开始…

【数据挖掘与商务智能决策】第十一章 AdaBoost与GBDT模型

11.1 AdaBoost模型简单代码实现 1.AdaBoost分类模型演示 from sklearn.ensemble import AdaBoostClassifier X [[1, 2], [3, 4], [5, 6], [7, 8], [9, 10]] y [0, 0, 0, 1, 1]model AdaBoostClassifier(random_state123) model.fit(X, y)print(model.predict([[5, 5]]))[0…

使用 Urch 让 Ubuntu 原生远程控制功能稳定可靠

有些时候&#xff0c;使用远程控制能够简化不少运维和操作的事情。 本篇文章分享如何通过开源工具 “Urch&#xff08;Ubuntu Remote Control Helper&#xff09;” 让 Ubuntu 原生的远程控制&#xff08;远程桌面&#xff09;功能稳定可靠。 方案已经经过 Ubuntu 22.04 LTS …

JVM之低延迟垃圾收集器

目录 低延迟垃圾收集器 概要 各款收集器的并发情况 Shenandoah收集器 Shenandoah相比G1的改进之处 链接矩阵 定义 优点 Shenandoah收集器的工作过程 Brooks Pointer 转发指针技术 转发指针的优缺点 Shenandoah 性能测试 Shenandoah 总结 ZGC 收集器 ZGC的Region的…

编译原理第一章

编译原理笔记 文章目录编译原理笔记day1什么是编译&#xff1f;编译器的结构词法分析概述词法分析的主要任务语法分析概述主要目的主要任务具体实例语义分析概述主要目的主要任务中间代码生成和编译器后端常用的中间表示形式目标代码生成器代码优化器day1 什么是编译&#xff…

Mysql 学习(一)基础知识(待更新)

文章目录服务端处理客户端请求启动项系统变量启动项和系统变量的区别常见的字符集字符集比较规则服务端处理客户端请求 客户端进程向服务器进程发送一段文本&#xff08;MySQL语句&#xff09;&#xff0c;服务器进程处理后再向客户端进程发送一段文本&#xff08;处理结果&am…

Idea使用样式主题

目的 花里胡哨的idea显示主题 安装插件 在preferences>plugins中搜索“Material Theme”安装两个中的一个 重启>设置>选择主题 对比度&#xff08;多选&#xff09; Contrast Mode:对比度模式&#xff0c;目录结构&#xff0c;选项卡等非文本选择前后的颜色对比度。…

Docker部署RabbitMQ(单机,集群,仲裁队列)

RabbitMQ部署指南 1.单机部署 我们在Centos7虚拟机中使用Docker来安装。 1.1.下载镜像 方式一&#xff1a;在线拉取 docker pull rabbitmq:3-management方式二&#xff1a;从本地加载 在课前资料已经提供了镜像包&#xff1a; 上传到虚拟机中后&#xff0c;使用命令加载镜…

【论文阅读】(20230410-20230416)论文阅读简单记录和汇总

&#xff08;20230410-20230416&#xff09;论文阅读简单记录和汇总 2023/04/09&#xff1a;很久没有动笔写东西了&#xff0c;这两周就要被抓着汇报了&#xff0c;痛苦啊呜呜呜呜呜 目录 (CVPR 2023): Temporal Interpolation Is All You Need for Dynamic Neural Radiance …

RPC 漫谈:序列化问题

RPC 漫谈&#xff1a;序列化问题 何为序列 对于计算机而言&#xff0c;一切数据皆为二进制序列。但编程人员为了以人类可读可控的形式处理这些二进制数据&#xff0c;于是发明了数据类型和结构的概念&#xff0c;数据类型用以标注一段二进制数据的解析方式&#xff0c;数据结…