Kafka - 3.x offset位移不完全指北

news2025/1/13 13:27:03

文章目录

  • offset的默认维护位置
    • 消费`__consumer_offsets` 案例
  • 自动提交offset
    • Code
  • 手动提交offset
    • Code 同步提交
    • Code 异步提交
  • 指定offset 消费 (auto.offset.reset = earliest | latest | none |)
  • 数据漏消费和重复消费分析

在这里插入图片描述


offset的默认维护位置

由于consumer在消费过程中可能会出现断电宕机等故障,consumer恢复后,需要从故障前的位置的继续消费,所以consumer需要实时记录自己消费到了哪个offset,以便故障恢复后继续消费。

Kafka 0.9版本之前,consumer默认将offset保存在Zookeeper中,从0.9版本开始,consumer默认将offset保存在Kafka一个内置的topic中,该topic为__consumer_offsets

在__consumer_offsets主题里面采用key+value的方式存储数据。

  • key是groupId+topic+分区号
  • value是当前offset的值。

每个一段时间,kafka内部就会对这个topic进行compact(压实),即每个groupId+topic+分区号就保留最新的数据。
在这里插入图片描述


消费__consumer_offsets 案例

  1. __consumer_offsets 为kafka中的topic, 那就可以通过消费者进行消费

  2. 在配置文件config/consumer.properties中添加配置exclude.internal.topics=false,默认就是true,表示不能消费系统主题。我们为了查看系统主题数据,需要将参数修改为false。
    在这里插入图片描述

  3. 在命令行创建一个新的topic

[root@localhost bin]# ./kafka-topics.sh --bootstrap-server 192.168.126.171:9092 --create --topic testArtisan --partitions 2
Created topic testArtisan.

  1. 启动生产者向主题testArtisan 中生产数据
    在这里插入图片描述

  2. 启动消费者消费主题testArtisan 中的数据

在这里插入图片描述
注意:指定消费者组的名称,能够更好的观察数据存储位置(key—>groupId+toipc+分区号)。

  1. 启动消费者消费主题__consumer_offsets
[root@localhost bin]# ./kafka-console-consumer.sh --topic __consumer_offsets --bootstrap-server  192.168.126.171:9092 --consumer.config ../config/consumer.properties  --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" --from-beginning

在这里插入图片描述


自动提交offset

Kafka的自动提交offset机制是一种用于管理消费者在消费消息时的偏移量(offset)的方式。这机制的主要特点是自动地将已成功消费的消息的offset提交给Kafka,而不需要消费者显式地去追踪和提交offset。以下是其工作原理的简要概述:

  1. 消费者订阅Topic:消费者在启动时订阅一个或多个Kafka Topic,以开始消费消息。

  2. 消息消费:消费者从订阅的Topic中拉取消息,并进行处理。一旦成功处理一条消息,消费者会自动记录该消息的offset。

  3. 自动提交offset:根据配置,消费者可以定期自动提交成功消费的消息的offset给Kafka集群。这意味着消费者不需要手动追踪每个分区的offset,Kafka会代替其执行这项任务。

  4. 配置参数:消费者可以通过配置以下两个参数来控制自动提交offset的方式:

    • enable.auto.commit:指定是否启用自动提交offset,默认为true
    • auto.commit.interval.ms:指定自动提交offset的时间间隔,默认为5秒。
  5. 注意事项:自动提交offset的机制便捷,但也需要注意以下几点:

    • 如果开启自动提交,消费者在处理消息时,offset将在后台自动提交。这可能导致消息在失败时被重新处理,因此消费者需要处理消息处理失败的情况。
    • 自动提交的时间间隔需要根据具体需求来配置,以兼顾数据处理的实时性和offset提交的频率。

在这里插入图片描述

自动提交offset机制简化了消费者代码,降低了维护的复杂性。但在某些情况下,需要注意确保消息处理的幂等性,以防止重复处理已经提交的消息。如果需要更精确的offset控制,或者需要在消息处理失败时执行自定义逻辑,消费者也可以选择禁用自动提交,手动管理offset。

Code

package com.artisan.pc;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Properties;

/**
 * @author 小工匠
 * @version 1.0
 * @mark: show me the code , change the world
 */
public class CustomConsumer2 {

    public static void main(String[] args) {
        // 1.创建消费者的配置对象
        Properties properties = new Properties();

        // 2.给消费者配置对象添加参数
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.126.171:9092");
        // 配置序列化 必须
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // 配置消费者组 必须
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "artisan-group");

        // 是否自动提交offset
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
        // 提交offset的时间周期,默认5s,
        properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");


        // 3. 创建消费者对象
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);

        // 4. 订阅主题
        consumer.subscribe(Arrays.asList("artisan"));

        // 5. 拉取数据打印
        while (true) {
            ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1));
            // 6. 遍历并输出消费到的数据
            for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                System.out.println(consumerRecord);
            }
        }
    }
}
    

手动提交offset

Kafka允许消费者以两种方式来管理offset,即消费者可以选择自动提交offset或手动提交offset。在手动提交offset的机制中,消费者有更多的控制权和灵活性,可以在确保消息被处理后再提交offset。以下是手动提交offset的简要描述:

  1. Offset的概念:在Kafka中,每个消费者都有一个当前的offset,表示它在分区中已经读取到的位置。Offset是一个标识,用来追踪消费者在每个分区中的读取位置。

  2. 手动提交offset:手动提交offset是指消费者自己负责告知Kafka Broker已经成功处理了一批消息,并提交了offset。这样的机制让消费者能够更细粒度地控制offset的提交时机。

  3. 何时提交offset:消费者可以在处理消息后手动提交offset,通常在以下情况下提交:

    • 在消息成功处理后,即确认消息已被消费。
    • 周期性地,以确保即使消费者失败,它不会重新处理相同的消息。
  4. 提交offset的方法:Kafka提供了两种主要的手动提交offset的方法:

    • commitSync():这是同步提交offset的方法,消费者会等待直到offset提交成功后才继续处理消息。
    • commitAsync():这是异步提交offset的方法,消费者会提交offset,但不会等待确认。
  5. 手动提交的注意事项

    • 手动提交offset需要谨慎,因为如果offset提交不正确,可能会导致消息被重复消费或者丢失。
    • 消费者需要确保offset提交的原子性,以避免提交失败的情况。
    • 如果消费者处理了消息但在提交offset之前失败,可能需要实施一些恢复机制,以避免数据丢失或重复处理。

手动提交offset的机制使消费者更有控制权,允许它们以适应不同的处理需求。然而,这也增加了一些复杂性,需要谨慎处理offset提交以确保数据的一致性和可靠性。自动提交offset与手动提交offset相比,更容易实施,但可能不适用于需要更细粒度控制的情况。

在这里插入图片描述

Code 同步提交

由于同步提交offset有失败重试机制,故更加可靠,以下为同步提交offset的示例。

package com.artisan.pc;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

/**
 * @author 小工匠
 * @version 1.0
 * @mark: show me the code , change the world
 */
public class CustomConsumerByHand {

    public static void main(String[] args) {
        // 1. 创建kafka消费者配置类
        Properties properties = new Properties();
        // 2. 添加配置参数
        // 添加连接
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.126.171:9092");
        // 配置序列化 必须
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        // 配置消费者组
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
        // 是否自动提交offset
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        // 提交offset的时间周期
        properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");

        // 3. 创建kafka消费者
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);

        // 4. 设置消费主题  形参是列表
        consumer.subscribe(Arrays.asList("artisan"));

        // 5. 消费数据
        while (true) {
            // 6. 读取消息
            ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1));

            // 7. 输出消息
            for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                System.out.println(consumerRecord.value());
            }
            // 同步提交offset
            consumer.commitSync();
        }

    }
}
    

Code 异步提交

虽然同步提交offset更可靠一些,但是由于其会阻塞当前线程,直到提交成功。因此吞吐量会受到很大的影响。因此更多的情况下,会选用异步提交offset的方式。

package com.artisan.pc;

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;

import java.time.Duration;
import java.util.Arrays;
import java.util.Map;
import java.util.Properties;

/**
 * @author 小工匠
 * @version 1.0
 * @mark: show me the code , change the world
 */
public class CustomConsumerByHandAsync {

    public static void main(String[] args) {
        // 1. 创建kafka消费者配置类
        Properties properties = new Properties();
        // 2. 添加配置参数
        // 添加连接
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.126.171:9092");
        // 配置序列化 必须
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        // 配置消费者组
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
        // 是否自动提交offset
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        // 提交offset的时间周期
        properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");

        // 3. 创建kafka消费者
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);

        // 4. 设置消费主题  形参是列表
        consumer.subscribe(Arrays.asList("artisan"));

        // 5. 消费数据
        while (true) {
            // 6. 读取消息
            ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1));

            // 7. 输出消息
            for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                System.out.println(consumerRecord.value());
            }
            // 异步提交offset
            consumer.commitAsync(new OffsetCommitCallback() {
                /**
                 * 回调函数输出
                 * @param offsets   offset信息
                 * @param exception 异常
                 */
                @Override
                public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
                    // 如果出现异常打印
                    if (exception != null) {
                        System.err.println("Commit failed for " + offsets);
                    }
                }
            });
        }

    }
}
    

指定offset 消费 (auto.offset.reset = earliest | latest | none |)

auto.offset.reset = earliest | latest | none |

在这里插入图片描述

当Kafka中没有初始偏移量(消费者组第一次消费)或服务器上不再存在当前偏移量时(例如该数据已被删除),该怎么办?
(1)earliest:自动将偏移量重置为最早的偏移量
(2)latest(默认值):自动将偏移量重置为最新偏移量
(3)none:如果未找到消费者组的先前偏移量,则向消费者抛出异常


数据漏消费和重复消费分析

  1. 问题:无论是同步提交还是异步提交offset,都有可能会造成数据的漏消费或者重复消费。
  2. 漏消费:先提交offset后消费,有可能造成数据的漏消费;
  3. 重复消费:而先消费后提交offset,有可能会造成数据的重复消费。

在这里插入图片描述

思考:怎么才能做到既不漏消费也不重复消费呢?

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1154865.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

云栖大会所感所想

离云栖大会圆满结束已经过去8天&#xff0c;第一次参加云栖4天时间收获颇丰&#xff0c;很多场景依旧历历在目&#xff0c;彷如昨日。我是从一个从事iava开发的角度&#xff0c;带着提升自己认知&#xff0c;拓展自己解决问题思路&#xff0c;学习业内人士及前辈的成长历程去参…

选择适合家具建材类跨境电商企业的企业邮箱解决方案

郑州&#xff0c;一座充满活力的内陆城市&#xff0c;近年来在跨境电商领域崛起&#xff0c;吸引了越来越多的国际关注。N公司是一家主营家居建材、厨房卫浴产品等生活耐用消费品的公司&#xff0c;以往主要是业务主要在北美&#xff0c;因为品质比较好&#xff0c;在美国已经打…

H5 Vue跳转小程序

准备工作 绑定域名 先登录微信公众平台进入 公众号设置 的 功能设置 里填写 JS接口安全域名。 控制台报 invalid url domain&#xff0c;就是当前地址没配在安全域名里 引入JS文件 npm install weixin-js-sdk 通过config接口注入权限验证配置 wx.config({debug: true, appI…

Fourier分析导论——第2章——Fourier级数的基本属性(E.M. Stein R. Shakarchi)

第 2 章 Fourier级数的基本属性(Basic Properties of Fourier Series) Nearly fifty years had passed without any progress on the question of analytic representation of an arbitrary function, when an assertion of Fourier threw new light on the subject. Thus…

贝叶斯神经网络用于学习曲线的概率预测【ICLR 2017】

论文下载地址&#xff1a;Excellent-Paper-For-Daily-Reading/hyper-parameters at main 类别&#xff1a;超参数 时间&#xff1a;2023/10/30 摘要 面对不同的神经网络结构、超参数和训练协议&#xff0c;通常需要检查生成学习曲线&#xff0c;以快速终止超参数设置不佳的…

《数据安全与流通:技术、架构与实践》新书发布

随着数据成为关键生产资料和要素&#xff0c;国内外数据安全相关的法律法规在快速完善&#xff0c;数据安全技术也在快速发展。5月25-26日&#xff0c;由星环科技、上海数据交易所、上海大数据联盟、财联社联合主办的向星力未来数据技术峰会 &#xff08;FDTC&#xff09;上&am…

Data Stream 复习(考试向)

Data Stream Review OverallUniform SamplingBloom FilterMisra-Gries AlgorithmCountMin Sketch AlgorithmCount Sketch Algorithm Overall Uniform Sampling Bloom Filter 一个箱子没有球的概率可以表示为 (1 - 1/n)^m 的原因是基于以下逻辑&#xff1a; 对于第一个球&#x…

Vue+OpenLayers6入门到实战进阶案例汇总目录,兼容OpenLayers7和OpenLayers8

本篇作为《VueOpenLayers入门教程》和《VueOpenLayers实战进阶案例》所有文章的二合一汇总目录&#xff0c;方便查找。 本专栏源码是由OpenLayers结合Vue框架编写。 本专栏从Vue搭建脚手架到如何引入OpenLayers依赖的每一步详细新手教程&#xff0c;再到通过各种入门案例和综合…

[双指针] (三) LeetCode LCR 179. 查找总价格为目标值的两个商品 和 15. 三数之和

[双指针] (三) LeetCode LCR 179. 查找总价格为目标值的两个商品 和 15. 三数之和 文章目录 [双指针] (三) LeetCode LCR 179. 查找总价格为目标值的两个商品 和 15. 三数之和查找总价格为目标值的两个商品题目分析解题思路代码实现总结 三数之和题目分析解题思路代码实现总结 …

Web APIs——其他事件

1、页面加载事件 加载外部资源&#xff08;如图片、外联CSS和JavaScript等&#xff09;加载完毕时触发的事件 为什么要学&#xff1f; 有些时候需要等页面资源全部处理完了做一些事情老代码喜欢把script写head中&#xff0c;这时候直接找dom元素找不到 事件名&#xff1a;load …

内存DMA及设备内存控制详解

序言 对于PCIe 设备&#xff08;PCIe Endpoint&#xff09;来说&#xff0c;其和CPU CORE、DRAM 的交互&#xff0c;主要涉及两种类型的内存访问&#xff1a; 设备内存访问&#xff1a;PCIe 设备的 Device Memory&#xff08;设备内存&#xff09;的访问&#xff0c;例如CPU …

java.net.URISyntaxException: Illegal character in query at index

现象 现象调用httpGet请求时&#xff0c;报错&#xff0c;如下&#xff1a; 原因 因为调用的url里有特殊字符 如单引号&#xff0c;双引号&#xff0c;等号&#xff0c;& | 等 解决方案 使用url带参构造方法&#xff0c;会对url里面的特殊字符进行转义处理 URL url n…

Python-常用的量化交易代码片段

算法交易正在彻底改变金融世界。通过基于预定义标准的自动化交易,交易者可以以闪电般的速度和比以往更精确的方式执行订单。如果您热衷于深入了解算法交易的世界,本指南提供了帮助您入门的基本代码片段。从获取股票数据到回溯测试策略,我们都能满足您的需求! 1. 使用 YFina…

k8s从私有仓库拉取镜像

从私有仓库拉取镜像 | Kubernetes 准备开始 你必须拥有一个 Kubernetes 的集群&#xff0c;同时你必须配置 kubectl 命令行工具与你的集群通信。 建议在至少有两个不作为控制平面主机的节点的集群上运行本教程。可以通过 Minikube 构建一个你自己的集群&#xff0c;或者你可以…

网管的利器之NMap

在进行网络管理过程中&#xff0c;网管会借助很多的工具比如付费的一些产品&#xff0c;比如漏洞扫描、安全隐患发现、网络设备管理、上网行为管理等。 更多的情况下&#xff0c;网管员使用一些DOS命令或者免费的工具进行&#xff0c;比如前面介绍过的PingInfoView.exe、WinMTR…

机器学习(六)构建机器学习模型

1.9构建机器学习模型 我们使用机器学习预测模型的工作流程讲解机器学习系统整套处理过程。 整个过程包括了数据预处理、模型学习、模型验证及模型预测。其中数据预处理包含了对数据的基本处理&#xff0c;包括特征抽取及缩放、特征选择、特征降维和特征抽样&#xff1b;我们将…

lambda表达式 - c++11

文章目录&#xff1a; lambda表达式概念lambda表达式语法函数对象与lambda表达式 lambda表达式概念 lambda 表达式是 c11 中引入的一种匿名函数&#xff0c;它可以在需要函数对象的地方使用&#xff0c;可以用作函数参数或返回值。lambda 表达式可以看作是一种局部定义的函数对…

mysql之用户管理、权限管理、密码管理

用户管理 创建用户create user 杨20.0.0.13 identified by 123; 用户重命名rename user 杨20.0.0.13 to yang20.0.0.13; 删除用户drop user 杨20.0.0.13; 权限管理 查看用户权限show grants for 杨20.0.0.13; 赋予用户权限grant all privileges on *.* to 杨localhost id…

文章导读助你高效成长

文章目录 Java基础篇MySQL数据库篇Redis缓存篇 &#x1f4d5;我是廖志伟&#xff0c;一名Java开发工程师、Java领域优质创作者、CSDN博客专家、51CTO专家博主、阿里云专家博主、清华大学出版社签约作者、产品软文创造者、技术文章评审老师、问卷调查设计师、个人社区创始人、开…

超低直流电阻测试仪

KDZD5510半导体体积电阻率测试仪是一款针对超低直流电阻测试专门设计开发的一款高精度测试仪&#xff0c;界面清爽、操作便捷&#xff1b;量程范围为&#xff1a;0.01uΩ~10MΩ&#xff1b;显示位数为五位半&#xff1b;自动双向电流测试&#xff0c; 同时脉冲式的测试方式避免…