【已解决】Java 中使用 ES 高级客户端库 RestHighLevelClient 清理百万级规模历史数据

news2025/1/10 5:53:00

🎉工作中遇到这样一个需求场景:由于ES数据库中历史数据过多,占用太多的磁盘空间,需要定期地进行清理,在一定程度上可以释放磁盘空间,减轻磁盘空间压力。

🎈在经过调研之后发现,某服务项目每周产生的数据量已经达到千万级别,单日将近能产生两百万的数据量写入到 ES 数据库中,平均每个小时最少产生 10w+ 条数据,加上之前的历史数据,目前生产环境 ES 数据量已经达到两亿一千四百八十万的数据。并且随着当前业务量的爆发式增长,数据增长量急剧飙升,在未来一年内每周产生的数据量有望达到 3kw-5kw 左右。

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

💡因此,对 ES 数据库中历史数据进行清理势在必行,为了能够释放磁盘空间,并且还要保证业务方能够进行日常问题的排查定位,决定从两个月前的数据开始清理,方案如下:

  • 编写定时任务,每天凌晨三点清理两个月前的那一天数据,之所以选择凌晨三点是因为那时候的 CPU 以及内存占用率较低。
  • 清理一天的数据时,根据时间段进行清理,每个小时清理一次,避免内存中存放太多的数据,导致内存溢出。
  • 清理 ES 数据时,需要先查询出数据,而 ES 默认最多只能查询 1w 条数据,如果当次需要删除的数据量超过 1w 条,普通的查询操作无法完全删除数据。因此,需要采用滚动查询的方式,滚动查询结果保持时间需要设置合理,不能太长,否则也可能会导致内存溢出。

根据以上的思路方案,设计的定时清理ES历史数据代码如下:

import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.time.DateUtils;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.search.ClearScrollRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchScrollRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.Scroll;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.util.Date;

/**
 * 清理ES历史数据定时任务
 */
@Component
public class CleanESHistoryDataTask {

    private static final Logger LOGGER = LoggerFactory.getLogger(CleanESHistoryDataTask.class);

    @Resource
    private RestHighLevelClient restHighLevelClient;

    /**
     * 根据索引名称删除当前日期两个月前的那一天的历史文档数据
     * @param jobContext
     */
    @Scheduled
    public void cleanESHistoryData(JobContext jobContext) {
    	// jobContext为定时任务中回传数据
        String indexName = jobContext.getData();
        if (StringUtils.isBlank(indexName)) {
            LOGGER.warn("ES索引名称不能为空!");
            return;
        }
        long startTimeMillis = System.currentTimeMillis();
        String twoMonthsAgoDate = DateTool.format(DateUtils.addMonths(new Date(), -1), DateTool.DF_DAY);
        try {
            String startTimeStr = twoMonthsAgoDate + " 00:00:00";
            // 初始化时间,形如2023-08-06 00:00:00
            Date initialStartTime = DateTool.parse(startTimeStr, DF_FULL);
            // 每次循环清理一个小时历史文档数据,循环24次清理完一天的历史文档数据
            for (int i = 0; i < 24; i++) {
                Date startTime = initialStartTime;
                startTime = DateUtils.addHours(startTime, i);
                Date endTime = DateUtils.addHours(startTime, 1);
                LOGGER.info("正在清理索引:[{}],时间:{} 至 {}的历史文档数据...", indexName, DateTool.format(startTime, DF_FULL), DateTool.format(endTime, DF_FULL));
                long currentStartTimeMillis = System.currentTimeMillis();
                // 指定操作的索引库
                SearchRequest searchRequest = new SearchRequest(indexName);
                // 构造查询条件,指定查询的时间范围,每次最多写入1000条数据至内存,减轻服务器内存压力
                SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder().query(QueryBuilders.rangeQuery("createTimeStr.keyword")
                                                                                    .from(DateTool.format(startTime, DF_FULL))
                                                                                    .to(DateTool.format(endTime, DF_FULL)))
                                                                                    .size(1000);
                // 设置滚动查询结果在内存中的过期时间为1min
                Scroll scroll = new Scroll(TimeValue.timeValueMinutes(1L));
                // 将滚动以及构造的查询条件放入查询请求
                searchRequest.scroll(scroll).source(searchSourceBuilder);
                SearchResponse searchResponse = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
                // 记录要滚动的ID
                String scrollId = searchResponse.getScrollId();
                SearchHit[] hits = searchResponse.getHits().getHits();
                while (hits != null && hits.length > 0) {
                    // 创建批量处理请求对象
                    BulkRequest bulkRequest = new BulkRequest();
                    for (SearchHit hit : hits) {
                        DeleteRequest deleteRequest = new DeleteRequest(indexName, hit.getId());
                        bulkRequest.add(deleteRequest);
                    }
                    // 执行批量删除请求操作
                    restHighLevelClient.bulk(bulkRequest, RequestOptions.DEFAULT);
                    // 构造滚动查询条件,继续滚动查询
                    SearchScrollRequest scrollRequest = new SearchScrollRequest(scrollId);
                    scrollRequest.scroll(scroll);
                    searchResponse = restHighLevelClient.scroll(scrollRequest, RequestOptions.DEFAULT);
                    scrollId = searchResponse.getScrollId();
                    hits = searchResponse.getHits().getHits();
                }
                // 当前滚动查询结束,清除滚动,释放服务器内存资源
                ClearScrollRequest clearScrollRequest = new ClearScrollRequest();
                clearScrollRequest.addScrollId(scrollId);
                restHighLevelClient.clearScroll(clearScrollRequest, RequestOptions.DEFAULT);
                LOGGER.info("清理索引:[{}],时间:{} 至 {}的历史文档数据成功,耗时{}ms", indexName, DateTool.format(startTime, DF_FULL), DateTool.format(endTime, DF_FULL), (System.currentTimeMillis() - currentStartTimeMillis));
            }
            LOGGER.info("[cleanESHistoryData] 定时任务-清理索引:[{}],时间:{}的历史文档数据成功,耗时{}ms", indexName, twoMonthsAgoDate, (System.currentTimeMillis() - startTimeMillis));
        } catch (Exception e) {
            LOGGER.error(String.format("[cleanESHistoryData] 定时任务-清理索引:[{}],时间:{}的历史文档数据失败,耗时{}ms", indexName, twoMonthsAgoDate, (System.currentTimeMillis() - startTimeMillis)), e);
        }
    }
}

其中,需要注意以下几点

  • 在 Java 中对 ES 进行操作,这里使用的是 ES 的高级客户端组件 RestHighLevelClient
  • @Scheduled 注解为自研定时任务工具注解,外界无法使用,在使用定时任务时需要自己选择合适的定时任务框架。
  • DateTool 工具类为自研工具类,外界同样无法使用,在以上代码段中就是用于对 java.util.Date 类型进行转换为字符串,DF_FULLDateTool.DF_DAY 均是常量,它们的值分别为 yyyy-MM-dd HH:mm:ssyyyy-MM-dd

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/841911.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

AI一键生成短视频

AI一键生成推文短视频 阅读时长&#xff1a;10分钟 本文内容&#xff1a; 结合开源AI&#xff0c;一键生成短视频发布到常见的某音&#xff0c;某手平台&#xff0c;狠狠赚一笔 前置知识&#xff1a; 1.基本的 python 编程知识 2.chatGPT 使用过 3.stable diffution 使用过 成果…

一键开启ChatGPT“危险发言”

‍ ‍ 大数据文摘授权转载自学术头条 作者&#xff1a;Hazel Yan 编辑&#xff1a;佩奇 随着大模型技术的普及&#xff0c;AI 聊天机器人已成为社交娱乐、客户服务和教育辅助的常见工具之一。 然而&#xff0c;不安全的 AI 聊天机器人可能会被部分人用于传播虚假信息、操纵舆…

冠达管理:稳增长政策密集加码 顺周期板块有望持续表现

上星期A股商场回暖显着&#xff0c;首要宽基指数大都震荡收涨&#xff1b;日均成交额上升至约9600亿元&#xff1b;北向资金延续净买入&#xff0c;周净买入A股124.7亿元。职业层面&#xff0c;方针预期催化下&#xff0c;顺周期方向的金融、房地产等职业领涨。 机构以为&#…

方法区内存溢出及常量池

22 方法区-定义 是所有线程共享的一块区域。 存储了和类结构相关信息。运行时常量池&#xff0c; 方法区在虚拟机启动时被创建&#xff0c;逻辑上是堆的组成部分。方法区内存不足&#xff0c;也会导致oom异常。 是一个概念上的东西&#xff0c; 1.6使用永久代作为方法区&#…

Mybatis引出的一系列问题-spring多数据源配置

在日常开发中我们都是以单个数据库进行开发&#xff0c;在小型项目中是完全能够满足需求的。但是&#xff0c;当我们牵扯到像淘宝、京东这样的大型项目的时候&#xff0c;单个数据库就难以承受用户的CRUD操作。那么此时&#xff0c;我们就需要使用多个数据源进行读写分离的操作…

在Linux服务器上搭建Git

环境 服务器&#xff1a;Ubuntu 客户端&#xff1a;Win11 1、在服务器上安装Git&#xff08;服务器中处理&#xff09; 在服务器上执行git --version 如果出现&#xff1a; 则&#xff0c;已经安装Git&#xff0c;跳过此步骤。 如果没有&#xff0c;则&#xff1a; 执行…

前端个人年度工作述职报告(二十篇)

前端个人年度工作述职报告篇1 尊敬的各位领导、各位同仁&#xff1a; 大家好!按照20__年度我公司就职人员工作评估的安排和要求&#xff0c;我认真剖析、总结了自己的工作情况&#xff0c;现将本人工作开展情况向各位领导、同仁做以汇报&#xff0c;有不妥之处&#xff0c;希…

ElasticSearch详细操作

ElasticSearch搜索引擎详细操作以及概念 文章目录 ElasticSearch搜索引擎详细操作以及概念 1、_cat节点操作1.1、GET/_cat/nodes&#xff1a;查看所有节点1.2、GET/_cat/health&#xff1a;查看es健康状况1.3_、_GET/_cat/master&#xff1a;查看主节点1.4、GET/_cat/indices&a…

内存快照:宕机后,Redis如何实现快速恢复?RDB

AOF的回顾 回顾Redis 的AOF的持久化机制。 Redis 避免数据丢失的 AOF 方法。这个方法的好处&#xff0c;是每次执行只需要记录操作命令&#xff0c;需要持久化的数据量不大。一般而言&#xff0c;只要你采用的不是 always 的持久化策略&#xff0c;就不会对性能造成太大影响。 …

OpenAI因担心隐私问题而阻止GPT-4图像功能的发展

据《纽约时报》报道&#xff0c;GPT-4的图像能力可以识别某些个人。 OpenAI一直在测试其支持图像识别的多模态GPT-4版本&#xff0c;以便计划中的广泛发布。然而&#xff0c;据周二《纽约时报》报道&#xff0c;出于对其可能识别特定个体的担忧&#xff0c;公众访问被限制了。…

(2023国赛必看)零基础挑战一周拿下数学建模国奖

1、 数学建模国赛介绍 1.1 数学建模国赛是什么&#xff1f;如何评奖 全国大学生数学建模竞赛是全国高校规模最大的课外科技活动之一。该竞赛每年9月&#xff08;一般在上旬某个周末的星期五至下周星期一共3天&#xff0c;72小时&#xff09;举行&#xff0c;竞赛面向全国大专院…

使用vscode远程登录以及本地使用的配置(插件推荐)

1、远程登陆ssh 1.1打开vscode插件商店&#xff0c;安装remote-ssh插件 远程ssh添加第三方插件&#xff1a;vscode下链接远程服务器安装插件失败、速度慢等解决方法_vscode远程安装不上扩展_Emphatic的博客-CSDN博客 转到定义&#xff0c;选中代码->鼠标右键->转到定义…

Linux:在使用UEFI固件的计算机上内核是如何被启动的

前言 启动计算机通常不是一件难事&#xff1a;按下电源键&#xff0c;稍等片刻&#xff0c;你就能看到一个登录界面&#xff0c;再输入正确的密码&#xff0c;就可以开启一天的网上冲浪之旅了。 但偶尔这件事没那么顺利&#xff0c;有时候迎接你的不是熟悉的登录界面&#xf…

SSM(Vue3+ElementPlus+Axios+SSM前后端分离)--功能实现[五]

文章目录 SSM--功能实现实现功能09-带条件查询分页显示列表需求分析/图解思路分析代码实现测试分页条件查询带条件分页查询显示效果 实现功能10-添加家居表单前端校验需求分析/图解思路分析代码实现完成测试测试页面效果 实现功能11-添加家居表单后端校验需求分析/图解思路分析…

【HTML】<input>

分类 text password number button reset submit hidden radio checkbox file image color range tel email&#xff08;火狐有校验&#xff0c;360浏览器无校验。&#xff09; url datetime&#xff08;火狐、360浏览器不支持&#xff09; search date、month、week、time、da…

计算机网络-三种交换方式

计算机网络-三种交换方式 电路交换(Circuit Switching) 电话交换机接通电话线的方式称为电路交换从通信资源分配的角度来看&#xff0c;交换(Switching)就是按照某种方式动态的分配传输线路的资源 电话交换机 为了解决电话之间通信两两之间连线过多&#xff0c;所以产生了电话…

【Docker】docker镜像+nginx部署vue项目:

文章目录 一、文档&#xff1a;二、打包vue项目&#xff1a;三、配置nginx&#xff1a;四、配置Dockerfile&#xff1a;五、构建镜像&#xff1a;六、运行容器&#xff1a;七、最终效果&#xff1a; 一、文档&#xff1a; 【1】菜鸟教程&#xff1a;https://www.runoob.com/do…

windows下以指定用户访问SMB服务器进行读写

一 概述 最近遇到一个问题&#xff0c;linux 的 smb服务器开启匿名访问&#xff0c;windows访问linux文件夹不需要用户名密码就可以进去使用&#xff0c;但是存在一个问题&#xff0c;ssh连接到linux 后修改的文件&#xff0c;在windows已smb方式下打开某个文件修改 是没有权限…

HTML5 Canvas和Svg:哪个简单且好用?

HTML5 Canvas 和 SVG 都是基于标准的 HTML5 技术&#xff0c;可用于创建令人惊叹的图形和视觉体验。 首先&#xff0c;让我们花几句话介绍HTML5 Canvas和SVG。 什么是Canvas? Canvas&#xff08;通过 标签使用&#xff09;是一个 HTML 元素&#xff0c;用于在用户计算机屏幕…

Vue3+SpringBoot快速开发模板

起因&#xff1a;个人开发过程经常会使用到Vue3SpringBoot技术栈来开发项目&#xff0c;每次在项目初始化时都需要涉及一些重复的整理工作&#xff0c;于是结合一些个人觉得不错的前后端模板进行整合&#xff0c;打通一些大多数项目都需要的实现的基础功能&#xff0c;以便于快…