谈谈Flink消费kafka的偏移量

news2025/1/16 1:39:33

offset配置:

flinkKafkaConsumer.setStartFromEarliest():从topic的最早offset位置开始处理数据,如果kafka中保存有消费者组的消费位置将被忽略。
flinkKafkaConsumer.setStartFromLatest():从topic的最新offset位置开始处理数据,如果kafka中保存有消费者组的消费位置将被忽略。
flinkKafkaConsumer.setStartFromTimestamp(…):从指定的时间戳(毫秒)开始消费数据,Kafka中每个分区中数据大于等于设置的时间戳的数据位置将被当做开始消费的位置。如果kafka中保存有消费者组的消费位置将被忽略。
flinkKafkaConsumer.setStartFromGroupOffsets():默认的设置。根据代码中设置的group.id设置的消费者组,去kafka中或者zookeeper中找到对应的消费者offset位置消费数据。如果没有找到对应的消费者组的位置,那么将按照auto.offset.reset设置的策略读取offset。

消费者offset提交配置:

配置offset的提交方式取决于是否为job设置开启checkpoint。可以使用env.enableCheckpointing(5000)来设置开启checkpoint。5000单位毫秒,代表每5秒进行依次checkpoint

关闭checkpoint:如何禁用了checkpoint,那么offset位置的提交取决于Flink读取kafka客户端的配置,enable.auto.commit ( auto.commit.enable【Kafka 0.8】)配置是否开启自动提交offset, auto.commit.interval.ms决定自动提交offset的周期。
开启checkpoint:如果开启了checkpoint,那么当checkpoint保存状态完成后,将checkpoint中保存的offset位置提交到kafka。这样保证了Kafka中保存的offset和checkpoint中保存的offset一致,可以通过配置setCommitOffsetsOnCheckpoints(boolean)来配置是否将checkpoint中的offset提交到kafka中(默认是true)。如果使用这种方式,那么properties中配置的kafka offset自动提交参数enable.auto.commit和周期提交参数auto.commit.interval.ms参数将被忽略。
总结:Flink提供了消费kafka数据的offset如何提交给Kafka或者zookeeper(kafka0.8之前,因为0.8之前offset是维护在zookeeper中的)的配置 ;关闭checkpoint的话,flink消费kafka数据 offset取决于kafka客户端的配置;开启checkpoint的话,flink消费kafka offset由jobmanager中的checkpoint维护,并同步到kafka中保持一置,注意,Flink并不依赖提交给Kafka或者zookeeper中的offset来保证容错。提交的offset只是为了外部来查询监视kafka数据消费的情况。

使用checkpoint + 两阶段提交来保证仅消费一次kafka中的数据:
Flink checkpoint机制: 这种机制是在Flink应用内部实现仅一次处理数据的基础。

当谈及“exactly-once semantics”仅一次处理数据时,指的是每条数据只会影响最终结果一次。Flink可以保证当机器出现故障或者程序出现错误时,也没有重复的数据或者未被处理的数据出现,实现仅一次处理的语义。

checkpoint中包含:

当前应用的状态;
当前消费流数据的位置;
注意:checkpoint机制仅限于Flink架构内部保证仅一次处理数据;

使用两阶段提交协议保证flink连接外部系统数据仅一次处理

Flink checkpoint机制: 这种机制是在Flink应用内部实现仅一次处理数据的基础。

当谈及“exactly-once semantics”仅一次处理数据时,指的是每条数据只会影响最终结果一次。Flink可以保证当机器出现故障或者程序出现错误时,也没有重复的数据或者未被处理的数据出现,实现仅一次处理的语义。

checkpoint中包含:

当前应用的状态;
当前消费流数据的位置;
注意:checkpoint机制仅限于Flink架构内部保证仅一次处理数据;

使用两阶段提交协议保证flink连接外部系统数据仅一次处理;
当Flink处理完的数据需要写入外部系统时,不保证仅一次处理数据。为了提供端到端的仅一次处理数据,在将数据写入外部系统时也要保证仅一次处理数据,这些外部系统必须提供一种手段来允许程序提交或者回滚写入操作,同时还要保证与Flink的checkpoint机制协调使用,在分布式系统中协调提交和回滚的常见方法就是两阶段提交协议。下面给出一个实例了解Flink如何使用两阶段提交协议来实现数据仅一次处理语义。

该实例是从kafka中读取数据,经过处理数据之后将结果再写回kafka。kafka0.11版本之后支持事务,这也是Flink与kafka交互时仅一次处理的必要条件。【注意:当Flink处理完的数据写入kafka时,即当sink为kafka时,自动封装了两阶段提交协议】。Flink支持仅一次处理数据不仅仅限于和Kafka的结合,只要sink提供了必要的两阶段协调实现,可以对任何sink都能实现仅一次处理数据语义。

其原理如下:

上图Flink程序包含以下组件:

一个从kafka中读取数据的source
一个窗口聚合操作
一个将结果写往kafka的sink。
要使sink支持仅一次处理数据语义,必须以事务的方式将数据写往kafka,将两次checkpoint之间的操作当做一个事务提交,确保出现故障时操作能够被回滚。假设出现故障,在分布式多并发执行sink的应用程序中,仅仅执行单次提交或回滚事务是不够的,因为分布式中的各个sink程序都必须对这些提交或者回滚达成共识,这样才能保证两次checkpoint之间的数据得到一个一致性的结果。Flink使用两阶段提交协议(pre-commit+commit)来实现这个问题。

Filnk checkpointing开始时就进入到pre-commit阶段,具体来说,一旦checkpoint开始,Flink的JobManager向输入流中写入一个checkpoint barrier将流中所有消息分隔成属于本次checkpoint的消息以及属于下次checkpoint的消息,barrier也会在操作算子间流转,对于每个operator来说,该barrier会触发operator的State Backend来为当前的operator来打快照。如下图示:

Flink DataSource中存储着Kafka消费的offset,当完成快照保存后,将chechkpoint barrier传递给下一个operator。这种方式只有在Flink内部状态的场景是可行的,内部状态指的是由Flink的State Backend管理状态,例如上面的window的状态就是内部状态管理。只有当内部状态时,pre-commit阶段无需执行额外的操作,仅仅是写入一些定义好的状态变量即可,checkpoint成功时Flink负责提交这些状态写入,否则就不写入当前状态。

但是,一旦operator操作包含外部状态,事情就不一样了。我们不能像处理内部状态一样处理外部状态,因为外部状态涉及到与外部系统的交互。这种情况下,外部系统必须要支持可以与两阶段提交协议绑定的事务才能保证仅一次处理数据。

本例中的data sink是将数据写往kafka,因为写往kafka是有外部状态的,这种情况下,pre-commit阶段下data sink 在保存状态到State Backend的同时,还必须pre-commit外部的事务。如下图:

当checkpoint barrier在所有的operator都传递一遍切对应的快照都成功完成之后,pre-commit阶段才算完成。这个过程中所有创建的快照都被视为checkpoint的一部分,checkpoint中保存着整个应用的全局状态,当然也包含pre-commit阶段提交的外部状态。当程序出现崩溃时,我们可以回滚状态到最新已经完成快照的时间点。

下一步就是通知所有的operator,告诉它们checkpoint已经完成,这便是两阶段提交的第二个阶段:commit阶段。这个阶段中JobManager会为应用中的每个operator发起checkpoint已经完成的回调逻辑。本例中,DataSource和Winow操作都没有外部状态,因此在该阶段,这两个operator无需执行任何逻辑,但是Data Sink是有外部状态的,因此此时我们需要提交外部事务。如下图示:

汇总以上信息,总结得出:

一旦所有的operator完成各自的pre-commit,他们会发起一个commit操作。
如果一个operator的pre-commit失败,所有其他的operator 的pre-commit必须被终止,并且Flink会回滚到最近成功完成的checkpoint位置。
一旦pre-commit完成,必须要确保commit也要成功,内部的operator和外部的系统都要对此进行保证。假设commit失败【网络故障原因】,Flink程序就会崩溃,然后根据用户重启策略执行重启逻辑,重启之后会再次commit。
因此,所有的operator必须对checkpoint最终结果达成共识,即所有的operator都必须认定数据提交要么成功执行,要么被终止然后回滚。

参考:https://blog.51cto.com/u_16213620/7923389

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1881840.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Golang | Leetcode Golang题解之第200题岛屿数量

题目&#xff1a; 题解&#xff1a; func numIslands(grid [][]byte) int {res : 0for i : 0; i < len(grid); i {for j : 0; j < len(grid[i]); j {if grid[i][j] 1 {resdfs(grid, i, j)}}}return res }func dfs(grid [][]byte, r, c int) {h, w : len(grid), len(gri…

C++感受12-Hello Object 派生版

不变的功能&#xff0c;希望直接复用原有代码&#xff1b;变化的功能&#xff0c;希望在分开的代码里实现。 派生的基本概念和目的如何定义派生类以及创建派生对象派生对象的生死过程 0. 课堂视频 ff14-HelloObject-派生版 1. 派生的基本概念与目的 编程&#xff0c;或者说软…

Games101学习笔记 Lecture 14: Ray Tracing 2 (Acceleration Radiometry)

Lecture 14: Ray Tracing 2 (Acceleration & Radiometry 一、加速光线追踪 AABB1.均匀网格 Uniform Spatial Partitions (Grids)①前处理-构建加速网格②射线与场景相交③网格分辨率④适用情况 2.空间划分KD-Tree①预处理②数据结构③遍历④问题 3.对象划分 & 包围盒层…

使用Python绘制极坐标图

使用Python绘制极坐标图 极坐标图极坐标图的优点使用场景 效果代码 极坐标图 极坐标图&#xff08;Polar Chart&#xff09;是一种图表类型&#xff0c;用于显示在极坐标系中的数据。极坐标图使用圆形坐标系&#xff0c;角度表示一个变量的值&#xff0c;半径表示另一个变量的…

森马基于MaxCompute+Hologres+DataWorks构建数据中台

讲师&#xff1a;晋银龙 浙江森马数仓高级经理 本次案例主要分享森马集团面对多年自建的多套数仓产品体系&#xff0c;通过阿里云MaxComputeHologresDataWorks统一数仓平台&#xff0c;保障数据生产稳定性与数据质量&#xff0c;减少ETL链路及计算时间&#xff0c;每年数仓整体…

Vue中的axios深度探索:从基础安装到高级功能应用的全面指南

文章目录 前言一、axios 请求1. axios的概念2. axios的安装3. axiso请求方式介绍4. axios请求本地数据5. axios跨域6. axios全局注册7. axios支持的请求类型1&#xff09;get请求2&#xff09;post请求3&#xff09;put请求4&#xff09;patch请求5&#xff09;delete请求 二、…

K8s的基本使用和认识

目录 介绍 控制端 Node(节点) 控制端与节点的关系图 基本使用 创建和运行资源 查找和参看资源 修改和删除资源 介绍 控制端 api-server(api)是集群的核心是k8s中最重要的组件,因为它是实现声明式api的关键 kubernetes api-server的核心功能是提供了Kubernetes各类资…

基于FreeRTOS+STM32CubeMX+LCD1602+MCP3008(SPI接口)的ADC转换器Proteus仿真

一、仿真原理图: 二、运行效果: 三、STM32CubeMX配置: 1)、RCC配置: 2)、SPI配置: 四、部分代码: 1)、主函数: /* USER CODE BEGIN Header */ /** ****************************************************************************** * @file : main…

mysql8.0.19安装zip版本

下载地址https://downloads.mysql.com/archives/community/ 下载版本 下载后解压&#xff0c;不包括data 和my.ini文件。其中data 文件是自动生成的【mysqld --initialize --console】&#xff0c;my.ini需要自己编写设置。 新建my.ini文件 需要自己设置 basedirG:\soft\mysql…

华为DCN网络之:VXLAN

VXLAN RFC定义了VLAN扩展方案VXLAN&#xff08;Virtual eXtensible Local Area Network&#xff0c;虚拟扩展局域网&#xff09;。VXLAN采用MAC in UDP封装方式&#xff0c;是NVO3&#xff08;Network Virtualization over Layer 3&#xff09;中的一种网络虚拟化技术。 VXLAN…

android studio 添加aar包

按着以前旧的导包方式栽了大跟头&#xff0c;后面在留老板的的博客下找到了解决办法&#xff0c;记录一下。 Andriod Studio 导入aar最新的方式_gradle 8 引入arr-CSDN博客 最新导包方式 1.在新建libs目录&#xff0c;在app/libs目录下导入aar包&#xff08;其实就是拷贝过去…

hive4 从入门到精通

查询hive 架构 准备 HDFS配置 vim $HADOOP_HOME/etc/hadoop/core-site.xml <!--配置所有节点的root用户都可作为代理用户--><property><name>hadoop.proxyuser.root.hosts</name><value>*</value></property><!--配置root用户…

ETH巨鲸鱼异动和最大BTC爆仓

小编使用币界网地址监控工具&#xff0c;跟踪了在一次令人震惊的链上巨鲸地址异动&#xff0c;一个新创建的地址在过去四个小时内从 Coinbase 提取了价值 2121 万美元的 ETH。总共积累了 6,215 个 ETH&#xff0c;平均购买价格为每 ETH 3,414 美元。如此大规模的提款暗示着周一…

JVM原理(八):JVM虚拟机工具之基础故障工具

这里主要介绍监视虚拟机运行状态和进行故障处理的工具 1. jsp:虚拟机进程状况工具 jsp命令格式&#xff1a; jsp [options] [hostid] jps远程查询虚拟机进程状态 2. jstat:虚拟机统计信息监视工具 jstat命令格式&#xff1a; jstat [option vmid [interval [s|ms] [count]…

【Elasticsearch】Elasticsearch索引创建与管理详解

文章目录 &#x1f4d1;引言一、Elasticsearch 索引的基础概念二、创建索引2.1 使用默认设置创建索引2.2 自定义设置创建索引2.3 创建索引并设置映射 三、索引模板3.1 创建索引模板3.2 使用索引模板创建索引 四、管理索引4.1 查看索引4.2 更新索引设置4.3 删除索引 五、索引别名…

基于VUE3+VITE+SpringBoot+Nginx部署项目之跨域配置等问题

前言&#xff1a;遇到问题&#xff0c;解决问题。 第一部分&#xff1a;VUE 配置 1、vite.config.js 文件 server: {proxy: {/api: {target: env.VITE_BASE_URL,changeOrigin: true,secure: false,rewrite: path > path.replace(/^\/api/, )}}}, 2、.env 文件 VITE_BAS…

Tampermonkey 油猴脚本使用教程

Tampermonkey 油猴脚本使用教程 一、Tampermonkey 油猴脚本简介 Tampermonkey 是一款流行的浏览器扩展&#xff0c;它允许用户通过用户脚本增强网页功能或改变网页的外观。它支持包括 Chrome、Microsoft Edge、Safari、Opera Next 和 Firefox 在内的多种浏览器。Tampermonkey…

搭建LNMP环境实现高并发请求

LNMP和Nginx详细配置 一、LNMP环境平台搭建准备工作源码软件包安装nginxmysqlphp使用nginx连接php 二、web服务器-nginx1.学nginx之前需要了解的知识1.1 同步和异步&#xff08;通知结果的方式&#xff09;1.2 阻塞和非阻塞&#xff08;进线程等待时状态&#xff09;1.3 epoll模…

.NET 漏洞情报 | 某整合管理平台SQL注入

01阅读须知 此文所提供的信息只为网络安全人员对自己所负责的网站、服务器等&#xff08;包括但不限于&#xff09;进行检测或维护参考&#xff0c;未经授权请勿利用文章中的技术资料对任何计算机系统进行入侵操作。利用此文所提供的信息而造成的直接或间接后果和损失&#xf…

Github 2024-07-01开源项目月报 Top15

根据Github Trendings的统计,本月(2024-07-01统计)共有15个项目上榜。根据开发语言中项目的数量,汇总情况如下: 开发语言项目数量Python项目6JavaScript项目3C++项目2PHP项目1Blade项目1非开发语言项目1C#项目1Lua项目1Go项目1MDX项目1Jupyter Notebook项目1从零开始构建你喜…