Kafka磁盘写满日志清理操作

news2024/9/26 3:24:45

最近项目组的kafka集群,老是由于应用端写入kafka topic的消息太多,导致所在的broker节点占满,导致其他的组件接连宕机。

这里和应用端沟通可以删除1天之前的消息来清理磁盘,并且可以调整topic的消息存活时间。

一、调整Topic的消息存活时长删除消息

kafka-configs --zookeeper localhost:2181 --entity-type topics --entity-name topicName --alter --add-config retention.ms=86400000

如上调整topic的消息存活时长为为1天,当执行完之后执行查询topic详细信息,可以看到已经发生了修改,并且过一会过期的消息会被删除。

kafka-topics --bootstrap-server localhost:9092 --describe --topic topicName

二、不修改Topic消息存活时长删除消息

1.登录到相应的机器上。

2.找到写满的磁盘,删除掉不需要的业务数据。数据清理原则:

  • 不可直接删除Kafka的数据目录,避免造成不必要的数据丢失。
  • 找到占用空间较多或者明确不需要的Topic,选择其中某些Partition,从最早的日志数据开始删除。删除segment及相应地index和timeindex文件。不要清理内置的Topic,例如__consumer_offsets和_schema等。

3.重启磁盘被写满的相应的Broker节点,使日志目录online。

参考:Kafka磁盘写满时如何运维操作_开源大数据平台E-MapReduce-阿里云帮助中心 (aliyun.com)

怎么删除kafka中的数据-火山引擎 (volcengine.com)

三、Kafka消息清理策略

在Kafka中,存在数据过期的机制,称为data expire。如何处理过期数据是根据指定的policy(策略)决定的,而处理过期数据的行为,即为log cleanup。

在Kafka中有以下几种处理过期数据的策略:

log.cleanup.policy=delete(Kafka中所有用户创建的topics,默认均为此策略)

  • 根据数据已保存的时间,进行删除(默认为1周)
  • 根据log的max size,进行删除(默认为-1,也就是无限制)

log.cleanup.policy=compact(topic __consumer_offsets 默认为此策略)

  • 根据messages中的key,进行删除操作
  • 在active segment 被commit 后,会删除掉old duplicate keys
  • 无限制的时间与空间的日志保留

自动清理Kafka中的数据可以控制磁盘上数据的大小、删除不需要的数据,同时也减少了对Kafka集群的维护成本。

那Log cleanup 在什么时候发生呢?

  • 首先值得注意的是:log cleanup 在partition segment 上发生
  • 更小/更多的segment,也就意味着log cleanup 发生的频率会上升
  • Log cleanup 不应该频繁发生=> 因为它会消耗CPU与内存资源
  • Cleaner的检查会在每15秒进行一次,由log.cleaner.backoff.ms 控制

log.cleanup.policy=delete

log.cleanup.policy=delete 的策略,根据数据保留的时间、以及log的max size,对数据进行cleanup。控制数据保留时间以及log max size的参数分别为:

log.retention.hours:指定数据保留的时常(默认为一周,168)

  • 将参数调整到更高的值,也就意味着会占据更多的磁盘空间
  • 更小值意味着保存的数据量会更少(假如consumer 宕机超过一周,则数据便会再未处理前即丢失)

log.retention.bytes:每个partition中保存的最大数据量大小(默认为-1,也就是无限大)

  • 再控制log的大小不超过一个阈值时,会比较有用

在到达log cleanup 的条件后,cleaner会自动根据时间或是空间的规则进行删除,新数据仍写入active segment:

针对于这个参数,一般有以下两种使用场景,分别为:

log保留周期为一周,根据log保留期进行log cleanup:

  • log.retention.hours=168 以及 log.retention.bytes=-1

log保留期为无限制,根据log大小进行进行log cleanup:

  • log.retention.hours=17520以及 log.retention.bytes=524288000

其中第一个场景会更常见。

Log Compaction

Log compaction用于确保:在一个partition中,对任意一个key,它所对应的value都是最新的。

这里举个例子:我们有个topic名为employee-salary,我们希望维护每个employee当前最新的工资情况。

左边的是compaction前,segments中的数据,右边为compaction 后,segments中的数据,其中有部分key对应的value有更新:

  

可以看到在log compaction后,相对于更新后的key-value message,旧的message被删除。

Log Compaction 有如下特点:

  • messages的顺序仍然是保留的,log compaction 仅移除一些messages,但不会重新对它们进行排序
  • 一条message的offset是无法改变的(immutable),如果一条message缺失,则offset会直接被跳过
  • 被删除的records在一段时间内仍然可以被consumers访问到,这段时间由参数delete.retention.ms(默认为24小时)控制

需要注意的是:Kafka 本身是不会组织用户发送duplicate data的。这些重复数据也仅会在一个segment在被commit 的时候做重复数据删除,所以consumer仍会读取到这部分重复数据(如果客户端有发的话)。

Log Compaction也会有时失败,compaction thread 可能会crash,所以需要确保给Kafka server 足够的内存用于做这些操作。如果log compaction异常,则需要重启Kafka(此为一个已知的bug)。

Log Compaction也无法通过API手动触发(至少到现在为止是这样),只能server端自动触发。

下面是一个 Log Compaction过程的示意图:

正在写入的records仍会被写入Active Segment,已经committed segments会自动做compaction。此过程会遍历所有segments中的records,并移除掉所有需要被移除的messages。

Log compaction由上文提到的log.cleanup.policy=compact进行配置,其中:

  • Segment.ms(默认为7天):在关闭一个active segment前,所需等待的最长时间
  • Segment.bytes(默认为1G):一个segment的最大大小
  • Min.compaction .lag.ms(默认为0):在一个message可以被compact前,所需等待的时间
  • Delete.retention.ms(默认为24小时):在一条message被加上删除标记后,在实际删除前等待的时间
  • Min.Cleanable.dirty.ratio(默认为0.5):若是设置的更高,则会有更高效的清理,但是更少的清理操作触发。若是设置的更低,则清理的效率稍低,但是会有更多的清理操作被触发

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1130822.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

手写 Promise(2)实例方法与静态方法的实现

一:什么是 Promise Promise 是异步编程的一种解决方案,其实是一个构造函数,自己身上有all、reject、resolve这几个方法,原型上有then、catch等方法。 Promise对象有以下两个特点。 (1)对象的状态不受…

[③ADRV902x]: Digital Filter Configuration(接收端)

前言 本篇博客主要总结了ADRV9029 Rx接收端链路中各个滤波器的配置。配置不同的滤波器系数以及不同的参数,可以对输入的数字信号灵活得做decimation处理,decimation信号抽取,就是降低信号采样率的过程。 Receiver Signal Path 下图为接收端…

macbook2024免费mac系统优化清理软件CleanMyMac X

清理电脑的操作系统可能是我们一直以来的习惯,从windows系统到mac系统,我们一直在寻求最好的清理方法,能够有效地清理操作系统对于电脑来说是非常重要的。今天小编想和大家一起讨论使用在macbook上的清理软件,清理macbook的空间可…

在React中,什么是状态(state)?如何更新组件的状态?

聚沙成塔每天进步一点点 ⭐ 专栏简介 前端入门之旅:探索Web开发的奇妙世界 欢迎来到前端入门之旅!感兴趣的可以订阅本专栏哦!这个专栏是为那些对Web开发感兴趣、刚刚踏入前端领域的朋友们量身打造的。无论你是完全的新手还是有一些基础的开发…

C语言程序设计——输入三个整数x,y,z,请把这三个数由小到大输出

题目:输入三个整数x,y,z,请把这三个数由小到大输出。 程序分析:我们想办法把最小的数放到x上,先将x与y进行比较, 如果x>y则将x与y的值进行交换,然后再用x与z进行比较,如果x>z则将x与z的值…

谈谈Net-SNMP软件

Net-SNMP是一个开源的SNMP软件套件,它提供了SNMP代理(snmpd)和SNMP工具(如snmpget、snmpwalk等),可以用于监控和管理网络设备。 Net-SNMP最初是从UC Davis的SNMP软件衍生而来,现在已经成为广泛…

SpringAOP源码解析之advice构建排序(二)

上一章我们知道Spring开启AOP之后会注册AnnotationAwareAspectJAutoProxyCreator类的定义信息,所以在属性注入之后initializeBean的applyBeanPostProcessorsAfterInitialization方法执行的时候调用AnnotationAwareAspectJAutoProxyCreator父类(AbstractAutoProxyCre…

通过怪物展示Demo理解游戏设计模式中的迭代器模式

点击上方亿元程序员关注和★星标 引言 大家好,我是亿元程序员,一位有着8年游戏行业经验的主程。 本系列是《和8年游戏主程一起学习设计模式》,让糟糕的代码在潜移默化中升华,欢迎大家关注分享收藏订阅。 今天我们要来聊一聊游戏…

开启生成式AI的探索之旅,亚马逊云科技分享生成式AI热门案例

现今,生成式AI为企业争先讨论的热门话题,上云出海为企业转型的重中之重。无论你是行业新贵还是中小企业,探索新的模式、创新迭代业务都是不容忽视的重点,下面就来介绍几个亚马逊云科技帮助企业创新的案例。 开启生成式AI的探索之旅…

深入理解 MySQL 中的锁和MVCC机制

文章目录 锁:数据访问的保护者1. 了解锁的基本概念2. 锁的使用场景3. 示例:MySQL中的锁 MVCC:多版本并发控制1. MVCC的工作原理2. MVCC的优点3. 示例:MySQL中的MVCC 如何选择合适的锁和MVCC1. 确定隔离级别2. 避免过度使用锁3. 监…

Spring IOC 和 AOP

核心概念 咱们这节就讲完了,在这节中我们讲了两个大概念,一个叫做IOC,一个叫做DI IOC是什么?是用对象的时候不要自己用new而是由外部提供,而spring在进行实现的时候是谁提供,就是IOC容器给你提供。 DI是什…

什么是脚本文件,脚本的执行,脚本格式等

1.脚本文件是什么? 脚本文件是包含一系列计算机命令的文本文件,通常用于自动化任务、自定义功能或执行特定操作。这些命令通常按照一定的编程语法和语义规则编写,以便计算机能够逐行解释和执行它们。脚本文件通常包含了一组操作,…

lua-web-utils和proxy设置示例

以下是一个使用lua-web-utils和proxy的下载器程序: -- 首先安装lua-web-utils库 local lwu require "lwu" ​ -- 获取服务器 local function get_proxy()local proxy_url "duoipget_proxy"local resp, code, headers, err lwu.fetch(proxy_…

工业通信网关常用的工业通信协议

在工业领域中常常有不同的设备协同工作,而这些设备的通信协议和数据格式也有所差异,要想实现不同通信设备之间的数据传输互通,工业网关是一个重要的设备。 什么是工业网关 工业网关是一种能够连接多种不同设备并实现数据的收集、传输、处理和…

Vuepress 三分钟搭建一个精美的文档或博客

大家好,我是凌览。 作为一个程序员,相信每个人都想折腾一个属于自己的博客。技术文章的输出是我们程序员能力的一种体现,也是一种非常好的个人总结。 网上有很多文档编写网站及工具,比如语雀、石墨等等。但多数需要付费&#xf…

工业RFID设备如何实现抗干扰功能?

在工业环境中存在多种干扰因素,如灰尘、水渍、油污等都会影响条码枪的信息识别,虽然RFID技术可以实现非接触的识别,无视油污、灰尘等环境,但仍会收到电流、震动、电磁信号等因素的干扰。下面我们就一起来了解一下,工业…

触摸屏与施耐德PLC之间MODBUS无线通讯

一、 硬件连接 1、PLC通讯接口说明: 2、通讯电缆图: 二、PLC设置 1. 配置端口: 双击串行线路—弹出右侧设置窗口---设置串口通讯参数 2. 添加MODBUS协议。 ① 右击串口线路,选择添加设备: ② 选择现场总线&#xf…

JAVA代码审计-纵向越权漏洞分析

查看这个cms系统后台管理员 添加用户的页面 点击添加管理员 这个模块只有管理员拥有,普通用户没有这个模块。 打开源码分析是否存在越权漏洞。 ------------------------------------------------------------------------------------------------------------ …