kafka:java client使用总结塈seek() VS commitSync()的区别(三)

news2025/1/12 9:54:17

最近一段日子接触了kafka这个消息系统,主要为了我的开源中间件项目simplemq增加kafka支持(基于kafka-client【java】),如今总算完成,本文是对这个过程中对kafka消息系统的使用总结

线程安全

关于线程安全,kafka-client的代码注释有明确说明,

KafkaProducer是线程安全的

The producer is thread safe and sharing a single producer instance across threads will generally be faster than having multiple instances.
– from Java Comment of org.apache.kafka.clients.producer.KafkaProducer

也就是说在工程实践中,KafkaProducer实例可以使用单例模式。不需要为了发送一条消息而频繁创建KafkaProducer实例。

KafkaConsumer不是线程安全的

Multi-threaded Processing
The Kafka consumer is NOT thread-safe. All network I/O happens in the thread of the application
making the call. It is the responsibility of the user to ensure that multi-threaded access
is properly synchronized. Un-synchronized access will result in {@link ConcurrentModificationException}.
– from Java Comment of org.apache.kafka.clients.consumerKafkaConsumer

在工程实践中,如果希望对订阅的主题单独管理,那么对于订阅的每一个主题(topic)必须创建一个单独的KafkaConsumer实例负责接收消息。并且要注意对KafkaConsumer实例的多数方法也只能在消息接收线程中。

分区

KafkaConsumer.poll()方法返回拉取的消息对象迭代对象(Iterable),迭代元素类型为ConsumerRecord,从ConsumerRecord返回的字段可知包括了key,value,offset,partition,partition即为分区。
也就是说,如果topic有多个分区,那么每次摘取的一批消息可能是来自不同分区的。所以不能想当然认为每一批消息都是一个分区的。
每批次拉取的消息同一个分区的消息的消息偏移值都是连续的。即[33,34,35]这样的连续数字,
不同的分区的偏移值没有相关性

手动提交

创建KafkaConsumer实例时如果不指定enable.auto.commit参数为true,默认KafkaConsumer是自动提交的。
自动提交模式没啥好说的,不会存在重复消费和遗漏消息的问题。
如果要使用手动提交模式,调用方就要自己维护分区的偏移,以确保不会出现重复消费和遗漏消息问题。
本节讲述手动提交模式下,设计需要注意的问题

团进团出

团进团出是旅游行业的一个术语,即要求一个旅行团,整团出发入境时是多少人,返程出境时要一个不少的回来
在这里的意思就是手动提交模式下每次KafkaConsumer.poll()方法每次拉取一批消息(数量不等),处理完消息后,就要对这批消息进行手动提交处理。提交完成后,才能继续拉取下一批消息。不能在上一批消息还没有完成提交的时候,就调用KafkaConsumer.poll()方法拉取下一批消息。

所以如果你的项目中消息处理是异步的,那么一定要同步等待当前这批消息被处理完,才能再次执行KafkaConsumer.poll()方法拉取消息。

前面说过如果主题有多个分区,每批拉取的消息可能是来自不同分区的。
为方便举例,我们以如下格式表示收到的一条消息

0-100-true

消息由-号三段数字字母代表,

  • 第一段数字代表分区,
  • 第二段数字为偏移,
  • 最后的true/false代表该消息是否正确处理并提交确认,
    为true的需要提交,
    false则是因为各种原因处理失败不需要提交,希望下一轮拉取消息继续处理。

完整提交

如下面的分区0,如果一批消息中同一个分区的所有消息都被正确处理需要提交,那么它就是完整提交

[0-100-true,0-101-true,0-103-true]

如下调用 KafkaConsumer.commitSync方法就可以了。

/** 分区完整提交,提交偏移为最后一个偏移+1 */
// 分区0
TopicPartition topicPartition = new TopicPartition(topic_name, 0);
long lastTrueOffset = 103;
/** 提交的偏移指向最后偏移量的下一条记录 */
OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(lastTrueOffset+1);
consumer.commitSync(Collections.singletonMap(topicPartition, offsetAndMetadata));

不完整提交

如下面的分区1,如果一批消息中同一个分区的消息有部分消息标记为false不能提交,那么它就是不完整提交。

[1-41-true,1-42-false,1-43-false,1-44-true]

对于不完整提交,我们只能从将第一个false之前的记录下次循环不用再处理,第一个false及之后的消息只能留给下次循环拉取消息再处理。如下使用seek()方法修改分区偏移

/** 
 * 分区不完整提交:
 * 记录本轮第一个标记为false的记录之后所有提交标记为true的偏移 
 * 下一轮拉取消息从第一个标记为false的偏移开始
 */
// 分区1
TopicPartition topicPartition = new TopicPartition(topic_name, 1);
long firstFalseOffset =42;
consumer.seek(topicPartition,firstFalseOffset);

在不完整提交的状态下,下次执行poll()方法拉取的消息中包含上一批消息为标记为true的消息,所以还需要有机制记录上一轮拉取的消息中不完整提交中标记为true的消息,这些消息不需要再被处理,否则就会出现重复消费问题。

重复消费问题

即使如上面所说在程序中有机制记录上次不完整提交中标记为true的消息,在下次循环拉取消息后,对上次已经标记为true的消息不再被重复处理,还是无法完全避免重复消费问题。因为这只是解决当前消费者实例在当前消费循环中的重复消费问题。
在消息循环结束前最后一次拉取消息如果是不完整提交,如果这些不完整提交的数据没有持久化保存,那么在下次创建的消费者实例还是会有已经被确认消费的消息被重复消费的情况。
所以如果要完全解决重复消费问题,需要应用层对不完全提交的消息进行额外处理:

  1. 将确认为false的消息存储到缓冲区或持久化存储中:在处理确认为false的消息时,你可以将这些消息存储到缓冲区或持久化存储中,例如内存队列、数据库或文件系统。这样,下次启动消费者时,可以从缓冲区或存储中加载这些消息,并进行再次处理。
  2. 使用定时任务重新处理消息:你可以设置一个定时任务,定期检查确认为false的消息,并重新进行处理。定时任务可以根据需要从缓冲区或持久化存储中获取这些消息,并重新发送给消费者进行处理。

seek() VS commitSync()

seek()方法和commitSync()方法的作用都是通过更新分区的偏移值,控制拉取消息的位置,但这两个方法肯定是有区别的否则不可能设计两个方法干同样的事儿。

commitAsync()commitSync()方法作用是一样的,区别在于commitAsync()是异步提交
事实上我通过输出日志的方式发现commitAsync()执行结束调用OffsetCommitCallback对象时所在线程与commitAsync()执行在同一线程,也就是说commitAsync()可能也是同步提交

我通过反复的实验,对它们的差别有了初步的判断。但并不太确定。
于是,关于seek()方法和commitSync()方法的区别我问了bito机器人,这是它的回答,证实了我的想法,与我的实验结论是一致的。
在这里插入图片描述

我在机器人回答的基础上再做一些示例补充就是如下完整的说明:
commitSync() 方法管理的是消费者下次启动时获取消息的偏移量。当调用 commitSync() 方法时,消费者会将当前消费的最新偏移量提交给Kafka,并在下次启动时从该偏移量处继续消费。

比如:本次poll拉取了 100,101,102三条消息,commitSync提交偏移101,那么下次一轮执行poll拉取消息会从偏移103开始,此刻如果中止拉取消息,下次再重新启动消费者时拉取偏移为101。

seek() 方法更直接,它会修改当前消费者实例下次循环拉取消息的偏移量。如果你在消费者实例中调用 seek() 方法来设置偏移量,并在之后中止拉取消息,下次再启动消费者实例时,它会从你设置的偏移量处开始拉取消息。

还以上例,本次poll拉取了 100,101,102三条消息,seek修改偏移101,那么下次一轮执行poll拉取消息会从偏移101开始,如果此刻中止拉取消息,下次再重新启动消费者时拉取偏移为100,因为我们没有执行commitSync将偏移量持久化。

因此, commitSync() 方法影响的是下次消费者启动时的偏移量,而 seek() 方法影响的是当前消费者实例下次循环拉取消息的偏移量,并不会影响下次再启动消费者实例时的偏移量。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/844679.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

04-2_Qt 5.9 C++开发指南_SpinBox使用

文章目录 1. SpinBox简介2. SpinBox使用2.1 可视化UI设计2.2 widget.h2.3 widget.cpp 1. SpinBox简介 QSpinBox 用于整数的显示和输入,一般显示十进制数,也可以显示二进制、十六进制的数,而且可以在显示框中增加前缀或后缀。 QDoubleSpinBox…

无人车沿着指定线路自动驾驶与远程控制的实践应用

有了前面颜色识别跟踪的基础之后,我们就可以设定颜色路径,让无人车沿着指定线路做自动驾驶了,视频:PID控制无人车自动驾驶 有了前几章的知识铺垫,就比较简单了,也是属于颜色识别的一种应用,主要…

Vue + Cesium快速搭建,全流程(最新总结)

方式一&#xff1a;直接引入&#xff08;最简单&#xff09; 1.安装Cesium&#xff08;Vue搭建可以看我上一期的文章&#xff09; npm i cesium -save2.将node_modules\cesium\Build\Cesium文件夹拷贝到项目的public文件中 3.在public\index.html引入Cesium <!DOCTYPE h…

1466. 重新规划路线

题目描述&#xff1a; 主要思路&#xff1a; 将所有有向边抽象为无向边&#xff0c;将原有的方向权重置为1&#xff0c;其余置为0。 从0开始遍历所有城市&#xff0c;ans权重和。 class Solution { public:vector<vector<int>> a,w;int ans0;bool book[500010];v…

Node.js |(一)Node.js简介及计算机基础 | 尚硅谷2023版Node.js零基础视频教程

学习视频&#xff1a;尚硅谷2023版Node.js零基础视频教程&#xff0c;nodejs新手到高手 文章目录 &#x1f4da;关于Node.js&#x1f407;为什么要学Node.js&#x1f407;Node.js是什么&#x1f407;Node.js的作用&#x1f407;Node.js下载安装&#x1f407;命令行工具&#x1…

【Linux】多路转接 -- poll函数

文章目录 1. poll函数原型2. poll服务器3. poll的优点和确定 1. poll函数原型 poll函数和与我上一篇文章介绍的select函数一样&#xff0c;都是系统提供的多路转接接口&#xff0c;允许进程或线程在同一时间监听多个文件描述符。 本篇文章的一部分内容与上一篇介绍select函数…

Report Sharp-Shooter Lite Edition Crack

Report Sharp-Shooter Lite Edition Crack 报告Sharp Shooter™ 是为.NET Framework设计的&#xff0c;使用C#编写&#xff0c;并且只包含100%的托管代码。Report Sharp Shooter能够从多个数据源生成任何复杂的报告&#xff0c;并将生成的报告导出为大多数格式&#xff0c;包括…

UNIX 入门

与 UNIX 建立连接启动会话登录命令提示符修改口令退出系统 简单的 UNIX 命令命令格式ls 命令who 命令虚拟终端 tty伪终端 ptywho am i 命令 cal 命令help 命令man 命令 shell 概述shell 命令更换 shell临时更改 shell永久更改 shell 登录过程 与 UNIX 建立连接 启动会话 要启…

Java Set集合:HashSet和TreeSet类

Set 集合类似于一个罐子&#xff0c;程序可以依次把多个对象“丢进”Set 集合&#xff0c;而 Set 集合通常不能记住元素的添加顺序。也就是说 Set 集合中的对象不按特定的方式排序&#xff0c;只是简单地把对象加入集合。Set 集合中不能包含重复的对象&#xff0c;并且最多只允…

【无标题】发大水

发大声道TOC 欢迎使用Markdown编辑器 你好&#xff01; 这是你第一次使用 Markdown编辑器 所展示的欢迎页。如果你想学习如何使用Markdown编辑器, 可以仔细阅读这篇文章&#xff0c;了解一下Markdown的基本语法知识。 新的改变 我们对Markdown编辑器进行了一些功能拓展与语…

K8s集群安全机制

1.访问K8s集群的时候&#xff0c;需要经过三个步骤完成具体操作 &#xff08;1&#xff09;认证&#xff08;2&#xff09;鉴权&#xff08;授权&#xff09;&#xff08;3&#xff09;准入控制 进行访问的时候&#xff0c;过程中都要经过apiserver&#xff0c;apiserver做统…

Qt小项目贪吃蛇实线,主要掌握定时器、信号与槽、按键事件、绘制事件、坐标运算、随机数生成等

Qt小项目贪吃蛇实线&#xff0c;主要掌握定时器、信号与槽、按键事件、绘制事件、坐标运算、随机数生成等 Qt 贪吃蛇演示QWidget 绘制界面项目源文件 注释清晰widget.hwidget.cpp 拓展QTimerQKeyEventQRectFQPointFQPainterQIcon Qt 贪吃蛇演示 QWidget 绘制界面 项目源文件 注…

java版直播商城平台规划及常见的营销模式 电商源码/小程序/三级分销+商城免费搭建 bbcbbc

​ Java版工程项目管理系统 Spring CloudSpring BootMybatisVueElementUI前后端分离 1. 涉及平台 平台管理、商家端&#xff08;PC端、手机端&#xff09;、买家平台&#xff08;H5/公众号、小程序、APP端&#xff08;IOS/Android&#xff09;、微服务平台&#xff08;业务服务…

Unity Sort Group(排序组)

** Unity 中的Sort Group组组件允许让Sprite Renderer(精灵渲染器)重新决定渲染顺序. ** 作为组件存在 组件内容&#xff1a; Unity 使用Sort Group 组件的Sort layer 和Order in layer的值来确定排序组在渲染队列内相对与场景内其他排序组和游戏对象的优先级。 属性功能So…

解密时尚RFID服装电商仓储系统

大家好&#xff0c;今天我们要聊一个时尚圈的新宠——RFID服装电商仓储系统&#xff01;是不是听起来就很高端大气上档次&#xff1f;别急&#xff0c;我会来给你扒一扒这个神秘的系统。 首先&#xff0c;咱们得搞清楚什么是RFID。别想太复杂&#xff0c;RFID就是一种让衣服变得…

大数据课程H2——TELECOM的电信流量项目实现

文章作者邮箱&#xff1a;yugongshiyesina.cn 地址&#xff1a;广东惠州 ▲ 本章节目的 ⚪ 了解TELECOM项目的数据收集&#xff1b; ⚪ 了解TELECOM项目的数据清洗&#xff1b; ⚪ 了解TELECOM项目的数据导出&#xff1b; ⚪ 了解TELECOM项目的数据可视化&…

抖音小店规则解读:开设个人店铺,合规经营

抖音小店是抖音平台上的一项功能&#xff0c;允许用户在抖音上开设个人店铺&#xff0c;进行商品销售。下面不若与众将介绍关于抖音小店的一些规则&#xff1a; 1. 店铺资质要求&#xff1a;开设抖音小店需要满足一定的资质要求&#xff0c;包括拥有有效身份证件、年满18周岁、…

Oracle以逗号分隔的字符串拆分为多行数据实例详解

前言 近期在工作中遇到某表某字段是可扩展数据内容&#xff0c;信息以逗号分隔生成的&#xff0c;现需求要根据此字段数据在其它表查询相关的内容展现出来&#xff0c;第一想法是切割数据&#xff0c;以逗号作为切割符&#xff0c;以下为总结的实现方法&#xff0c;以供大家参…

设计中存在的误区

1、子组件&#xff0c;如果想要出现宽度和高度&#xff0c;要在子组件中的最大的盒子定义宽度和高度&#xff0c;才能出现 1.1 你在common.js定义是不管用的&#xff0c;要在自己的盒子中定义长度和高度

Spring 基础

目录 一、什么是 Spring 框架?二、Spring 包含的模块有哪些&#xff1f;2.1 版本2.2 Spring各模块依赖 三、Spring,Spring MVC,Spring Boot 之间什么关系? 一、什么是 Spring 框架? Spring 是一款开源的轻量级 Java 开发框架&#xff0c;旨在提高开发人员的开发效率以及系统…