flink 增量快照同步文件引用关系和恢复分析

news2025/4/8 9:16:12

文章目录

      • 文件引用分析
      • 相关代码分析
      • 从state 恢复,以rocksdb为例
        • 不修改并行度
        • 修改并行度
          • keyGroupRange
          • 过程
          • 问题

文件引用分析

每次生成的checkpoint 里都会有所有文件的引用信息

  • 问题,引用分析里如何把f1,f2去掉了,可以参考下面的代码,每次生成引用分析时,会先list 本地的文件目录,因为f1,f2已经合并了,所以不会出现在后续里了
//保存引用的是个map,每个checkpoint 是一个
SortedMap<Long, Collection<HandleAndLocalPath>> uploadedSstFiles
uploadedSstFiles.put(checkpointId, Collections.unmodifiableList(sstFiles));

在这里插入图片描述

相关代码分析


  List<HandleAndLocalPath> sstFilesUploadResult =
                        stateUploader.uploadFilesToCheckpointFs(
                                sstFilePaths,
                                checkpointStreamFactory,
                                stateScope,
                                snapshotCloseableRegistry,
                                tmpResourcesRegistry);
                uploadedSize +=
                        sstFilesUploadResult.stream().mapToLong(e -> e.getStateSize()).sum();
                 //这里加上本次上传的
                sstFiles.addAll(sstFilesUploadResult);

 private void createUploadFilePaths(
                Path[] files,
                List<HandleAndLocalPath> sstFiles,
                List<Path> sstFilePaths,
                List<Path> miscFilePaths) {
            for (Path filePath : files) {
                final String fileName = filePath.getFileName().toString();

                if (fileName.endsWith(SST_FILE_SUFFIX)) {
                    Optional<StreamStateHandle> uploaded = previousSnapshot.getUploaded(fileName);
                    if (uploaded.isPresent()) {
                    //这里加上已经上传的
                        sstFiles.add(HandleAndLocalPath.of(uploaded.get(), fileName));
                    } else {
                        sstFilePaths.add(filePath); // re-upload
                    }
                } else {
                    miscFilePaths.add(filePath);
                }
            }
        }
    }

从state 恢复,以rocksdb为例

不修改并行度

大致是这三步

在这里插入图片描述

@SuppressWarnings("unchecked")
    private void restoreWithoutRescaling(KeyedStateHandle keyedStateHandle) throws Exception {
        logger.info(
                "Starting to restore from state handle: {} without rescaling.", keyedStateHandle);
        if (keyedStateHandle instanceof IncrementalRemoteKeyedStateHandle) {
            IncrementalRemoteKeyedStateHandle incrementalRemoteKeyedStateHandle =
                    (IncrementalRemoteKeyedStateHandle) keyedStateHandle;
            restorePreviousIncrementalFilesStatus(incrementalRemoteKeyedStateHandle);
            restoreBaseDBFromRemoteState(incrementalRemoteKeyedStateHandle);
        } else if (keyedStateHandle instanceof IncrementalLocalKeyedStateHandle) {
            IncrementalLocalKeyedStateHandle incrementalLocalKeyedStateHandle =
                    (IncrementalLocalKeyedStateHandle) keyedStateHandle;
            restorePreviousIncrementalFilesStatus(incrementalLocalKeyedStateHandle);
            restoreBaseDBFromLocalState(incrementalLocalKeyedStateHandle);
        } else {
            throw unexpectedStateHandleException(
                    new Class[] {
                        IncrementalRemoteKeyedStateHandle.class,
                        IncrementalLocalKeyedStateHandle.class
                    },
                    keyedStateHandle.getClass());
        }
        logger.info(
                "Finished restoring from state handle: {} without rescaling.", keyedStateHandle);
    }
修改并行度
keyGroupRange

先理解keyGroupRange
定义 :
KeyGroupRange 是一组连续的 KeyGroup 的集合。它表示某个任务实例(subtask)负责处理哪些 KeyGroup。
作用 :
KeyGroupRange 用于定义每个并行子任务(Task)需要处理的 KeyGroup 范围。
每个 Task 都会分配到一个或多个 KeyGroupRange,从而明确该 Task 应该处理哪些 KeyGroup 的数据和状态。
特点 :
KeyGroupRange 是一个范围的概念,可以包含多个 KeyGroup。
当任务的并行度发生变化时,KeyGroupRange 的分配也会相应调整
假设:

总共有 16 个 KeyGroup(编号从 0 到 15)。
并行度为 2。
分配方式
Task 1 负责 KeyGroupRange [0, 8),即 KeyGroup 0 到 7。
Task 2 负责 KeyGroupRange [8, 16),即 KeyGroup 8 到 15。
如果并行度增加到 4:

Task 1 负责 KeyGroupRange [0, 4),即 KeyGroup 0 到 3。
Task 2 负责 KeyGroupRange [4, 8),即 KeyGroup 4 到 7。
Task 3 负责 KeyGroupRange [8, 12),即 KeyGroup 8 到 11。
Task 4 负责 KeyGroupRange [12, 16),即 KeyGroup 12 到 15。

过程
  1. 核心挑战
    当并行度变化时,每个子任务负责的 数据分片(KeyGroup) 范围会发生变化。例如:

旧并行度 :2 个子任务,每个子任务负责 KeyGroup 0-1 和 KeyGroup 2-3。
新并行度 :4 个子任务,每个子任务负责更小的 KeyGroup 范围(如 0-0, 1-1, 2-2, 3-3)。
此时,需要将旧子任务的 状态数据 按新分片规则重新分配到新子任务中,同时避免数据丢失或重复。

  1. 恢复流程步骤
    步骤 1:选择“最佳”检查点
    目标 :找到一个与新子任务的 KeyGroup 范围 重叠最多的检查点增量句柄,作为恢复的基础。
    原因 :减少需要迁移的数据量,提高效率。
    示例 :
    如果新子任务的 KeyGroup 范围是 0-0,则优先选择旧检查点中覆盖 KeyGroup 0 的数据。
    步骤 2:下载所有相关状态数据
    操作 :将所有相关检查点的数据(如 SST 文件)从远程存储(如 HDFS/S3)下载到本地临时目录。
    原因 :确保所有可能需要的数据都可用,以便后续合并。
    步骤 3:初始化基础数据库
    操作 :
    加载“最佳”检查点的数据 :将选中的检查点数据加载到一个新的 RocksDB 实例中。
    裁剪数据 :仅保留属于新子任务 KeyGroup 范围 的键(通过键的前缀过滤)。
    示例 :
    新子任务负责 KeyGroup 0-0,则删除所有不属于该范围的键。
    步骤 4:合并其他检查点数据
    操作 :
    逐个处理其他检查点 :对剩余的检查点,逐个加载到临时 RocksDB 实例。
    遍历键值对 :逐条检查键是否属于新子任务的 KeyGroup 范围 。
    复制有效数据 :将符合条件的键值对写入主 RocksDB 实例。
    示例 :
    检查点 A 包含 KeyGroup 0-3 的数据,新子任务只保留 KeyGroup 0 的部分。
    步骤 5:清理临时数据
    操作 :删除临时文件和资源,释放存储空间。
问题

为什么选择重叠最多的增量检查点句柄?
减少数据合并工作量 :
初始检查点覆盖的 KeyGroup 越多,后续需要从其他检查点合并的数据越少。
提高恢复效率 :
减少临时 RocksDB 实例的数量和迭代次数,从而加速恢复过程。
降低资源消耗 :
减少网络传输和磁盘 I/O 开销。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2330369.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

c++概念—内存管理

文章目录 c内存管理c/c的内存区域划分回顾c语言动态内存管理c动态内存管理new和delete的使用new和delete的底层逻辑operator new函数和operator delete函数new和delete的实现操作方式不匹配的情况定位new new/delete和malloc/free的区别 c内存管理 在以往学习c语言的过程中&…

无人机双频技术及底层应用分析!

一、双频技术的核心要点 1. 频段特性互补 2.4GHz&#xff1a;穿透力强、传输距离远&#xff08;可达5公里以上&#xff09;&#xff0c;适合复杂环境&#xff08;如城市、建筑物密集区&#xff09;&#xff0c;但易受Wi-Fi、蓝牙等设备的干扰。 5.8GHz&#xff1a;带宽更…

【电视软件】小飞电视v2.7.0 TV版-清爽无广告秒换台【永久更新】

软件介绍 小飞电视是一款电视端的直播软件&#xff0c;无需二次付费和登录&#xff0c;资源丰富&#xff0c;高清流畅。具备开机自启、推送功能、自定义直播源、个性化设置及节目预告等实用功能&#xff0c;为用户带来良好的观看体验。基于mytv开源项目二改&#xff0c;涵盖央…

Valgrind——内存调试和性能分析工具

文章目录 一、Valgrind 介绍二、Valgrind 功能和使用1. 主要功能2. 基本用法2.1 常用选项2.2 内存泄漏检测2.3 详细报告2.4 性能分析2.5 多线程错误检测 三、在 Ubuntu 上安装 Valgrind四、示例1. 检测内存泄漏2. 使用未初始化的内存3. 内存读写越界4. 综合错误 五、工具集1. M…

学习MySQL第七天

夕阳无限好 只是近黄昏 一、子查询 1.1 定义 将一个查询语句嵌套到另一个查询语句内部的查询 我们通过具体示例来进行演示&#xff0c;这一篇博客更侧重于通过具体的小问题来引导大家独立思考&#xff0c;然后熟悉子查询相关的知识点 1.2 问题1 谁的工资比Tom高 方…

Spring启示录、概述、入门程序以及Spring对IoC的实现

一、Spring启示录 阅读以下代码&#xff1a; dao package org.example1.dao;/*** 持久层* className UserDao* since 1.0**/ public interface UserDao {/*** 根据id删除用户信息*/void deleteById(); } package org.example1.dao.impl;import org.example1.dao.UserDao;/**…

电机的了解到调试全方面讲解

一、什么是电机 电机是一种将电能转换为机械能的装置,通常由定子、转子和电磁场组成。 当电流通过电机的绕组时,产生的磁场会与电机中的磁场相互作用,从而使电机产生旋转运动。电机广泛应用于各种机械设备和工业生产中,是现代社会不可或缺的重要设备之一。 常见的电机种…

笔试专题(七)

文章目录 乒乓球筐&#xff08;哈希&#xff09;题解代码 组队竞赛题解代码 删除相邻数字的最大分数&#xff08;线性dp&#xff09;题解代码 乒乓球筐&#xff08;哈希&#xff09; 题目链接 题解 1. 两个哈希表 先统计第一个字符串中的字符个数&#xff0c;再统计第二个字…

【嵌入式学习3】UDP发送端、接收端

目录 1、发送端 2、接收端 3、UDP广播 1、发送端 from socket import *udp_socket socket(AF_INET,SOCK_DGRAM) udp_socket.bind(("127.0.0.1",3333))data_str "UDP发送端数据" data_bytes data_str.encode("utf-8") udp_socket.sendto(d…

Linux 系统 SVN 源码安装与配置全流程指南

Linux系统SVN源码安装与配置全流程指南 一、环境准备 系统要求 CentOS 7及以上版本需安装GCC编译工具链 依赖项 APR/APR-UTIL&#xff08;Apache可移植运行库&#xff09;SQLite&#xff08;嵌入式数据库&#xff09;zlib&#xff08;数据压缩库&#xff09; 二、下载及安装…

Redis 的五种数据类型面试回答

这里简单介绍一下面试回答、我之前有详细的去学习、但是一直都觉得太多内容了、太深入了 然后面试的时候不知道从哪里讲起、于是我写了这篇CSDN帮助大家面试回答、具体的深入解析下次再说 面试官你好 我来介绍一下Redis的五种基本数据类型 有String List Set ZSet Map 五种基…

关于类模板STL中vector容器的运用和智能指针的实现

代码题&#xff1a;使用vector实现一个简单的本地注册登录系统 注册&#xff1a;将账号密码存入vector里面&#xff0c;注意防重复判断 登录&#xff1a;判断登录的账号密码是否正确 #include <iostream> #include <cstring> #include <cstdlib> #in…

Opencv计算机视觉编程攻略-第十一节 三维重建

此处重点讨论在特定条件下&#xff0c;重建场景的三维结构和相机的三维姿态的一些应用实现。下面是完整投影公式最通用的表示方式。 在上述公式中&#xff0c;可以了解到&#xff0c;真实物体转为平面之后&#xff0c;s系数丢失了&#xff0c;因而无法会的三维坐标&#xff0c;…

git修改已经push的commit的message

1.修改信息 2.修改message 3.强推

2026考研数学张宇武忠祥复习视频课,高数基础班+讲义PDF

2026考研数学武忠祥老师课&#xff08;网盘&#xff09;&#xff1a;点击下方链接 2026考研数学武忠祥网课&#xff08;最新网盘&#xff09; 一、基础阶段&#xff08;3-5个月&#xff09; 目标&#xff1a;搭建知识框架掌握基础题型 教材使用&#xff1a; 高数&#xff1a;…

C++使用Qt Charts可视化大规模点集

引言 数据可视化是数据分析和决策过程中的重要环节。随着数据量的不断增长&#xff0c;如何高效地可视化大规模数据集成为了一个挑战。Qt Charts 提供了一个强大的工具集&#xff0c;用于创建直观的数据可视化图表。本文将探讨如何使用 C 和 Qt Charts 可视化大规模点集&#…

质检LIMS系统在生态修复企业的实践 生态修复行业的质量管控难题

一、生态修复行业的质量管控新命题 在生态文明建设的大背景下&#xff0c;生态修复企业面临着复杂的环境治理挑战。土壤改良、水体净化、植被恢复等工程&#xff0c;均需以精准的实验数据支撑决策。传统实验室管理模式存在数据孤岛、流程非标、合规风险高等痛点&#xff0c;而…

Spring Cloud之服务入口Gateway之Route Predicate Factories

目录 Route Predicate Factories Predicate 实现Predicate接口 测试运行 Predicate的其它实现方法 匿名内部类 lambda表达式 Predicate的其它方法 源码详解 代码示例 Route Predicate Factories The After Route Predicate Factory The Before Route Predicate Fac…

《AI大模型应知应会100篇》第6篇:预训练与微调:大模型的两阶段学习方式

第6篇&#xff1a;预训练与微调&#xff1a;大模型的两阶段学习方式 摘要 近年来&#xff0c;深度学习领域的一个重要范式转变是“预训练-微调”&#xff08;Pretrain-Finetune&#xff09;的学习方式。这种两阶段方法不仅显著提升了模型性能&#xff0c;还降低了特定任务对大…

java后端对时间进行格式处理

时间格式处理 通过java后端&#xff0c;使用jackson库的注解JsonFormat(pattern "yyyy-MM-dd HH:mm:ss")进行格式化 package com.weiyu.pojo;import com.fasterxml.jackson.annotation.JsonFormat; import lombok.AllArgsConstructor; import lombok.Data; import …