从Flink的Kafka消费者看算子联合列表状态的使用

news2025/1/17 3:06:02

背景

算子的联合列表状态是平时使用的比较少的一种状态,本文通过kafka的消费者实现来看一下怎么使用算子列表联合状态

算子联合列表状态

首先我们看一下算子联合列表状态的在进行故障恢复或者从某个保存点进行扩缩容启动应用时状态的恢复情况
在这里插入图片描述
算子联合列表状态主要由这两个方法处理:
1初始化方法

public final void initializeState(FunctionInitializationContext context) throws Exception {

        OperatorStateStore stateStore = context.getOperatorStateStore();
		// 在初始化方法中获取联合列表状态
        this.unionOffsetStates =
                stateStore.getUnionListState(
                        new ListStateDescriptor<>(
                                OFFSETS_STATE_NAME,
                                createStateSerializer(getRuntimeContext().getExecutionConfig())));

        if (context.isRestored()) {
            restoredState = new TreeMap<>(new KafkaTopicPartition.Comparator());
// 把联合列表状态的数据都恢复成类的本地变量中
            // populate actual holder for restored state
            for (Tuple2<KafkaTopicPartition, Long> kafkaOffset : unionOffsetStates.get()) {
                restoredState.put(kafkaOffset.f0, kafkaOffset.f1);
            }

            LOG.info(
                    "Consumer subtask {} restored state: {}.",
                    getRuntimeContext().getIndexOfThisSubtask(),
                    restoredState);
        } else {
            LOG.info(
                    "Consumer subtask {} has no restore state.",
                    getRuntimeContext().getIndexOfThisSubtask());
        }
    }

2.开始通知检查点开始的方法:

public final void snapshotState(FunctionSnapshotContext context) throws Exception {
        if (!running) {
            LOG.debug("snapshotState() called on closed source");
        } else {
            unionOffsetStates.clear();

            final AbstractFetcher<?, ?> fetcher = this.kafkaFetcher;
            if (fetcher == null) {
                // the fetcher has not yet been initialized, which means we need to return the
                // originally restored offsets or the assigned partitions
                for (Map.Entry<KafkaTopicPartition, Long> subscribedPartition :
                        subscribedPartitionsToStartOffsets.entrySet()) {
                        // 进行checkpoint时,把数据保存到联合列表状态中进行保存
                    unionOffsetStates.add(
                            Tuple2.of(
                                    subscribedPartition.getKey(), subscribedPartition.getValue()));
                }

                if (offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS) {
                    // the map cannot be asynchronously updated, because only one checkpoint call
                    // can happen
                    // on this function at a time: either snapshotState() or
                    // notifyCheckpointComplete()
                    pendingOffsetsToCommit.put(context.getCheckpointId(), restoredState);
                }
            } else {
                HashMap<KafkaTopicPartition, Long> currentOffsets = fetcher.snapshotCurrentState();

                if (offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS) {
                    // the map cannot be asynchronously updated, because only one checkpoint call
                    // can happen
                    // on this function at a time: either snapshotState() or
                    // notifyCheckpointComplete()
                    pendingOffsetsToCommit.put(context.getCheckpointId(), currentOffsets);
                }

                for (Map.Entry<KafkaTopicPartition, Long> kafkaTopicPartitionLongEntry :
                        currentOffsets.entrySet()) {
                    unionOffsetStates.add(
                            Tuple2.of(
                                    kafkaTopicPartitionLongEntry.getKey(),
                                    kafkaTopicPartitionLongEntry.getValue()));
                }
            }

            if (offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS) {
                // truncate the map of pending offsets to commit, to prevent infinite growth
                while (pendingOffsetsToCommit.size() > MAX_NUM_PENDING_CHECKPOINTS) {
                    pendingOffsetsToCommit.remove(0);
                }
            }
        }
    }

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1096191.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Django 访问静态文件的APP staticfiles

Django 框架默认带的 APP&#xff1a; django.contrib.staticfiles Django文档中也写明了&#xff1a;如何管理静态文件&#xff08;如图片、JavaScript、CSS&#xff09; |姜戈 文档 |姜戈 (djangoproject.com)https://docs.djangoproject.com/zh-hans/4.2/howto/static-file…

k8s-14 存储之volumes

Volumes配置管理 容器中的文件在磁盘上是临时存放的&#xff0c;这给容器中运行的特殊应用程序带来一些问题。首先&#xff0c;当容器崩溃时&#xff0c;kubelet 将重新启动容器&#xff0c;容器中的文件将会丢失因为容器会以干净的状态重建。其次&#xff0c;当在一个 Pod 中…

k8s-10 cni 网络

k8s通过CNI接口接入其他网络插件来实现网络通讯。目前比较流行的插件有flannel,calico等。 CNI插件存放位置: # cat /etc/cni/net.d/10-flannel.conflist 插件使用的解决方案如下: 虚拟网桥&#xff0c;虚拟网卡&#xff0c;多个容器共用一个虚拟网卡进行通信。多路复用: Mac…

自定义安装Redhat8.6镜像:

目录 一、创建虚拟机 二、选择需要安装的镜像 三、选择正确的操作系统和版本 四、更改虚拟机名称和位置 五、配置处理器和内核数量以及内存 配置规则&#xff1a; 六、网络类型、I/O控制类型、磁盘类型使用推荐 即可 网络类型&#xff1a; I/O控制类型: 磁盘类型: 七…

CCF CSP认证 历年题目自练Day32

题目一 试题编号&#xff1a; 202209-1 试题名称&#xff1a; 如此编码 时间限制&#xff1a; 1.0s 内存限制&#xff1a; 512.0MB 问题描述&#xff1a; 题目背景 某次测验后&#xff0c;顿顿老师在黑板上留下了一串数字 23333 便飘然而去。凝望着这个神秘数字&#xff0c;小…

TCP发送接口(如send(),write()等)的返回值与成功发送到接收端的数据量无直接关系

1. TCP发送接口&#xff1a;send() TCP发送数据的接口有send&#xff0c;write&#xff0c;sendmsg。在系统内核中这些函数有一个统一的入口&#xff0c;即sock_sendmsg()。由于TCP是可靠传输&#xff0c;所以对TCP的发送接口很容易产生误解&#xff0c;比如sn send(...); 错误…

如何从 Pod 内访问 Kubernetes 集群的 API

Kubernetes API 是您检查和管理集群操作的途径。您可以使用Kubectl CLI、工具(例如curl)或流行编程语言的官方集成库来使用 API 。 该 API 也可供集群内的应用程序使用。Kubernetes Pod 会自动获得对 API 的访问权限,并且可以使用提供的服务帐户进行身份验证。您可以通过使…

06-React的路由

06-React的路由 1.相关理解 1).SPA的理解 单页Web应用&#xff08;single page web application&#xff0c;SPA&#xff09;。整个应用只有一个完整的页面。点击页面中的链接不会刷新页面&#xff0c;只会做页面的局部更新。数据都需要通过ajax请求获取, 并在前端异步展现。…

2018-2019 ACM-ICPC, Asia Nanjing Regional Contest G. Pyramid(组合数学 计数)

题目 t(t<1e6)组样例&#xff0c;每次给定一个n(n<1e9)&#xff0c;统计边长为n的上述三角形的等边三角形个数 其中等边三角形的三个顶点&#xff0c;可以在所有黑色三角形&白色三角形的顶点中任取&#xff0c; 答案对1e97取模 思路来源 申老师 & oeis A0003…

第 367 场 LeetCode 周赛题解

A 找出满足差值条件的下标 I 模拟 class Solution { public:vector<int> findIndices(vector<int> &nums, int indexDifference, int valueDifference) {int n nums.size();for (int i 0; i < n; i)for (int j 0; j < i; j)if (i - j > indexDiffe…

【探索Linux】—— 强大的命令行工具 P.11(基础IO,文件操作)

阅读导航 前言一、C语言的文件操作二、C的文件操作三、Linux系统文件操作&#xff08;I/O接口&#xff09;1. open()⭕传入多个打开方式&#xff08;按位或操作将不同的标志位组合在一起&#xff09; 2. write()3. read()4. close()5. lseek() 温馨提示 前言 前面我们讲了C语言…

vue3后台管理框架之axios二次封装

在开发项目的时候避免不了与后端进行交互,因此我们需要使用axios插件实现发送网络请求。在开发项目的时候 我们经常会把axios进行二次封装。 目的: 1:使用请求拦截器,可以在请求拦截器中处理一些业务(开始进度条、请求头携带公共参数) 2:使用响应拦截器,可以在响应拦截器…

Hadoop3教程(十二):MapReduce中Shuffle机制的概述

文章目录 &#xff08;95&#xff09; Shuffle机制什么是shuffle&#xff1f;Map阶段Reduce阶段 参考文献 &#xff08;95&#xff09; Shuffle机制 面试的重点 什么是shuffle&#xff1f; Map方法之后&#xff0c;Reduce方法之前的这段数据处理过程&#xff0c;就叫做shuff…

华为ICT——云计算基础知识、计算类技术听课笔记

ICT(information and communications technology):信息与通信技术 传统IT架构缺点 TCO&#xff1a;总体拥有成本 云计算模式 云计算价值 云计算通用点 虚拟化技术&#xff1a;将单台物理服务器虚拟为多台虚拟机使用&#xff0c;多台虚拟机共享物理服务器硬件资源。 虚拟化本质…

使用PM2部署spring-boot项目

一、打包应用 1、先清理之前的 2、修改pom.xml文件关于项目名称版本及jdk版本 3、在idea中直接打包项目 4、等打包完成后会在target文件夹下有一个xx.jar的文件,拷贝出来放到一个文件夹下&#xff0c;或者服务器指定目录下 二、使用pm2部署.jar文件 1、pm2的安装,参考地…

Linux性能优化--性能工具:特定进程CPU

4.0 概述 在用系统级性能工具找出是哪个进程降低了系统速度之后&#xff0c;你需要用特定进程性能工具来发现这个进程的行为。对此&#xff0c;Linux提供了丰富的工具用于追踪一个进程和应用程序性能的重要统计信息。 阅读本章后&#xff0c;你将能够&#xff1a; 确定应用程…

nodejs+vue水浒鉴赏平台系统

目 录 摘 要 I ABSTRACT II 目 录 II 第1章 绪论 1 1.1背景及意义 1 1.2 国内外研究概况 1 1.3 研究的内容 1 第2章 相关技术 3 2.1 nodejs简介 4 2.2 express框架介绍 6 2.4 MySQL数据库 4 第3章 系统分析 5 3.1 需求分析 5 3.2 系统可行性分析 5 3.2.1技术可行性&#xff1a;…

unity脚本的生命周期

方法名称调用时间Awake最早调用&#xff0c;所以一般可以在此实现单例模式OnEnable组件激活后调用&#xff0c;在Awake后会调用一次Start在Update之前调用一次&#xff0c;在OnEnable之后调用&#xff0c;可以在此设置一些初始值FixedUpdate固定频率调用方法&#xff0c;每次调…

多模态大模型NextGPT整体结构图、模型示意图和使用模型时示意图

NextGPT模型整体结构 项目地址 论文地址 模型示意图 使用模型时示意图