大数据-玩转数据-Flink 容错机制

news2025/1/19 14:40:02

一、概述

在分布式架构中,当某个节点出现故障,其他节点基本不受影响。在 Flink 中,有一套完整的容错机制,最重要就是检查点(checkpoint)。

二、检查点(Checkpoint)

在流处理中,我们可以用存档读档的思路,把之前的计算结果做个保存,这样重启之后就可以继续处理新数据、而不需要重新计算了。所以我们最终的选择,就是将之前某个时间点所有的状态保存下来,这份“存档”就是所谓的“检查点(checkpoint)。遇到故障重启的时候,我们可以从检查点中“读档”,恢复出之前的状态,这样就可以回到当时保存的一刻接着处理数据了。checkpoint机制是Flink可靠性的基石,可以保证Flink集群在某个算子因为某些原因(如 异常退出)出现故障时,能够将整个应用流图的状态恢复到故障之前的某一状态,保证应用流图状态的一致性。

三、检查点快照的实现算法

1、简单算法:暂停应用,然后开始做检查点, 再重新恢复应用 。
2、Flink的改进Checkpoint算法. Flink的checkpoint机制原理自"Chandy-Lamport algorithm"算法(分布式快照算)的一种变体: 异步 barrier 快照(asynchronous barrier snapshotting)每个需要checkpoint的应用在启动时,Flink的JobManager为其创建一个 CheckpointCoordinator,CheckpointCoordinator全权负责本应用的快照制作。

重要概念:流的barrier
流的barrier是Flink的Checkpoint中的一个核心概念. 多个barrier被插入到数据流中, 然后作为数据流的一部分随着数据流动(有点类似于Watermark)。这些barrier不会跨越流中的数据。每个barrier会把数据流分成两部分: 一部分数据进入当前的快照 , 另一部分数据进入下一个快照 。每个barrier携带着快照的id。barrier 不会暂停数据的流动, 所以非常轻量级。 在流中,同一时间可以有来源于多个不同快照的多个barrier, 这个意味着可以并发的出现不同的快照。

Flink的检查点制作过程
1、Checkpoint Coordinator 向所有 source 节点 trigger Checkpoint,然后Source Task会在数据流中安插CheckPoint barrier;

2、source 节点向下游广播 barrier,这个 barrier 就是实现 Chandy-Lamport 分布式快照算法的核心,下游的 task 只有收到所有进来的 barrier 才会执行相应的 Checkpoint(barrier对齐, 但是新版本有一种新的barrier);

3、当 task 完成 state checkpoint后,会将备份数据的地址(state handle)通知给 Checkpoint coordinator;

4、下游的 sink 节点收集齐上游两个 input 的 barrier 之后,会执行本地快照;

5、同样的,sink 节点在完成自己的 Checkpoint 之后,会将 state handle 返回通知 Coordinator;

6、最后,当 Checkpoint coordinator 收集齐所有 task 的 state handle,就认为这一次的 Checkpoint 全局完成了,向持久化存储中再备份一个 Checkpoint meta 文件。

严格一次语义: barrier对齐

在多并行度下, 如果要实现严格一次, 则要执行barrier对齐。
当 job graph 中的每个 operator 接收到 barriers 时,它就会记录下其状态。拥有两个输入流的 Operators(例如 CoProcessFunction)会执行 barrier 对齐(barrier alignment) 以便当前快照能够包含消费两个输入流 barrier 之前(但不超过)的所有 events 而产生的状态。

1、当operator收到数字流的barrier n时, 它就不能处理(但是可以接收)来自该流的任何数据记录,直到它从字母流所有输入接收到 barrier n 为止。否则,它会混合属于快照 n 的记录和属于快照 n + 1 的记录;

2、接收到 barrier n 的流(数字流)暂时被搁置。从这些流接收的记录入输入缓冲区, 不会被处理;

3、 Checkpoint barrier n之后的数据 123已结到达了算子, 存入到输入缓冲区没有被处理, 只有等到字母流的Checkpoint barrier n到达之后才会开始处理;

一旦最后所有输入流都接收到 barrier n,Operator 就会把缓冲区中 pending 的输出数据发出去,然后把 CheckPoint barrier n 接着往下游发送。这里还会对自身进行快照。

至少一次语义: barrier不对齐

假设不对齐, 在字母流的Checkpoint barrier n到达前, 已经处理了1 2 3. 等字母流Checkpoint barrier n到达之后, 会做Checkpoint n. 假设这个时候程序异常错误了, 则重新启动的时候会Checkpoint n之后的数据重新计算. 1 2 3 会被再次被计算, 所以123出现了重复计算。

savepoint原理

1、Flink 还提供了可以自定义的镜像保存功能,就是保存(savepoints)
2、原则上,创建保存点使用的算法与检查点完全相同,因此保存点可以认为就是具有一些额外元数据的检查点
3、Flink不会自动创建保存点,因此用户(或外部调度程序)必须明确地触发创建操作
4、保存点是一个强大的功能。除了故障恢复外,保存点可以用于:有计划的手动备份,更新应用程序,版本迁移,暂停和重启应用,等等。

四、Kafka+Flink+Kafka 实现端到端严格一次

我们知道,端到端的状态一致性的实现,需要每一个组件都实现,对于Flink + Kafka的数据管道系统(Kafka进、Kafka出)而言,各组件怎样保证exactly-once语义呢?

  1. 内部 —— 利用checkpoint机制,把状态存盘,发生故障的时候可以恢复,保证部的状态一致性
  2. source —— kafka consumer作为source,可以将偏移量保存下来,如果后续任务出现了故障,恢复的时候可以由连接器重置偏移量,重新消费数据,保证一致性
  3. sink —— kafka producer作为sink,采用两阶段提交 sink,需要实现一个 TwoPhaseCommitSinkFunction
    内部的checkpoint机制我们已经有了了解,那source和sink具体又是怎样运行的呢?接下来我们逐步做一个分析。

具体的两阶段提交步骤总结如下:

  1. 某个checkpoint的第一条数据来了之后,开启一个 kafka 的事务(transaction),正常写入 kafka分区日志但标记为未提交,这就是“预提交”(第一阶段提交)
  2. jobmanager 触发 checkpoint 操作,barrier 从 source 开始向下传递,遇到 barrier
    的算子状态后端会进行相应进行checkpoint,并通jobmanagerr
  3. sink 连接器收到 barrier,保存当前状态,存入 checkpoint,通知
    jobmanager,并开启下一阶段的事务,用于提交下个检查点的数据
  4. jobmanager 收到所有任务的通知,发出确认信息,表示 checkpoint 完成
  5. sink 任务收到 jobmanager 的确认信息,正式提交这段时间的数据(第二阶段提交)
  6. 外部kafka关闭事务,提交的数据可以正常消费了

在这里插入图片描述

五、代码中测试Checkpoint

package com.lyh.flink10;

import com.lyh.bean.WaterSensor;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.util.Collector;


import java.util.Properties;

public class kafka_flink_kafka_checkpoint {
    public static void main(String[] args) throws Exception {

        Properties sourceproperties = new Properties();
        sourceproperties.setProperty("bootstrap.servers","hadoop100:9092");
        sourceproperties.setProperty("group.id", "kafka_flink_kafka_checkpoint");
        sourceproperties.setProperty("auto.offset.reset","latest");
        StreamExecutionEnvironment env = StreamExecutionEnvironment
                .createLocalEnvironmentWithWebUI(new Configuration())
                .setParallelism(3);
        env.setStateBackend(new RocksDBStateBackend("hdfs://hadoop100:8020/flink/checkpoints/rocksdb"));
        // 每 1000ms 开始一次 checkpoint
        env.enableCheckpointing(1000);
        // 高级选项:
        // 设置模式为精确一次 (这是默认值)
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

        // 确认 checkpoints 之间的时间会进行 500 ms
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);

        // Checkpoint 必须在一分钟内完成,否则就会被抛弃
        env.getCheckpointConfig().setCheckpointTimeout(60000);

        // 同一时间只允许一个 checkpoint 进行
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);

        // 开启在 job 中止后仍然保留的 externalized checkpoints
        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        env.addSource(new FlinkKafkaConsumer<String>("s1",new SimpleStringSchema(),sourceproperties))
                .map(line ->{
                    String[] datas = line.split(",");
                    return new WaterSensor(
                            datas[0],
                            Long.valueOf(datas[1]),
                            Integer.valueOf(datas[2])
                    );
                }).keyBy(WaterSensor::getId)
                .process(new KeyedProcessFunction<String, WaterSensor, String>() {
                    private ValueState<Integer> state;
                    @Override
                    public void open(Configuration parameters) throws Exception {
                         state = getRuntimeContext().getState(new ValueStateDescriptor<Integer>("state", Integer.class));
                    }

                    @Override
                    public void processElement(WaterSensor value,
                                               Context ctx,
                                               Collector<String> out) throws Exception {
                        Integer lastVC = value.getVc() == null ? 0 : value.getVc();
                        if ((Math.abs(value.getVc())-lastVC)>=10) {
                            out.collect(value.getId()+"红色告警");

                        }
                        state.update(value.getVc());


                    }
                }).addSink(new FlinkKafkaProducer<String>("hadoop100:9092","alert",new SimpleStringSchema()));
         env.execute();

    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/997382.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

初识Nacos

前言 Nacos是一个用于微服务架构下的服务发现和配置管理以及服务管理的综合解决方案&#xff08;官网介绍&#xff09;&#xff0c;这里的服务发现其实就是注册中心&#xff0c;配置管理就是配置中心&#xff0c;而服务管理是二者的综合&#xff1b; Nacos特性 1.服务发现与…

《Go语言在微服务中的崛起:为什么Go是下一个后端之星?》

&#x1f337;&#x1f341; 博主猫头虎&#x1f405;&#x1f43e; 带您进入 Golang 语言的新世界✨✨&#x1f341; &#x1f984; 博客首页——&#x1f405;&#x1f43e;猫头虎的博客&#x1f390; &#x1f433; 《面试题大全专栏》 &#x1f995; 文章图文并茂&#x1f…

【JavaEE】_CSS常用属性值

目录 1. 字体属性 1.1 设置字体家族 font-family 1.2 设置字体大小 font-size 1.3 设置字体粗细 font-weight 1.4 设置字体倾斜 font-style 2. 文本属性 2.1 设置文本颜色 color 2.2 文本对齐 text-align 2.3 文本装饰 text-decoration 2.4 文本缩进 text-indent 2.…

车载软件架构——基础软件供应商开发工具链(二)

车载软件架构——基础软件供应商&开发工具链(二) 我是穿拖鞋的汉子,魔都中坚持长期主义的汽车电子工程师。 老规矩,分享一段喜欢的文字,避免自己成为高知识低文化的工程师: 没有人关注你。也无需有人关注你。你必须承认自己的价值,你不能站在他人的角度来反对自己…

2023-9-10 能被整除的数

题目链接&#xff1a;能被整除的数 #include <iostream> #include <algorithm>using namespace std;typedef long long LL;const int N 20;int n, m; int p[N];int main() {cin >> n >> m;for(int i 0; i < m; i) cin >> p[i];int res 0;f…

性能测试 —— 全链路压测最佳实践!

全链路压测理论基础 什么是全链路压测 基于实际的生产业务场景、系统环境&#xff0c;基于真实数据模拟海量的用户请求对整个业务链进行压力测试&#xff0c;并持续调优的过程&#xff1b; 全链路的核心为&#xff1a;业务场景、数据链路、压力模型和环境拓扑&#xff1b;…

QuantLib学习笔记——看看几何布朗运动有没有股票走势的感觉

⭐️ 小鹿在乱撞 小伙伴们肯定看过股票的走势&#xff0c;真是上蹿下跳啊&#xff0c;最近小编学了一丢丢关于随机过程和QuantLib的知识&#xff0c;想利用随机过程生成一个类似股票价格走势的图&#xff0c;安排&#xff01;&#xff01;&#xff01; ⭐️ 随机过程 随机过程…

基于seata实现分布式事务实现订单服务 + 账户服务 + 商品库存服务之间的分布式事务

概述 实现订单服务 账户服务 商品库存服务之间的分布式事务. 订单服务生成订单,同时调用账户服务扣减金额, 调用库存服务扣减库存. 服务采用seata的刚性事务, 保证数据一致性. 详细 1.需求&#xff08;要做什么&#xff09; 模仿一个购物流程. 利用seata的分布式事务实现 …

【数据结构】栈、队列和数组

栈、队列和数组 栈队列数组数组的顺序表示和实现顺序表中查找和修改数组元素 矩阵的压缩存储特殊矩阵稀疏矩阵 栈 初始化 #define MaxSize 50//栈中元素的最大个数 typedef char ElemType;//数据结构 typedef struct{int top;//栈顶指针ElemType data[MaxSize];//存放栈中的元…

MySQL无法查看系统默认字符集以及校验规则

show variables like character_set_database; show variables like collation_database;这个错误信息表示MySQL在尝试访问performance_schema.session_variables表时&#xff0c;发现该表不存在。这个问题可能是由于MySQL的版本升级导致的。解决这个问题的一种方法是运行mysql…

[管理与领导-81]:IT基层管理者 - 核心技能 - 高效执行力 - 6- 两大思维:服务思维、重点思维、高效与快速的区别?

目录 前言&#xff1a; 一、服务思维 二、重点思维 三、高效与快速的区别 补充&#xff1a;执行人才的表现 前言&#xff1a; 在某种程度上&#xff0c;高效与完美是相互制约的&#xff0c;要想高效&#xff0c;大多数时候&#xff0c;会牺牲一部分完美&#xff0c;而要达…

Pandas 掉包侠刷题实战--条件筛选

本博文内容为力扣刷题过程的记录&#xff0c;所有题目来源于力扣。 题目链接&#xff1a;https://leetcode.cn/studyplan/30-days-of-pandas/ 文章目录 准备工作1. isin(values) 和 ~2. df.drop_duplicates()3. df.sort_values()4. df.rename()5. pd.merge() 题目-条件筛选1. 大…

postgresql-窗口函数种类

postgresql-聚合窗口函数 聚合函数排名窗口函数案例1案例2 取值窗口函数环比增长率同比增长率 聚合函数 常用的聚合函数&#xff0c;例如 AVG、SUM、COUNT 等&#xff0c;也可以作为窗口函数使用 --计算移动平均值 select saledate, amount, avg(amount) over (order by sale…

Leetcode - 361周赛

一&#xff0c;2843. 统计对称整数的数目 这道题直接暴力&#xff0c;要注意的一点是这个数字必须是由 2 * N 位数字组成。 代码如下&#xff1a; class Solution {public int countSymmetricIntegers(int low, int high) {int ans 0;for(int ilow; i<high; i){if(i>1…

threejs的dat.gui辅助工具的使用

threejs的dat.gui辅助工具的使用 安装使用 安装 npm i dat.gui -S 使用 import dat from dat.gui const controlData {rotationSpeed: 0.01,color: #66ccff,wireframe: false } const gui new dat.GUI() const f gui.addFolder(配置) f.add(controlData, rotationSpeed, …

【Proteus仿真】【STM32单片机】血压心率血氧体温蓝牙

文章目录 一、功能简介二、软件设计三、实验现象联系作者 一、功能简介 系统运行后&#xff0c;LCD1604液晶显示心率、血氧、血压和体温&#xff0c;及其阈值&#xff1b;可通过K3键进入阈值设置模式&#xff0c;K1和K2加减调节&#xff0c;K4确定&#xff1b;当检测心率、血氧…

Java JUC 并发编程(笔记)

文章目录 再谈多线程并发与并行顺序执行并发执行并行执行 再谈锁机制重量级锁轻量级锁偏向锁锁消除和锁粗化 JMM内存模型Java内存模型重排序volatile关键字happens-before原则 多线程编程核心锁框架Lock和Condition接口可重入锁公平锁与非公平锁 读写锁锁降级和锁升级 队列同步…

ARMv7-A 那些事 - 2.通用寄存器与流水线

By: Ailson Jack Date: 2023.09.10 个人博客&#xff1a;http://www.only2fire.com/ 本文在我博客的地址是&#xff1a;http://www.only2fire.com/archives/154.html&#xff0c;排版更好&#xff0c;便于学习&#xff0c;也可以去我博客逛逛&#xff0c;兴许有你想要的内容呢。…

Visual Studio 2019 简单安装教程

思路 官方页面下载 – 安装Visual Studio Installer – 安装Visual Studio 2019 下载 打开页面&#xff1a;Visual Studio 2019 生成号和发布日期 | Microsoft Learn 点击需要的版本&#xff0c;跳转后会开始下载在线安装包&#xff0c;这里选择第一个Community版本 安装 …

SpringBoot【基础篇】

一、快速上手 按照要求&#xff0c;左侧选择web&#xff0c;然后在中间选择Spring Web即可&#xff0c;选完右侧就出现了新的内容项&#xff0c;这就表示勾选成功了 关注&#xff1a;此处选择的SpringBoot的版本使用默认的就可以了&#xff0c;需要说一点&#xff0c;SpringBo…