kafka 消息日志原理 指定偏移量消费 指定时间戳消费

news2025/1/12 20:36:15

Kafka 日志详解

Apache Kafka日志存储在物理磁盘上各种数据的集合,日志按照topic分区进行文件组织,每一个分区日志由一个或者多个文件组成。生产者发送的消息被顺序追加到日志文件的末尾。

在这里插入图片描述

如上图所述,Kafka主题被划分为3个分区。在Kafka中,分区是一个逻辑工作单元,其中记录被顺序附加分区上 (kafka只能保证分区消息的有序性,而不能保证消息的全局有序性)。但是分区不是存储单元,分区进一步划分为Segment - 段,这些段是文件系统上的实际文件。为了获得更好的性能和可维护性,可以创建多个段,而不是从一个巨大的分区中读取,消费者现在可以更快地从较小的段文件中读取。创建具有分区名称的目录,并将该分区的所有段作为各种文件进行维护。图中 topic主题分为3个分区

  • Partition 0 - 目前有三个segment文件段组成,Segment0、Segment1已经写满,目前Segment 2处于活跃状态
  • Partition 1
  • Partition 2

Segment

文件说明

|── my-topic-0
   ├── 00000000000000000000.index
   ├── 00000000000000000000.log
   ├── 00000000000000000000.timeindex
   ├── 00000000000000001007.index
   ├── 00000000000000001007.log
   ├── 00000000000000001007.snapshot
   ├── 00000000000000001007.timeindex

文件说明:

  • **.log 文件 - 此文件包含实际记录,并将记录保持到特定偏移量,文件名描述了添加到此文件的起始偏移量
  • .index 文件 - ** 索引文件,记录偏移量映射到 .log 文件的字节偏移量,此映射用于从任何特定偏移量读取记录
  • **.timeindex 文件- 时间戳索引文件,此文件包含时间戳到记录偏移量的映射,该映射使用.index文件在内部映射到记录的字节偏移量。这有助于从特定时间戳访问记录
  • .snapshot 文件 - 包含用于避免重复记录序列ID的生产者快照。出现Leader选举时使用,避免出现数据重复

数据长度

之前提到过,log文件的文件名表示该文件的起始偏移量。那么从上面的文件我们可以分析出,第一个日志段00000000000000000000.log包含从偏移量0到偏移量1006的记录。原因是下一个段00000000000000001007.log具有从偏移量1007开始的记录,这称为活动段。

在这里插入图片描述

当前活跃的Segment段文件是唯一可用于读取和写入的文件,而用户可以使用其他日志段(非活动)读取数据。当活动段变满(由log.segment.bytes配置,默认为1 GB)或配置的时间(log.roll.hours或log.roll.ms,默认为7天)过去时,该段将被滚动。这意味着活动段将以只读模式关闭并重新打开,并且将以读写模式创建新的段文件(活动段)。

log.roll.hours

Segment 日志保留的时间配置,单位为小时,默认168小时,即7天。

Type:int
Default:168
Valid Values:[1,…]
Importance:high
Update Mode:read-only

在生产环境中,该参数需要结合业务实际情况进行合理配置,否则就会出现磁盘爆满的问题。

我之前的工作经验中出现过此类问题,由于生产者产生数据频率较快,在2-3天之内就已经将500G的硬盘占满,但是kafka默认保留7天日志,导致数据没有及时清理,从而导致磁盘占满的问题。后面经过权衡,配置了48小时,之后再没有出现暴磁盘的现象。

位移索引

索引有助于消费者从任何指定偏移量或使用任何时间范围读取数据。如前所述,.index文件包含一个索引,该索引将逻辑偏移量映射到.log文件中记录的字节偏移量。您可能希望每个记录都可以使用此映射,但它不能以这种方式工作 。

如何在索引文件中生成新的索引项由log.index.interval.bytes参数定义,默认值为4096字节。这意味着在日志中每添加4096个字节后,就会向索引文件中添加一个索引项。假设生产者向Kafka主题发送消息,占100字节。在这种情况下,在日志文件中每追加41条记录(41*100=4100字节)后,将向.index文件中添加一个新的索引项。

在这里插入图片描述

消费者从指定偏移量位置读取数据的,步骤大致如下:

  1. 根据topic名称搜索.index文件。例如,如果偏移量为1191,将搜索其名称值小于1191的索引文件。索引文件的命名约定与日志文件相同(这一点非常重要)
  2. 在.index文件中搜索请求的偏移量所在的索引项
  3. 使用映射的字节偏移量访问**.log**文件,并开始使用该字节偏移量的记录

时间索引

消费者可能还希望从特定的时间戳读取记录。这就是使用到.timeindex索引文件。它维护一个时间戳和偏移量映射(映射到.index文件中的索引项),映射到.log文件中的实际字节偏移量。

在这里插入图片描述

如图索引,根据时间戳读取数据,比根据位移读取数据要复杂一些,多经历了一个数据查找的步骤。

指定读取

现在写两个按照指定位移/时间戳读取消息的demo。

位移读取

public class OffsetConsumer {

    public static void main(String[] args) {
        String bootstrapServers = "127.0.0.1:9092";
        String topic = "topic_t40";

        // create consumer configs
        Properties properties = new Properties();
        properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        // create consumer
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);

        // assign
        TopicPartition partitionToReadFrom = new TopicPartition(topic, 0);
        long offsetToReadFrom = 200L;
        consumer.assign(Arrays.asList(partitionToReadFrom));

        // seek to offset 200
        consumer.seek(partitionToReadFrom, offsetToReadFrom);

        boolean keepOnReading = true;

        // poll for new data
        while(keepOnReading){
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record : records){
                System.out.println("Message received " + record.value() + ", partition " + record.partition() + ", offset=" + record.offset());
            }
        }
    }
}

在这里插入图片描述

如上图所示,指定读取分区0,从偏移量200开始读取数据

时间戳读取

public class TimestampConsumer {

    public static void main(String[] args) throws Exception{
        String topicName = "topic_t40";

        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "app_hp1");
        props.put("client.id", "client_01");
        props.put("enable.auto.commit", true);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,   StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

        List<PartitionInfo> partitions = consumer.partitionsFor(topicName);
        List<TopicPartition> topicPartitionList = partitions
                .stream()
                .map(info -> new TopicPartition(topicName, info.partition()))
                .collect(Collectors.toList());
        consumer.assign(topicPartitionList);

        Map<TopicPartition, Long> partitionTimestampMap = topicPartitionList.stream()
                .collect(Collectors.toMap(tp -> tp, tp -> 1672239981330L));
        Map<TopicPartition, OffsetAndTimestamp> partitionOffsetMap = consumer.offsetsForTimes(partitionTimestampMap);
// Force the consumer to seek for those offsets
        partitionOffsetMap.forEach((tp, offsetAndTimestamp) -> consumer.seek(tp, offsetAndTimestamp.offset()));

        boolean keepOnReading = true;
        while(keepOnReading){
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record : records){
                System.out.println("Message received " + record.value() + ", partition " + record.partition() + ", offset=" + record.offset() + ", timestamp=" + record.timestamp());
            }
        }
    }
}

在这里插入图片描述

如上图所示,打印出来的消息时间都大于指定时间。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/131424.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

vscode使用跳板机(密钥)进入内网并连接内网中其它机器(密码)

经过简单测试 1、不能像xshell一样选择服务器的密钥登陆&#xff0c;只能通过将本机的公钥传到服务器上 2、不能使用本地socket5做代理登录 3、不能使用系统代理登录 一、使用密钥连接到跳板机 1、内网穿透 2、将本机公钥上传到服务器上 1&#xff09;建立密钥对 无论是win…

redis集群 mac安装

1.安装redis mac环境用brew install安装 brew install redis 安装好后默认配置启动单点服务 redis-server 注&#xff1a;brew默认程序安装在/usr/local/Cellar目录下 /usr/local/Cellar/redis 默认配置文件在 /usr/local/etc/redis.conf 2.创建配置文件 准备创建6个节…

谣言检测数据集

1 PHEME-R 这是一个在PHEME FP7项目的新闻学用例中收集和注释的数据集。这些谣言与9个不同的突发新闻相关。它是为分析社交媒体谣言而创建的&#xff0c;包含由谣言推文发起的推特对话&#xff1b;对话包括对这些谣言推文的回应推文。这些推文已经被注解为支持度、确定性和证…

VS2012安装教程

我要学只有我们两个人懂得C语言。 安装包&#xff1a;https://pan.baidu.com/s/1YR7Xk9Zlv7zQWCsERdVgIQ [提取码]&#xff1a;stvi 1、右键以管理员身份运行 “vs_ultimate.exe” 2、编辑软件安装位置&#xff0c;然后点击同意许可&#xff0c;之后点下一步即可&#xff01; 3…

mongoDB聚合查询

管道 管道在Unix和Linux中一般用于将当前命令的输出结果作为下一个命令的参数。MongoDB的聚合管道将MongoDB文档在一个管道处理完毕后将结果传递给下一个管道处理。管道操作是可以重复的。 聚合管道操作 可参考菜鸟文档&#xff1a;菜鸟文档 命令 功能描述 $project指定输出…

shell第四天作业——流程控制之循环

题目 一、for创建20个用户&#xff0c;用户前缀由用户输入&#xff0c;用户初始密码由用户输入。 二、for循环ping测试指定网段的主机&#xff0c;网段由用户输入。 三、使用for/while实现批量主机root密码的修改 一、for创建20个用户&#xff0c;用户前缀由用户输入&#x…

2022年已然要结束了,一起来分享下你的故事吧!2023年的接力棒已经递到手里,千言万语不如一句Fighting!

【系列专栏】&#xff1a;博主结合工作实践输出的&#xff0c;解决实际问题的专栏&#xff0c;朋友们看过来&#xff01; 《QT开发实战》 《嵌入式通用开发实战》 《从0到1学习嵌入式Linux开发》 《Android开发实战》 《实用硬件方案设计》 长期持续带来更多案例与技术文章分享…

c++语法欠缺地方

sizeof是用来计算变量占多大内存的&#xff0c;单位是字节&#xff08;byte&#xff09;&#xff1b;sizeof 后面跟类型时&#xff0c;必须加上括号&#xff0c;例如sizeof(double);后面跟变量可以不用加括号&#xff0c;例如&#xff1a;sizeof d%d是以十进制形式输出有符号整…

CDP集群卸载过程

CDP集群卸载过程 1. 登录到Cloudera Manager&#xff0c;并停止整个集群服务。 2. 停用并移除所有Parcel 3. “停用”CDH7的Parcel。 4. 从主机删除”CDH6的Parcel 5. 删除集群 6. 登录server机器&#xff0c;停止CM Server服务 systemctl stop cloudera-scm-server 7. 移除…

如何实现高性能点赞(三)

数据库设计 数据库表中至少要包含三个字段&#xff1a;被点赞用户id&#xff0c;点赞用户id&#xff0c;点赞状态。再加上主键id&#xff0c;创建时间&#xff0c;修改时间就行了。 建表语句 对应的对象 UserLike 数据库操作 操作数据库同样封装在接口中 LikedService L…

【2.2】服务拆分--服务远程调用

服务拆分--服务远程调用1 案例--根据订单id查询订单功能2 远程调用方式分析2.1 注册RestTemplate2.2 发http请求3 总结1 案例–根据订单id查询订单功能 需求&#xff1a;根据订单id查询订单的同时&#xff0c;把订单所属的用户信息一起返回。 由上一节的测试结果可以看出&…

opencv-python常用函数解析及参数介绍(五)——腐蚀与膨胀

腐蚀与膨胀前言膨胀腐蚀开运算与闭运算礼帽与黑帽运用膨胀和腐蚀获得图像轮廓前言 有些时候图片上会有一些划痕或者污渍&#xff0c;会影响图片的质量&#xff0c;假设我有一张写有“艾醒”的图片&#xff0c;但是有花花绿绿的划痕和污渍&#xff0c;这时我们就可以运用腐蚀与…

五、Web自动化测试

Web自动化测试5.1 Selenium Web 自动化5.1.1 Selenium 和 Robot Framework SeleniumLibrary 库介绍安装 robotframework-seleniumlibrary导入 SeleniumLibrary5.1.2 Open Browser 和 Close Browser5.1.3 Input Text5.1.4 Click Button5.1.5 Click Element5.1.6 Click Link5.1.7…

2022CTF培训(十)IOT 相关 CVE 漏洞分析

附件下载链接 这里选择的设备是一款家用路由器&#xff0c;型号为 D-Link DIR-850L(EOL) 。由于该款路由器已停产&#xff0c;官网无法下载到固件&#xff0c;不过目前这个网站还能下载到相关的固件&#xff0c;当然附件中也会提供需要分析的固件。 固件解密 以 DIR850LB1_FW…

归一化 (Normalization)、标准化 (Standardization)和中心化/零均值化 (Zero-centered)

目录 一、概念 1、归一化&#xff08;Normalization&#xff09;&#xff1a; 2、标准化&#xff08;Standardization&#xff09;&#xff1a; 3、中心化/零均值化&#xff08;zero-centered&#xff09;&#xff1a; 二、联系和差异&#xff1a; 三、标准化和归一化的多种…

Faster RCNN网络源码解读(Ⅵ) --- RPN网络代码解析(上)RPNHead类与AnchorsGenerator类解析

目录 一、代码作用&#xff08;rpn_function.py&#xff09; 二、代码解析 2.1 RPNHead类 2.2 AnchorsGenerator类 2.2.1 初始化函数__init__ 2.2.2 正向传播过程 forward 2.2.3 set_cell_anchors生成anchors模板 2.2.4 generate_anchors生成anchors 2.2.5 cached_g…

【Linux】vim 中批量添加注释

本期主题&#xff1a;vim 中批量添加注释博客主页&#xff1a;小峰同学分享小编的在Linux中学习到的知识和遇到的问题小编的能力有限&#xff0c;出现错误希望大家不吝赐 此文主要介绍两种方法&#xff1a;方法一 &#xff1a;块选择模式&#xff1b;方法二: 替换命令 &#x…

Java基础随手记

数组 数组的使用 数组可以存放多个同一类型的数据&#xff0c;数组也是一种数据类型&#xff0c;是引用类型。即&#xff1a;数组就是一组数据 问题引入 传统的解决方式 使用数组来解决 可以看到&#xff0c;我们创建了一个double类型元素的数组&#xff0c;将我们要计算…

buuctf-misc-[GKCTF 2021]你知道apng吗1

先下载附件&#xff0c;快要过年了&#xff0c;十二月份还没发过文章&#xff0c;紧急写一篇。 下载文件后缀名为apng 搜索一下APNG&#xff08;基于PNG的位图动画格式&#xff09;_百度百科 利用火狐浏览器可以打开 类似gif图片的格式&#xff0c;用专门工具进行拆解&#xf…

MySQL内部的核心组件

mysql前言 1.MySQL的驱动 2.数据库的连接池 3.MySQL的工作线程 4.SQL接口 5.SQL解析器 6.查询优化器 7.执行器组件 8.存储引擎接口 1.MySQL的驱动是做什么的&#xff1f; 尤其记得刚刚学习MySQL的时候&#xff0c;引入的pom坐标&#xff1a;mysql-connector-java&#xff0c;这…