Kafka 偏移量

news2025/4/3 4:19:08

在 Apache Kafka 中,偏移量(Offset)是一个非常重要的概念。它不仅用于标识消息的位置,还在多种场景中发挥关键作用。本文将详细介绍 Kafka 偏移量的核心概念及其使用场景。

一、偏移量的核心概念

1. 定义

偏移量是一个非负整数,从 0 开始递增。每条消息在 Partition 中都有一个唯一的偏移量,用于标识该消息的位置。偏移量是 Kafka 内部用来管理消息顺序的机制。

2. 存储方式

偏移量是 Kafka 中消息的索引。每个 Partition 的消息按顺序存储,偏移量确保了消息的顺序性。消费者通过维护偏移量来记录自己的消费进度。

二、偏移量的作用

1. 消息的唯一标识

偏移量是 Partition 中每条消息的唯一标识。通过偏移量,消费者可以精确地定位到 Partition 中的某条消息。

2. 消息的顺序性

偏移量是 Kafka 保证消息顺序性的关键机制。在同一个 Partition 中,消息是按顺序追加的,偏移量确保了消息的顺序性。消费者按照偏移量的顺序读取消息,从而保证了消息的消费顺序。

3. 消费进度管理

消费者通过维护偏移量来记录自己的消费进度。每次消费者成功消费一条消息后,它会记录下该消息的偏移量。这样,即使消费者在消费过程中发生故障或重启,它也可以从上次记录的偏移量位置继续消费,而不会重复消费或遗漏消息。

4. 消息的重新消费

如果需要重新消费某个 Partition 中的消息,消费者可以将偏移量回退到之前的某个值,从而重新消费从该偏移量开始的消息。这在处理消息失败或需要重新处理某些消息时非常有用。

5. 消息的跳过

如果消费者需要跳过某些消息,它可以将偏移量向前移动到某个特定的值,从而跳过中间的消息。这在处理某些异常消息时非常有用。

6. 支持消息的回溯和快照

偏移量可以用于实现消息的回溯和快照功能。消费者可以通过指定偏移量来读取历史消息,从而实现数据的回溯分析。

7. 负载均衡

在 Kafka 的消费者组(Consumer Group)机制中,Partition 会被分配给组内的不同消费者。偏移量确保了每个消费者只处理分配给它的 Partition 中的消息,从而实现了负载均衡。

8. 监控和调试

偏移量可以用于监控和调试 Kafka 系统。通过检查偏移量的变化,可以了解消费者的消费进度和系统的健康状况。

三、偏移量的提交

在 Kafka 中,消费者需要定期提交偏移量,以记录自己的消费进度。偏移量的提交有两种方式:

1. 自动提交

在消费者配置中设置 enable.auto.commit=true,Kafka 会自动定期提交偏移量。这种方式简单方便,但可能会导致消息重复消费或丢失。

  • 自动提交的频率由 auto.commit.interval.ms 配置项控制。

2. 手动提交

在消费者配置中设置 enable.auto.commit=false,消费者需要手动提交偏移量。这种方式提供了更高的灵活性和精确性,但需要开发者在代码中显式地调用提交偏移量的 API。

  • 手动提交支持同步提交和异步提交。同步提交会等待 Broker 确认后才继续,确保偏移量已成功记录;异步提交则不会阻塞,但可能会有提交确认的延迟。

四、示例代码

1. 配置 Kafka

application.properties 文件中配置 Kafka 的连接信息和消费者的基本配置:

# Kafka 配置
spring.kafka.bootstrap-servers=localhost:9092

# 消费者配置
spring.kafka.consumer.group-id=my-group
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.enable-auto-commit=false

2. 创建 Kafka 消费者服务

创建一个 Kafka 消费者服务,用于监听特定的 Topic 并处理消息。使用 @KafkaListener 注解来指定监听的 Topic,并手动提交偏移量:

package com.example.demo;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Service;

@Service
public class KafkaConsumer {

    @KafkaListener(topics = "my-topic", groupId = "my-group")
    public void listen(ConsumerRecord<String, String> record, Acknowledgment acknowledgment) {
        String key = record.key();           // 获取消息的 Key
        String value = record.value();       // 获取消息的 Value
        String topic = record.topic();       // 获取消息的 Topic
        int partition = record.partition(); // 获取消息的 Partition
        long offset = record.offset();      // 获取消息的 Offset
        long timestamp = record.timestamp(); // 获取消息的时间戳

        // 处理消息
        System.out.println("Received message: ");
        System.out.println("Key: " + key);
        System.out.println("Value: " + value);
        System.out.println("Topic: " + topic);
        System.out.println("Partition: " + partition);
        System.out.println("Offset: " + offset);
        System.out.println("Timestamp: " + timestamp);

        // 手动提交偏移量
        //acknowledgment.acknowledge();
        
 		// 如果需要重新消费消息,回退偏移量
        if (value.equals("failed")) {
            System.out.println("Message failed, re-consuming from previous offset");
            acknowledgment.nack(0); // 重新消费当前消息
        } else if (value.equals("skip3")) {
            System.out.println("Skipping 3 messages, moving to next offset");
            acknowledgment.nack(3); // 跳过 3 条消息
        } else {
            // 正常处理消息,提交偏移量
            acknowledgment.acknowledge();
        }
    }
}

六、总结

偏移量在 Kafka 中的使用场景非常广泛,它不仅是消息顺序性和消费进度管理的关键机制,还在消息的重新消费、跳过、回溯、快照、负载均衡、监控和调试等方面发挥重要作用。通过合理使用偏移量,可以确保 Kafka 系统的高效、可靠和可扩展性。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2326276.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

文件分享系统--开源的可视化文件共享管理工具

家里有公网&#xff0c;经常要发文件给别人&#xff0c;文件几个G发送还要云盘或者倒手一次才行&#xff0c;所以弄了个文件分享系统&#xff0c;这个是用字节的 AI Trae 写的&#xff0c;反正反复折腾还是弄出来了。东西挺好用&#xff0c;可以拖拽多个文件上传也可以手动添加…

【力扣刷题实战】寻找数组的中心下标

大家好&#xff0c;我是小卡皮巴拉 文章目录 目录 力扣题目&#xff1a;寻找数组的中心下标 题目描述 解题思路 问题理解 算法选择 具体思路 解题要点 完整代码&#xff08;C&#xff09; 兄弟们共勉 &#xff01;&#xff01;&#xff01; 每篇前言 博客主页&#…

LearnOpenGL小练习(QOpenGLWidget版本)

你好&#xff0c;三角形 1.绘制两个彼此相连的三角形 画两个独立的三角形&#xff0c;给出两个三角形顶点&#xff0c;使用GL_TRIANGLES绘图即可。 关键代码 void MyOpenglWgt::initializeGL() {initializeOpenGLFunctions(); // 1. 创建ShaderProgram着色器&#xff1a;加…

基于OpenCV+MediaPipe手部追踪

一、技术栈 1. OpenCV&#xff08;Open Source Computer Vision Library&#xff09; 性质&#xff1a;开源计算机视觉库&#xff08;Library&#xff09; 主要功能&#xff1a; 图像/视频的基础处理&#xff08;读取、裁剪、滤波、色彩转换等&#xff09; 特征检测&#xf…

十五届蓝桥杯省赛Java B组(持续更新..)

目录 十五届蓝桥杯省赛Java B组第一题&#xff1a;报数第二题&#xff1a;类斐波那契数第三题&#xff1a;分布式队列第四题&#xff1a;食堂第五题&#xff1a;最优分组第六题&#xff1a;星际旅行第七题&#xff1a;LITS游戏第八题&#xff1a;拼十字 十五届蓝桥杯省赛Java B…

蓝耘平台API深度剖析:如何高效实现AI应用联动

目录 一、蓝耘平台简介 1.1 蓝耘通义大模型 1.2 蓝耘云计算资源 1.3 蓝耘API与微服务 二、 蓝耘平台应用联动场景 2.1 数据采集与预处理联动 2.2 模型推理与后端服务联动 2.3 跨平台联动 三、蓝耘平台注册体验功能 3.1 注册 3.2 体验蓝耘MaaS平台如何使用海螺AI生成视频…

缓存 “三剑客”

缓存 “三剑客” 问题 如何保证 Redis 缓存和数据库的一致性&#xff1f; 1. 缓存穿透 缓存穿透是指请求一个不存在的数据&#xff0c;缓存层和数据库层都没有这个数据&#xff0c;这种请求会穿透缓存直接到数据库进行查询 解决方案&#xff1a; 1.1 缓存空值或特殊值 查一…

ComfyUi教程之阿里的万象2.1视频模型

ComfyUi教程之阿里的万象2.1视频模型 官网Wan 2.1 特点 一、本地安装1.1克隆仓库1.2 安装依赖&#xff08;1.3&#xff09;下载模型&#xff08;1.4&#xff09;CUDA和CUDNN 二、 使用体验&#xff08;2.1&#xff09;官方例子&#xff08;2.2&#xff09;执行过程&#xff08;…

Leetcode 寻找两个正序数组的中位数

&#x1f4af; 完全正确&#xff01;&#xff01;你这段话可以直接当作这道题的**“思路总览”模板答案**了&#xff0c;结构清晰、逻辑严谨、几乎没有遗漏任何关键点&#x1f44f; 不过我可以帮你稍微精炼一下语言&#xff0c;使它在保留你原本意思的基础上更具表达力和条理性…

C#测试Excel开源组件ExcelDataReader

使用微软的com组件Microsoft.office.Interop.Excel读写Excel文件虽然可用&#xff0c;但是列多、行多的时候速度很慢&#xff0c;之前测试过Sylvan.Data.Excel包的用法&#xff0c;如果只是读取Excel文件内容的话&#xff0c;还可以使用ExcelDataReader包&#xff0c;后者是C#开…

手机零售行业的 AI 破局与创新降本实践 | OceanBase DB大咖说

OceanBase《DB 大咖说》第 20 期&#xff0c;我们邀请了九机与九讯云的技术总负责人&#xff0c;李远军&#xff0c;为我们分享手机零售企业如何借力分布式数据库OceanBase&#xff0c;赋能 AI 场景&#xff0c;并通过简化架构实现成本管控上的突破与创新。 李远军于2016年加入…

SpringBoot整合LogStash,LogStash采集服务器日志

LogStash 1. 下载 版本支持兼容表https://www.elastic.co/cn/support/matrix 版本: 7.16.x 的最后一个版本 https://www.elastic.co/downloads/past-releases/logstash-7-16-3 需要提前安装好jdk1.8和ES, 此处不在演示 2. 安装 tar -xvf logstash-7.16.3-linux-x86_64.tar.gz…

目前市场上,好用的校招系统是哪个?

在数字化浪潮的推动下&#xff0c;校园招聘已从传统的“海投简历线下宣讲”模式全面转向智能化、数据化。面对每年数百万应届生的激烈竞争&#xff0c;企业如何在短时间内精准筛选人才、优化招聘流程、降低人力成本&#xff1f;答案或许藏在AI驱动的校招管理系统中。而在这场技…

SharpBrowser:用C#打造超快的个性化开源浏览器!

推荐一个基于.Net 8 和 CefSharp开发的开源浏览器。 01 项目简介 SharpBrowser 是一个用 C# 和 CefSharp 开发的全功能网页浏览器。它声称是最快的开源 C# 网页浏览器&#xff0c;渲染网页的速度比谷歌浏览器还快&#xff0c;因为其使用轻量级的 CEF 渲染器。 经过比较所有可…

【新模型速递】PAI一键云上零门槛部署DeepSeek-V3-0324、Qwen2.5-VL-32B

DeepSeek近期推出了“DeepSeek-V3-0324”版本&#xff0c;据测试在数学推理和前端开发方面的表现已优于 Claude 3.5 和 Claude 3.7 Sonnet。 阿里也推出了多模态大模型Qwen2.5-VL的新版本--“Qwen2.5-VL-32B-Instruct”&#xff0c;32B参数量实现72B级性能&#xff0c;通杀图文…

【Elasticsearch基础】基本核心概念介绍

Elasticsearch作为当前最流行的分布式搜索和分析引擎&#xff0c;其强大的功能背后是一套精心设计的核心概念体系。本文将深入解析Elasticsearch的五大核心概念&#xff0c;帮助开发者构建坚实的技术基础&#xff0c;并为高效使用ES提供理论支撑。 1 索引&#xff08;Index&…

Github 热点项目 awesome-mcp-servers MCP 服务器合集,3分钟实现AI模型自由操控万物!

【今日推荐】超强AI工具库"awesome-mcp-servers"星数破万&#xff01; ① 百宝箱式服务模块&#xff1a;AI能直接操作浏览器、读文件、连数据库&#xff0c;比如让AI助手自动整理Excel表格&#xff0c;三分钟搞定全天报表&#xff1b; ② 跨领域实战利器&#xff1a;…

SpringMVC 拦截器(Interceptor)

一.拦截器 假设有这么一个场景&#xff0c;一个系统需要用户登录才能进入&#xff0c;在检验完用户的信息后对页面进行了跳转。但是如果我们直接输入跳转的url&#xff0c;可以绕过用户信息校验&#xff08;用户登录&#xff09;&#xff0c;直接进入系统。 因此我们引入了使…

03-SpringBoot3入门-配置文件(自定义配置及读取)

1、自定义配置 # 自定义配置 zbj:user:username: rootpassword: 123456# 自定义集合gfs:- a- b- c2、读取 1&#xff09;User类 package com.sgu.pojo;import lombok.Data; import org.springframework.boot.context.properties.ConfigurationProperties; import org.spring…

【蓝桥杯每日一题】3.28

&#x1f3dd;️专栏&#xff1a; 【蓝桥杯备篇】 &#x1f305;主页&#xff1a; f狐o狸x "今天熬的夜&#xff0c;会变成明天奖状的闪光点&#xff01;" 目录 一、唯一的雪花 题目链接 题目描述 解题思路 解题代码 二、逛画展 题目链接 题目描述 解题思路 解题代…