Java 中使用 ES 高级客户端库 RestHighLevelClient 清理百万级规模历史数据

news2025/1/9 1:52:20

🎉工作中遇到这样一个需求场景:由于ES数据库中历史数据过多,占用太多的磁盘空间,需要定期地进行清理,在一定程度上可以释放磁盘空间,减轻磁盘空间压力。

🎈在经过调研之后发现,某服务项目每周产生的数据量已经达到千万级别,单日将近能产生两百万的数据量写入到 ES 数据库中,平均每个小时最少产生 10w+ 条数据,加上之前的历史数据,目前生产环境 ES 数据量已经达到两亿一千四百八十万的数据。并且随着当前业务量的爆发式增长,数据增长量急剧飙升,在未来一年内每周产生的数据量有望达到 3kw-5kw 左右。

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

💡因此,对 ES 数据库中历史数据进行清理势在必行,为了能够释放磁盘空间,并且还要保证业务方能够进行日常问题的排查定位,决定从两个月前的数据开始清理,方案如下:

  • 编写定时任务,每天凌晨三点清理两个月前的那一天数据,之所以选择凌晨三点是因为在 Grafana 查看了生产环境的集群监控情况,凌晨两点至四点之间的集群、索引的查询以及写入 QPS 都比较低。

在这里插入图片描述

  • 清理一天的数据时,根据时间段进行清理,每个小时清理一次,避免内存中存放太多的数据,导致内存溢出。
  • 清理 ES 数据时,需要先查询出数据,而 ES 默认最多只能查询 1w 条数据,如果当次需要删除的数据量超过 1w 条,普通的查询操作无法完全删除数据。因此,需要采用滚动查询的方式,滚动查询结果保持时间需要设置合理,不能太长,否则也可能会导致内存溢出。

根据以上的思路方案,设计的定时清理ES历史数据代码如下:

import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.time.DateUtils;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.search.ClearScrollRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchScrollRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.Scroll;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.util.Date;

/**

  • 清理ES历史数据定时任务
    */
    @Component
    public class CleanESHistoryDataTask {

    private static final Logger LOGGER = LoggerFactory.getLogger(CleanESHistoryDataTask.class);

    @Resource
    private RestHighLevelClient restHighLevelClient;

    /**

    • 根据索引名称删除当前日期两个月前的那一天的历史文档数据
    • @param jobContext
      */
      @Scheduled
      public void cleanESHistoryData(JobContext jobContext) {
      // jobContext为定时任务中回传数据
      String indexName = jobContext.getData();
      if (StringUtils.isBlank(indexName)) {
      LOGGER.warn(“ES索引名称不能为空!”);
      return;
      }
      long startTimeMillis = System.currentTimeMillis();
      String twoMonthsAgoDate = DateTool.format(DateUtils.addMonths(new Date(), -1), DateTool.DF_DAY);
      try {
      String startTimeStr = twoMonthsAgoDate + " 00:00:00";
      // 初始化时间,形如2023-08-06 00:00:00
      Date initialStartTime = DateTool.parse(startTimeStr, DF_FULL);
      // 每次循环清理一个小时历史文档数据,循环24次清理完一天的历史文档数据
      for (int i = 0; i < 24; i++) {
      Date startTime = initialStartTime;
      startTime = DateUtils.addHours(startTime, i);
      Date endTime = DateUtils.addHours(startTime, 1);
      LOGGER.info(“正在清理索引:[{}],时间:{} 至 {}的历史文档数据…”, indexName, DateTool.format(startTime, DF_FULL), DateTool.format(endTime, DF_FULL));
      long currentStartTimeMillis = System.currentTimeMillis();
      // 指定操作的索引库
      SearchRequest searchRequest = new SearchRequest(indexName);
      // 构造查询条件,指定查询的时间范围,每次最多写入1000条数据至内存,减轻服务器内存压力
      SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder().query(QueryBuilders.rangeQuery(“createTimeStr.keyword”)
      .from(DateTool.format(startTime, DF_FULL))
      .to(DateTool.format(endTime, DF_FULL)))
      .size(1000);
      // 设置滚动查询结果在内存中的过期时间为1min
      Scroll scroll = new Scroll(TimeValue.timeValueMinutes(1L));
      // 将滚动以及构造的查询条件放入查询请求
      searchRequest.scroll(scroll).source(searchSourceBuilder);
      SearchResponse searchResponse = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
      // 记录要滚动的ID
      String scrollId = searchResponse.getScrollId();
      SearchHit[] hits = searchResponse.getHits().getHits();
      while (hits != null && hits.length > 0) {
      // 创建批量处理请求对象
      BulkRequest bulkRequest = new BulkRequest();
      for (SearchHit hit : hits) {
      DeleteRequest deleteRequest = new DeleteRequest(indexName, hit.getId());
      bulkRequest.add(deleteRequest);
      }
      // 执行批量删除请求操作
      restHighLevelClient.bulk(bulkRequest, RequestOptions.DEFAULT);
      // 构造滚动查询条件,继续滚动查询
      SearchScrollRequest scrollRequest = new SearchScrollRequest(scrollId);
      scrollRequest.scroll(scroll);
      searchResponse = restHighLevelClient.scroll(scrollRequest, RequestOptions.DEFAULT);
      scrollId = searchResponse.getScrollId();
      hits = searchResponse.getHits().getHits();
      }
      // 当前滚动查询结束,清除滚动,释放服务器内存资源
      ClearScrollRequest clearScrollRequest = new ClearScrollRequest();
      clearScrollRequest.addScrollId(scrollId);
      restHighLevelClient.clearScroll(clearScrollRequest, RequestOptions.DEFAULT);
      LOGGER.info(“清理索引:[{}],时间:{} 至 {}的历史文档数据成功,耗时{}ms”, indexName, DateTool.format(startTime, DF_FULL), DateTool.format(endTime, DF_FULL), (System.currentTimeMillis() - currentStartTimeMillis));
      }
      LOGGER.info(“[cleanESHistoryData] 定时任务-清理索引:[{}],时间:{}的历史文档数据成功,耗时{}ms”, indexName, twoMonthsAgoDate, (System.currentTimeMillis() - startTimeMillis));
      } catch (Exception e) {
      LOGGER.error(String.format(“[cleanESHistoryData] 定时任务-清理索引:[{}],时间:{}的历史文档数据失败,耗时{}ms”, indexName, twoMonthsAgoDate, (System.currentTimeMillis() - startTimeMillis)), e);
      }
      }
      }

其中,需要注意以下几点

  • 在 Java 中对 ES 进行操作,这里使用的是 ES 的高级客户端组件 RestHighLevelClient
  • @Scheduled 注解为自研定时任务工具注解,外界无法使用,在使用定时任务时需要自己选择合适的定时任务框架。
  • DateTool 工具类为自研工具类,外界同样无法使用,在以上代码段中就是用于对 java.util.Date 类型进行转换为字符串,DF_FULLDateTool.DF_DAY 均是常量,它们的值分别为 yyyy-MM-dd HH:mm:ssyyyy-MM-dd

在这里插入图片描述

🎈通过观察监控可以发现,在凌晨三点执行定时任务清理 ES 历史数据期间,集群、索引查询 QPS 以及 CPU 利用率指标都明显飙升。因此,清理 ES 数据时一定要避开流量高峰期,避免在流量高峰期清理数据时造成资源实例宕机,造成生产事故。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/916294.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

qq windows版客户端0day复现——远程代码执行(七夕小礼物)

##ps&#xff1a;本文章仅用来分享&#xff0c;请勿将文章内的相关技术用于非法目的&#xff0c;请勿将文章内的相关技术用于非法目的&#xff0c;请勿将文章内的相关技术用于非法目的&#xff01;&#xff01;如有非法行为与本文章作者无任何关系。一切行为以遵守《中华人民共…

电力巡检三维数字化管理的新方案:图新地球电力版

电力工业是国民经济发展的重要基础能源产业&#xff0c;是世界各国经济发展战略中的优先发展重点。当前中国电力行业运行平稳&#xff0c;电力消费持续增长&#xff0c;电力装机结构延续绿色低碳发展态势&#xff0c;同时投资规模日益扩大。随着全民用电量持续快速增长&#xf…

从头到尾说一次 Spring 事务管理(器) | 京东云技术团队

事务管理&#xff0c;一个被说烂的也被看烂的话题&#xff0c;还是八股文中的基础股之一。​ 本文会从设计角度&#xff0c;一步步的剖析 Spring 事务管理的设计思路&#xff08;都会设计事务管理器了&#xff0c;还能玩不转&#xff1f;&#xff09; 为什么需要事务管理&…

隐秘的角落:Java连接Oracle提示Connection timed out

前言 这个报错相信各位后端开发都不陌生&#xff0c;大体的原因就那么几种&#xff1a; 检查网络连接&#xff1a;确保您的计算机与数据库服务器之间的网络连接正常。尝试通过其他方式验证您的网络连接是否正常。 检查数据库服务器状态&#xff1a;确保数据库服务器正在运行&…

如何快速了解一家企业的各类信息?

我们在生活和工作会遇到一些情形&#xff0c;需要我们去查找一些企业的信息来推进。这时候如何快速查找到企业的信息呢&#xff1f; 根据场景不同&#xff0c;所需要的企业信息也是不同的&#xff0c;有的可能只需要企业的基本信息&#xff0c;有的情况需要企业的多维度信息&a…

Linux需要掌握哪些?

Linux运维工程师的基本工作之一是搭建相关编程语言的运行环境&#xff0c;使程序能够高效、稳定、安全地在服务器上运行。优秀的Linux运维工程师不但需要拥有架设服务器集群的能力&#xff0c;还需要拥有使用不同的编程语言开发常用的自动化运维工具或平台的能力&#xff0c;从…

SciencePub学术 | 计算机及交叉类重点SCIE征稿中

SciencePub学术 刊源推荐: 计算机及交叉类重点SCIE征稿中&#xff01;信息如下&#xff0c;录满为止&#xff1a; 一、期刊概况&#xff1a; 计算机土地类重点SCIE 【期刊简介】IF&#xff1a;1.0-1.5&#xff0c;JCR4区&#xff0c;中科院4区&#xff1b; 【版面类型】正刊…

LTDC之存储器映射闪存

对于大多数项目&#xff0c;建议使用外部闪存&#xff0c;因为这允许应用程序使用多个大型图像。 即便最普通的应用程序&#xff0c;内部闪存也可能会很快被占用完。 1.配置QSPI&#xff08;嵌入式基础知识&#xff0c;此处不做分析&#xff09; 2.编写W25Q256配置代码&#xf…

django+MySQL购物商城系统(含源码+论文)

对购物商城管理的流程进行科学整理、归纳和功能的精简&#xff0c;通过软件工程的研究方法&#xff0c;结合当下流行的互联网技术&#xff0c;最终设计并实现了一个简单、易操作的购物商城系统。内容包括系统的设计思路、系统模块和实现方法。系统使用过程主要涉及到管理员和用…

[JavaWeb]【九】web后端开发-SpringBootWeb案例(菜单)

目录 一、准备工作 1.1 需求 1.2 环境搭建 1.2.1 准备数据库&表 1.2.2 创建springboot工程 1.2.3 配置application.properties & 准备对应实体类 1.2.3.1 application.properties 1.2.3.2 实体类 1.2.3.2.1 Emp类 1.2.3.2.2 Dept类 1.2.4 准备对应的Mapper、…

[C#][原创]操作注册表一些注意点

C#注册表只需要引入 using Microsoft.Win32; C#注册表操作都是通过2个类Registry和RegistryKey进行所有操作。但是有些基本注意事项经常忘记&#xff0c;不常用就很容易忘记。 第一&#xff0c;打开注册表&#xff0c;第2个bool参数问题&#xff1a; RegistryKey key Regi…

算法与数据结构(九)--并查集

并查集是一种树型的数据结构&#xff0c;并查集可以高校地进行如下操作&#xff1a; *查询元素p和元素q是否在同一组 *合并元素p和元素q所在的组 一.并查集结构 并查集也是一种树型结构&#xff0c;这种树的要求比较简单&#xff1a;1.每个元素都唯一的对应一个结点&#xff…

海外ios应用商店优化排名因素之关键词

与Google Play Store相比&#xff0c;在Apple的App Store中&#xff0c;应用描述不会影响关键词排名。不过有一个专门针对App Store的关键词列表&#xff0c;我们可以在其中放置相关关键词。 1、关键词列表的限制仅为100个字符。 使用排名的竞争性较低的关键词&#xff0c;尝试…

Ubuntu无法连接外网

配置环境时&#xff0c;发现实验室服务器这阵子不能连接外网&#xff0c;网上看看了解决方案&#xff0c;确定是DNS的问题&#xff0c;解决方案如下&#xff1a; 首先&#xff0c;我尝试ping www.baidu.com,是可以ping通的&#xff0c;内网访问没问题。排除是否为网络连接问题…

校企合作 | 大势智慧受邀参与北斗共同体建设

8月16日&#xff0c;长江工业职业学院&#xff08;后简称“长江工院”&#xff09;副校长刘文胜&#xff0c;质管处处长黄世涛&#xff0c;测绘信息工程系党总支书记刘飞、系副主任陈志兰、系教师陈文玲一行莅临武汉大势智慧科技有限公司&#xff08;后简称“大势智慧”&#x…

什么是数字化和数字化转型?

数字化和数字化转型是企业、组织和流程现代化背景下经常使用的术语&#xff0c;以应对技术在我们生活中日益重要的作用。虽然它们是相关的概念&#xff0c;但它们具有不同的含义。 数字化是指将模拟信息转换为数字格式的过程。涉及获取物理信息&#xff0c;例如文本、图像、声…

FairyGUI编辑器的弹窗操作【插件】

之前在FairyGUI编辑器菜单扩展中&#xff0c;我使用了App.Alert("复制失败")来提示操作是否成功。这篇则会说一下我们可以使用的弹窗提示&#xff0c;以及做到类似资源发布成功时的“发布成功”飘窗。 打开APP的API脚本&#xff0c;可以看到有很多公开方法&#xff…

HCIP-Datacom:一篇掌握IPSec VPN的原理与配置!!!

一、背景 随着Internet的发展&#xff0c;越来越多的企业直接通过Internet进行互联&#xff0c;但由于IP协议未考虑安全性&#xff0c;而且Internet上有大量的不可靠用户和网络设备&#xff0c;所以用户业务数据要穿越这些未知网络&#xff0c;根本无法保证数据的安全性&#x…

LeetCode——有效的括号

这里&#xff0c;我提供一种用栈来解决的方法&#xff1a; 思路&#xff1a;栈的结构是先进后出&#xff0c;这样我们就可以模拟栈结构了&#xff0c;如果是‘&#xff08;’、‘{’、‘[’任何一种&#xff0c;直接push进栈就可以了&#xff0c;如果是‘}’、‘&#xff09;’…

计算机视觉入门 5)自定义卷积网络

系列文章目录 计算机视觉入门 1&#xff09;卷积分类器计算机视觉入门 2&#xff09;卷积和ReLU计算机视觉入门 3&#xff09;最大池化计算机视觉入门 4&#xff09;滑动窗口计算机视觉入门 5&#xff09;自定义卷积网络计算机视觉入门 6&#xff09; 数据集增强&#xff08;D…