kafka生产者事务踩坑记录

news2025/1/10 15:59:56

1. 背景

公司需要迁移一个老 spark 项目,之前是消费阿里 LogStore 中的实时数据,处理之后将结果落库。使用的是 spark streaming,batch 时间为 2 分钟。迁移后,需要将 LogStore 切换为 kafka,涉及到了对代码的改动。公司的 kafka 上游生产者发送数据,由于细节的设计需要,生产者开启了事务,以确保数据准且一次的写入 kafka。

2. 遇到的问题

新项目重构完成之后进行上线,线上运行时发现,每批次数据处理,连接 kafka 代码,每次都要执行很长很长时间,而且经常执行超时,然后数据处理也停止了,具体截图如下:

image-20230218195738669

该项目中需要读取 kafka 中三个主题的数据,所以每批次数据处理生成的任务中,都会执行一次 createDirectStream 代码。

从上图中可以看到,连接 kafka 的代码处,执行了非常长的时间,甚至有时候会超时失败,然后任务会一直被卡主,不处理数据。

3. 排查步骤

刚开始,我以为是默认的连接 kafka 之类的超时时间太小了,然后就不断的调大各种超时时间,包括会话超时时间、请求超时时间、拉取数据超时时间、心跳时间等,但是最后还是不管用。即使是调整到了 10 分钟这个级别,任务依然会卡主,而且也不会因为发生了什么报错而停止,很让人摸不着头脑。

3.1. 查看spark日志

一直排查不到问题,而且任务改造花费了很长很长的时间,领导也来帮忙了(不是大数据方向的,但是一直在做后端,经验很足)。领导让我仔细的看看日志,一行一行的看,不忽略任何错误和警告日志,甚至是 info 级别的日志也仔细的一行一行看,最后发现了这行日志:

WARN consumer.ConsumerConfig: The configuration 
'What to do when there is no initial offset in Kafka or if the current offset does not exist any more on the server 
(e.g. because that data has been deleted): 
<ul><li>earliest: automatically reset the offset to the earliest offset<li>
latest: automatically reset the offset to the latest offset</li><li>n
one: throw exception to the consumer if no previous offset is found for the consumer's group</li><li>
anything else: throw exception to the consumer.</li></ul>' was supplied but isn't a known config.

注意日志中的这个信息:What to do when there is no initial offset in Kafka or if the current offset does not exist any more on the server (e.g. because that data has been deleted),意思是说:如果在 kafka 中不存在指定的初始化偏移量,或者是当前偏移量不存在(比如数据被删除),该怎么做

后面的意思就是读取 kafka 中数据时,我们设置的初始偏移量,有 earliestlatestnone,大家一般都知道前两种是啥意思,可能很少注意最后这个 none,初始偏移量被设置为 none 时,消费者组如果找不到之前的偏移量,则会抛出异常,这个抛异常很关键。如果由于设置为 none,并且找不到之前的偏移量了,就会由于抛出的异常,而卡住咱们的消费程序。

之后在 idea 中调试运行程序,又发现了这些警告信息:

image-20230218201253824

注意第二行警告信息:**KafkaUtils: overriding auto.offset.reset to none for executor**,这是在说,spark 提供的连接 kafka 的工具类 KafkaUtils ,会覆盖一些咱们手动配置的信息。

即使我代码中手动设置了连接 kafka 的参数 auto.offset.reset,不管设置为什么,他都会覆盖为 none。而通过上面的日志信息可以知道,如果初始偏移量设置为 none,可能是会发生错误的。但还有另外一个问题,kafka 中的数据,是很少会丢失的,怎么会发生这个错误呢。

3.2. 排查kafka自身

通过 offset explorer(前身是 kafka tool,可以直接在软件界面读取到 kafka 中的主题数据)查看程序消费的 kafka 中的数据,发现数据是没啥问题的,一直在进来新数据。然后点开主题,直接查看主题一个分区内的数据,结果发现了问题,如下图:

image-20230218202155818

kafka 主题单个分区内的 offset 值是不连续的,而且都是双数。通过网络查询得知,造成这种 offset 值不连续的一个原因,就是上游生产者开启了事务,然后每条数据对应的事务都是占用一个 offset 值,所以真正的数据的 offset 值都会隔一个进行占用。

然后再次查看线上运行时的 executor 日志,然后发现了类似 set offset 1000 to 1002 日志,然后我们去 offset explorer 中查看具体 offset 值附近的数据,发现这个数据到达 kafka 中的时间,并不是打印日志的时间,而是延后了 2 分钟。我们经过思考,觉得有可能是事务造成的影响。spark 去 kafka 中查找到了最新的 offset 值,然后将最新 offset 值作为本批次读取数据的结束 offset 值,之后就开始读取数据,但是从 kafka 中却找不到 offset 值为 1002 的数据,然后就根据设置的 aotu.offset.reset 来重新初始化 offset 值。而由于 spark 将该配置强制设置为了 none ,这是消费者就抛出了异常,此时整个任务就卡住了。

知道了该现象是由于生产者开启事务造成的,我们就让后端同事重新上线了去掉了事务的生产者代码,之后就再没报错了。

4. 解决方案

4.1. 取消生产者事务

由于生产者开启了事务,所有数据真正写入 kafka,并且能让消费者看到的时间,必定会有一定的延迟。消费者直接获取最新 offset 值,获取到的是主题分区的 LEO(日志末端位移)值,但是这个 offset 对应的数据,默认是不能被消费者马上看到的,必须在生产者提交了事务之后才能看到。如果上游生产者由于某些原因,最后回滚了事务,那这个 offset 值对应的数据,就永远看不到了。所以最简单的方法就是取消生产者事务。

4.2. 设置消费者隔离级别

另外,我们还在官网看到了另外一个参数:isolation.level,这个参数含义如下:

控制如何读取以事务方式写入的消息。如果设置为 read_committed, consumer.poll()将只返回已提交的事务消息。如果设置为 read_uncommitted (默认值),consumer.poll() 将返回所有消息,甚至包括已经中止的事务性消息。在任何一种模式下,非事务性消息都将无条件返回。
消息总是按偏移量顺序返回。因此,在 read_committed 模式下,consumer.poll() 将只返回最后一个稳定偏移量(LSO)之前的消息,LSO小于第一个打开事务的偏移量。特别是,在属于正在进行的交易的消息之后出现的任何消息将被扣留,直到相关交易完成。因此,read_committed 消费者在事务运行中无法读取到高水位。
此外,在 read_committed 中,seekToEnd 方法将返回 LSO

默认为读未提交,如果事务中断,或者是事务还未提交,但是消费者开始读取最新的数据了,此时这个数据是还不能被消费者看到的,消费者此时拿不到数据,就会根据设置的 auto.offset.reset 值重新初始化偏移量。

所以,保险起见,如果生产者开启了事务,或者是不确定生产者是否开启了事务,都将消费者的 isolation.level 值设置为 read-committed,以防消费者去读取还未提交事务的消息,但又读不到而发生错误。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/355557.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

常见的数据结构

栈&#xff08;stack&#xff09; 栈&#xff08; stack&#xff09;是限制插入和删除只能在一个位置上进行的表&#xff0c;该位置是表的末端&#xff0c;叫做栈顶&#xff08;top&#xff09;。它是后进先出&#xff08;LIFO&#xff09;的。对栈的基本操作只有 push&#x…

linux高级命令之线程

线程学习目标能够知道线程的作用1. 线程的介绍在Python中&#xff0c;想要实现多任务除了使用进程&#xff0c;还可以使用线程来完成&#xff0c;线程是实现多任务的另外一种方式。2. 线程的概念线程是进程中执行代码的一个分支&#xff0c;每个执行分支&#xff08;线程&#…

macos 下载 macOS 系统安装程序及安装U盘制作方法

01 下载 macOS 系统安装程序的方法 本文来自: https://discussionschinese.apple.com/docs/DOC-250004259 简介 Mac 用户时不时会需要下载 macOS 的安装程序&#xff0c;目的不同&#xff0c;或者升级或者降级&#xff0c;或者研究或者收藏。为了方便不同用户&#xff0c;除…

设计模式之委派模式与模板模式详解和应用

目录1 委派模式1.1 目标1.2 内容定位1.3 定义1.4 委派模式的应用场景1.5 委派模式在业务场景中的应用1.6 委派模式在源码中的体现1.6.1 双亲委派模型1.6.2 常用代理执行方法 invoke1.6.3 Spring loC中 在调用 doRegisterBeanDefinitions()1.6.4 SpringMVC 的DispatcherServlet1…

python基于vue微信小程序的校园闲置二手跳蚤商城的设计与实现

在当今社会的高速发展过程中,产生的劳动力越来越大,提高人们的生活水平和质量,尤其计算机科技的进步,数据和信息以人兴化为本的目的,给人们提供优质的服务,其中网上购买二手商品尤其突出,使我们的购物方式发生巨大的改变。而线上购物,不仅需要在硬件上为人们提供服务网上购物,而…

尚医通 (十七)手机登录

目录一、登录需求分析二、搭建service-user模块三、登录接口实现1、添加service接口与实现2、添加Mapper接口3、添加Controller方法四、手机验证码登录&#xff08;生成token&#xff09;1、使用JWT进行跨域身份验证1.1 传统用户身份验证1.2 解决方案2、JWT介绍3、整合JWT4、单…

Minecraft服务端配置

✨✨前言 ✨✨ 我的世界大家肯定都不陌生&#xff0c;在网易拿下中国区的代理后&#xff0c;很多小伙伴也是都转向了网易版我的世界&#xff0c;网易版我的世界可以说已经做是的十分全面了&#xff0c;使用起来也十分方便&#xff0c;一部分小伙伴也是看重了网易庞大的玩家数量…

使用uniapp创建小程序和H5界面

uniapp的介绍可以看官网&#xff0c;接下来我们使用uniapp创建小程序和H5界面&#xff0c;其他小程序也是可以的&#xff0c;只演示创建这2个&#xff0c;其实都是一套代码&#xff0c;只是生成的方式不一样而已。 uni-app官网 1.打开HBuilder X 选择如图所示&#xff0c;下…

1. Unity的下载与安装

1. 下载 Unity Hub: unity hub是unity编辑器的一个管理工具&#xff0c;负责平时的unity项目创建和管理&#xff0c;以及unity编辑器的安装等 首先在unity官网网址链接&#xff0c;点击左下角的DownLoad Unity图标&#xff0c;如下图&#xff1a; 进入下一个页面&#xff0c;…

LinkedHashMap实现LRU算法

目录LRU 简介LinkedHashMap的使用手写LRU缓存淘汰算法LRU 简介 LRU 是 Least Recently Used 的缩写&#xff0c;这种算法认为最近使用的数据是热门数据&#xff0c;下一次很大概率将会再次被使用。而最近很少被使用的数据&#xff0c;很大概率下一次不再用到。当缓存容量的满时…

show profile和trance分析SQL

目录 一.show profile分析SQL 二.trance分析优化器执行计划 一.show profile分析SQL Mysql从5.0.37版本开始增加了对show profiles和show profile语句的支持。show profiles能够在做SQL优化时帮助我们了解时间都耗费到哪里去了。。 通过have_profiling参数&#xff0c;能够…

J东滑块分析

内容仅供参考学习 欢迎朋友们V一起交流&#xff1a; zcxl7_7 目标 网址&#xff1a;案例地址 J东登录页面会有滑块&#xff0c;直接用来研究 分析 模拟一次触发滑块验证请求(如图) 有2个重要请求&#xff0c;一个是g.html&#xff0c;一个是s.html。其中很明确的就是g是获…

【100个 Unity实用技能】 | Unity 通过自定义菜单将资源导出

Unity 小科普 老规矩&#xff0c;先介绍一下 Unity 的科普小知识&#xff1a; Unity是 实时3D互动内容创作和运营平台 。包括游戏开发、美术、建筑、汽车设计、影视在内的所有创作者&#xff0c;借助 Unity 将创意变成现实。Unity 平台提供一整套完善的软件解决方案&#xff…

C++——二叉树的前序遍历||中序遍历||后序遍历 非递归算法

目录二叉树的前序遍历&#xff0c;非递归迭代实现二叉树的中序遍历 &#xff0c;非递归迭代实现二叉树的后序遍历 &#xff0c;非递归迭代实现二叉树的前序遍历&#xff0c;非递归迭代实现 题目链接 思路&#xff1a; 将任何一颗树分成两个部分&#xff0c;一部分是左路节点&a…

用Three.js搭建的一个艺术场景

本文翻译自于Medium&#xff0c;原作者用 Three.js 创建了一个“Synthwave 场景”&#xff0c;效果还不错&#xff0c;在此加上自己的理解&#xff0c;记录一下。在线Demo. 地形构建 作者想要搭建一个中间平坦、两侧有凹凸山脉效果并且能够一直绵延不断的地形&#xff0c;接下…

Quartz组件任务调度管理

Quartz什么是Quartzquartz:石英钟的意思是一个当今市面上流行的高效的任务调度管理工具所谓"调度"就是制定好的什么时间做什么事情的计划由OpenSymphony开源组织开发Symphony:交响乐是java编写的,我们使用时需要导入依赖即可为什么需要Quartz所谓"调度"就是…

18:CTK 总结篇(FAQ)

作者: 一去、二三里 个人微信号: iwaleon 微信公众号: 高效程序员 经过了几个月的艰苦奋战,终于到了最后一节啦,是不是和我一样,心里有点儿小激动! 回顾之前的章节,从初级 -> 进阶 -> 高级,我们针对 CTK 做了详细的分类讲解。希望通过这些知识,大家能对模块化…

管理会计报告和财务报告的区别

财务会计报告是给投资人看的&#xff0c;可以反映公司总体的盈利能力。不过&#xff0c;我们回顾一下前面“第一天”里面提到的问题。如果你是公司的产品经理&#xff0c;目前有三个产品在你的管辖范围内。上级给你一笔新的资金&#xff0c;这笔资金应该投到哪个产品上&#xf…

c++容器

1、vector容器 1.1性质 a&#xff09;该容器的数据结构和数组相似&#xff0c;被称为单端数组。 b&#xff09;在存储数据时不是在原有空间上往后拓展&#xff0c;而是找到一个新的空间&#xff0c;将原数据深拷贝到新空间&#xff0c;释放原空间。该过程被称为动态拓展。 vec…

什么是猜疑心理?小猫测试网科普小作文

什么是猜疑心理&#xff1f;猜疑心理是说一个人心中想法偏离了客观事实&#xff0c;牵强附会&#xff0c;往往是指不好的一面&#xff0c;对别人的一言一行都充满了不良的解读&#xff0c;认为这些对自己都有针对性&#xff0c;目的性&#xff0c;对自己都是不利的。猜疑心理重…