Checkpoint机制和生产配置

news2024/12/24 8:36:19

1.前提

在将Checkpoint之前,先回顾一下flink处理数据的流程:

在这里插入图片描述

2. 概述

Checkpoint机制,又叫容错机制,可以保证流式任务中,不会因为异常时等原因,造成任务异常退出。可以保证任务正常运行。
(1)能在集群异常时,保持已计算的数据,下次恢复时能在已保存数据的基础上,继续计算(类似于快照);
(2)避免数据丢失(通过Barrier实现)

3.机制运行流程

在这里插入图片描述
解释:

(1)主节点上的检查点协调器(CheckpointCoordinator)会周期性地发送一个个地Barrier(栅栏,前面说的 偏移量做标识),Barrier会混在数据里,随着数据流,流向source算子;

(2)source算子在摄入数据的时候,如果碰到Barrier栅栏,不会去处理,Barrier就会让先算子去汇报当前的状态

(3)处理完之后,Barrier就会随着数据流,流向下一个算子;

(4)下一个算子收到Barrier,同样会停下手里的工作,也会向检查点协调器汇报当前的状态,把状态往主节点传递一份(备份,防止算子出错,状态丢失)
(5)上一步处理完之后,Barrier又会随着数据流向下一个算子,以此类推。
(6)等Barrier流经所有的算子之后,这一轮的快照就算制作完成

4. 状态后端

状态后端,StateBackend,就是Flink存储状态的介质(存储状态的地方)。Flink提供了三种状态后端的存储方式:

  • MemoryStateBackend(内存,使用HashMapStateBackend实现,生产一般不用)
  • FsStateBackend(文件系统,比如说HDFS,生产常用)
  • RocksDBStateBackend(RocksDB数据库,生产常用)
  • 同时也可以把状态外置到 Hbase和Redis,解决大状态存储问题
MemoryStateBackend

内存,掉电易失。不安全。基本不用。
在这里插入图片描述
配置如下:

state.backend: hashmap
# 可选,当不指定 checkpoint 路径时,默认自动使用 JobManagerCheckpointStorage
state.checkpoint-storage: jobmanager
FsStateBackend

FsStateBackend,文件系统的状态后端,就是把状态保存在文件系统中,常用来保存状态的文件系统有HDFS;
工作中常用;
在这里插入图片描述
配置如下:

state.backend: hashmap 
state.checkpoints.dir: file:///checkpoint-dir/ 

# 默认为FileSystemCheckpointStorage 
state.checkpoint-storage: filesystem
RocksDBStateBackend

RocksDBStateBackend,把状态保存在RocksDB数据库中。

RocksDB,是一个小型文件系统的数据库。

配置如下:

state.backend: rocksdb
state.checkpoints.dir: file:///checkpoint-dir/

# Optional, Flink will automatically default to FileSystemCheckpointStorage
# when a checkpoint directory is specified.
state.checkpoint-storage: filesystem

特点:可以保持巨大的状态,且支持增量状态保存。

5.重启策略

5.1 重启策略概述

Flink流式任务,需要长期运行,就算遇到一些数据异常问题等,也不能随便退出。

Flink为了让任务能够在遇到异常退出时,能够重新启动,正常运行,Flink提出了重启策略的概念。

5.2 Flink的重启策略

Flink支持四种类型的重启策略:

  • none:没有重启。任务一旦遇到异常,就退出。

  • fixed-delay:固定延迟重启策略。也就是说,可以配置一个重启的次数。超过次数后,才会退出。

  • failure-rate:失败率重启策略。也就是说,任务的失败频率。超过该频率后才退出。在设定的频率之内,不会退出。

  • exponential-delay:指数延迟重启策略。也就是说,任务在失败后,下一次的延迟时间是随着指数增长的。

5.3案例演示
模拟异常的代码
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

/**
 * Flink 代码实现流处理,进行单词统计。数据源来自于socket数据。
 * todo 演示Flink遇到异常重启。
 */
public class RestartStrategy {
    public static void main(String[] args) throws Exception {
        //1.构建流式执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
        env.setParallelism(1);
        //2.数据输入(数据源)
        //从socket读取数据,socket = hostname + port
        DataStreamSource<String> source = env.socketTextStream("node1", 9999);
        //3.数据处理
        //3.1 使用flatMap进行扁平化处理
        SingleOutputStreamOperator<String> flatMapStream = source.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String value, Collector<String> out) throws Exception {
                String[] words = value.split(" ");
                for (String word : words) {
                    if (word.equals("evil")) {
                        //evil:恶魔,魔鬼,程序如果碰到魔鬼就退出。
                        throw new Exception("魔鬼来了,程序退出");
                    }
                    out.collect(word);
                }
            }
        });
        //3.2 使用map进行转换,转换成(单词,1)
        SingleOutputStreamOperator<Tuple2<String, Integer>> mapStream = flatMapStream.map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String value) throws Exception {
                return Tuple2.of(value, 1);
            }
        });
        //3.3使用keyBy进行单词分组
        KeyedStream<Tuple2<String, Integer>, String> keyedStream = mapStream.keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
            @Override
            public String getKey(Tuple2<String, Integer> value) throws Exception {
                return value.f0;
            }
        });
        //3.4 使用reduce(sum)进行聚合操作,sum:就是根据第一个元素(Integer)进行sum操作
        SingleOutputStreamOperator<Tuple2<String, Integer>> result = keyedStream.sum(1);
        //4.数据输出
        result.print();
        //5.启动流式任务
        env.execute();
    }
}

5.4Checkpoint配置

修改flink-conf.yaml文件

execution.checkpointing.interval: 5000
#设置有且仅有一次模式 目前支持 EXACTLY_ONCE、AT_LEAST_ONCE        
execution.checkpointing.mode: EXACTLY_ONCE
state.backend: hashmap
#设置checkpoint的存储方式
state.checkpoint-storage: filesystem
#设置checkpoint的存储位置
state.checkpoints.dir: hdfs://node1:8020/checkpoints
#设置savepoint的存储位置
state.savepoints.dir: hdfs://node1:8020/checkpoints
#设置checkpoint的超时时间 即一次checkpoint必须在该时间内完成 不然就丢弃
execution.checkpointing.timeout: 600000
#设置两次checkpoint之间的最小时间间隔
execution.checkpointing.min-pause: 500
#设置并发checkpoint的数目
execution.checkpointing.max-concurrent-checkpoints: 1
#开启checkpoints的外部持久化这里设置了清除job时保留checkpoint,默认值时保留一个 假如要保留3个
state.checkpoints.num-retained: 3
#默认情况下,checkpoint不是持久化的,只用于从故障中恢复作业。当程序被取消时,它们会被删除。但是你可以配置checkpoint被周期性持久化到外部,类似于savepoints。这些外部的checkpoints将它们的元数据输出到外#部持久化存储并且当作业失败时不会自动
清除。这样,如果你的工作失败了,你就会有一个checkpoint来恢复。
#ExternalizedCheckpointCleanup模式配置当你取消作业时外部checkpoint会产生什么行为:
#RETAIN_ON_CANCELLATION: 当作业被取消时,保留外部的checkpoint。注意,在此情况下,您必须手动清理checkpoint状态。
#DELETE_ON_CANCELLATION: 当作业被取消时,删除外部化的checkpoint。只有当作业失败时,检查点状态才可用。
execution.checkpointing.externalized-checkpoint-retention: RETAIN_ON_CANCELLATION

#------------------------------------

# 设置固定延迟策略
restart-strategy: fixed-delay
# 尝试重启次数
restart-strategy.fixed-delay.attempts: 3
# 两次连续重启的间隔时间
restart-strategy.fixed-delay.delay: 3 s
fixed-delay重启策略

提交命令:

#1.启动HDFS
#2.把jar包上传到Linux
#3.配置Flink的Checkpoint和重启策略
#4.提交任务
	cd $FLINK_HOME
	bin/flink run -c test.RestartStrategy /root/original-gz_flinkbase-1.0-SNAPSHOT.jar
#5.在socket中数据单词
nc -lk 9999
hadoop
hive
flink
evil

运行结果:
在这里插入图片描述

6.官方推荐的配置

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1621631.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【iCloud】土耳其苹果礼品卡购买

礼品购买官网 注册账号后&#xff0c;点击苹果礼品卡&#xff0c;选择自己想要的一款加购 没有国外的卡&#xff0c;只能选这种方式付款&#xff0c;使用银联卡&#xff1a; 接着填入银行卡信息付款即可。

Xbox VR头盔即将推出,但它是Meta Quest的‘限量版’。

&#x1f4f3;Xbox VR头盔即将推出&#xff0c;但它是Meta Quest的‘限量版’。 微软与Meta合作推出限量版Meta Quest VR头映射Xbox风格&#xff0c;可能是Meta Quest 3或未来版本的特别定制版&#xff0c;附带Xbox控制器。这一合作是Meta向第三方硬件制造商开放其Quest VR头盔…

介绍一个开源IOT组态项目

项目介绍 金合可视化平台是一款强大而操作简便的低代码平台&#xff0c;专为满足物联网领域的可视化开发需求而设计。通过该平台&#xff0c;用户可以利用拖拽配置的方式&#xff0c;轻松创建个性化的可视化大屏&#xff0c;无需熟练的编程技能&#xff0c;大幅提高了开发效率。…

Go Sync并发包之errgroup

你是否写过一个函数&#xff0c;它之所以很长&#xff0c;是因为它要完成很多任务&#xff0c;即使这些任务之间并不相互依赖&#xff1f; 你是否写过一个很长的函数&#xff0c;因为它要完成很多任务&#xff0c;即使这些任务并不相互依赖&#xff1f;我就遇到过这种情况。 想…

k8s集群CD工具-ArgoCD

ArgoCD是什么 Argo CD 是 Kubernetes 的声明式 GitOps 持续交付工具。应用程序定义、配置和环境应该是声明性的和版本控制的。应用程序部署和生命周期管理应该是自动化的、可审计的且易于理解。 官方文档 CD工作流&#xff08;无ArgoCD&#xff09; 假设有一个微服务应用程序…

Python解析和嵌入媒体资源的工具库之micawber使用详解

概要 在Web开发中,经常需要处理媒体资源的解析和嵌入,例如视频、音频、图片等。Python Micawber库就是一个用于解析和嵌入媒体资源的工具,它可以自动识别各种媒体资源的URL,并生成对应的嵌入代码,方便在网页中展示多媒体内容。 安装 可以通过pip来安装Micawber库: pip…

RustGUI学习(iced)之小部件(一):如何使用按钮和文本标签部件

前言 本专栏是学习Rust的GUI库iced的合集&#xff0c;将介绍iced涉及的各个小部件分别介绍&#xff0c;最后会汇总为一个总的程序。 iced是RustGUI中比较强大的一个&#xff0c;目前处于发展中&#xff08;即版本可能会改变&#xff09;&#xff0c;本专栏基于版本0.12.1. 概述…

开源文本嵌入模型M3E

进入正文前&#xff0c;先扯点题外话 这两天遇到一个棘手的问题&#xff0c;在用 docker pull 拉取镜像时&#xff0c;会报错&#xff1a; x509: certificate has expired or is not yet valid 具体是下面&#x1f447;这样的 rootDS918:/volume2/docker/xiaoya# docker pul…

数据中台工具的选型要点_光点科技

数据中台工具扮演着举足轻重的角色。想要全面理解数据中台工具的意义、作用以及应用方式&#xff0c;就必须深入探讨这一概念以及相关实践。 数据中台工具概述 数据中台&#xff0c;是一个支持数据集成、管理、分析和服务的平台&#xff0c;它能够帮助企业统一数据资源&#xf…

Linux使用操作(一)

Linux创建链接的方式 在Linux中&#xff0c;可以给文件创建链接。链接的意思可以理解是快捷方式&#xff0c;它指向另一个文件或目录。 软链接 软连接&#xff08;也叫符号链接&#xff09;是一种特殊类型的文件&#xff0c;它指向另一个文件或目录 语法 ln -s 原文件路径…

udp/tcp错误总结

udp tcp——多进程 tcp——多线程 tcp——线程池 tcp——守护进程 &#x1f386;udp  ✨pthread_create 错误总结  ✨LockGuard错误总结  ✨服务端需要写成多线程  ✨客户端也需要写成多线程  ✨多线程调试工具 &#x1f386;tcp  ✨tcp独有调试工具——telnet  ✨Threa…

【Unity】UnityEvent(一)

​UnityEvent----高效管理游戏事件的利器 在游戏开发中&#xff0c;事件系统是实现各种功能的关键组成部分。它允许我们将不同对象之间的交互解耦&#xff0c;使得代码更加模块化和易于维护。而UnityEvent作为Unity引擎提供的一种强大的事件系统工具&#xff0c;为开发者提供了…

CTFHub(web sql)(四)

Cookie注入 Cookie 注入的原理也和其他注入一样&#xff0c;只不过是将提交的参数已 Cookie 方式提交&#xff0c;而一般的注入是使用 GET 或者 POST 方式提交&#xff0c;GET 方式提交就是直接在网址后面加上需要注入的语句&#xff0c;POST 方式则是通过表单&#xff0c;GET …

数据仓库与数据挖掘(实验一2024.4.24)

实验准备&#xff1a; 1.下载conda 2.配置环境C:\ProgramData\miniconda3\Scripts 3.创建文件夹panda进入虚拟环境qq 激活虚拟环境&#xff1a;activate qq 启动jupyter lab&#xff08;python语言环境编译&#xff09;&#xff1a;jupyter lab 4.panda下载 &#xff08;…

【行为型模式】中介者模式

一、中介者模式概述 中介者模式定义&#xff1a;用一个中介对象来封装一系列的对象交互&#xff0c;中介者使各对象不需要显式地相互引用&#xff0c;从而使其耦合松散&#xff0c;而且可以独立地改变它们之间的交互。中介者模式又称为调停者模式。(对象行为型模式) 中介者模式…

基于uni-app的动态表单

一、应用场景和意义 可以通过配置字段和校验规则&#xff0c;快速完成页面开发、提升开发效率 二、应用前提 形成ui/业务规范&#xff0c;最好是应用在问卷调查之类的业务 三、动态表单的功能 字段报错、快速滚动定位报错信息、支持字段值和字段规则拆分&#xff0c;便于实…

《R语言与农业数据统计分析及建模》学习——描述性统计分析

一、描述性统计概念和方法 1、概念和作用 描述性统计是对数据进行概括和描述&#xff0c;便于理解数据的特征、趋势和分布&#xff0c;帮助我们了解数据基本情况和总体特征&#xff0c;为后续更深入的数据分析和建模提供基础。 2、基础方法 &#xff08;1&#xff09;中心趋…

STM32的定时器

一、介绍 定时器的工作原理 通用定时器的介绍 定时器的计数模式 定时器时钟源 定时器溢出时间计算公式 二、使用定时器中断点亮LED灯 打开一个LED灯 更改TIME2 然后就是生成代码 三&#xff0c;代码

深度学习基础之《TensorFlow框架(15)—神经网络》

一、神经网络基础 1、什么是神经网络 人工神经网络&#xff08;Artificial Neural Network&#xff0c;简写为ANN&#xff09;。也简称为神经网络&#xff08;NN&#xff09; 是一种模仿生物神经网络&#xff08;动物的中枢神经系统&#xff0c;特别是大脑&#xff09;结构和功…

网络安全与密码学--AES加密

分组加密之AES加密算法 AES算法的诞生 python实现AES加密 AES加密详细流程 AES解密过程 AES的应用 1997年 NIST征集AES&#xff08;Advanced Encryption Standard&#xff09;2000年选中 https://www.nist.gov/ https://csrc.nist.gov/projects/block-cipher-techniques A…