ElasticSearch 批量插入漏数据

news2025/1/23 1:59:39

项目场景:

项目中需要把Mysql数据同步到ElasticSearch中


问题描述

数据传输过程中数据不时出现丢失的情况,偶尔会丢失一部分数据,本地测试也无法复现,后台程序也没有报错,一到正式环境就有问题,很崩溃

这里是批量操作的代码

private void bulk(List<IndexRequest> indexRequests) throws Exception {
        try {
            // 在这里可以对你获取到的批量结果数据进行需要的业务处理
            BulkProcessor bulkProcessor = BulkProcessor.builder(
                            (req, bulkListener) -> restHighLevelClient.bulkAsync(req, RequestOptions.DEFAULT, bulkListener),
                            new BulkProcessor.Listener() {
                                private int totalCount = 0;

                                @Override
                                public void beforeBulk(long executionId, BulkRequest request) {
                                }

                                @Override
                                public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
                                    // 统计条数并输出信息
                                    int count = response.getItems().length;
                                    totalCount += count;
                                    log.info("批量操作 [{}] 成功执行了{}条请求,共处理了{}条数据", executionId, count, totalCount);
                                }

                                @Override
                                public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
                                    log.error("数据处理失败,执行id为{},错误信息为:{}", executionId, failure);
                                }
                            }
                    )
                    .setConcurrentRequests(esproperties.getThreadSize())/*并发请求的数量。默认为1。*/
                    .setFlushInterval(TimeValue.timeValueSeconds(30)) // 固定30s必须刷新一次
                    .setBulkSize(new ByteSizeValue(10L, ByteSizeUnit.MB)) // 5MB batch size
                    .setBulkActions(esproperties.getBulkActions()) // 每次执行最多处理5000个请求
                    .setBackoffPolicy(BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100), 3))
                    .build();
            indexRequests.forEach(bulkProcessor::add);
            bulkProcessor.close();
        } catch (Exception e) {
            e.printStackTrace();
            throw new Exception(e);
        }
    }

原因分析:

当时想到的问题是这里是不是数据格式有问题,因为采用的是异步,就是错误了也不会影响到其它数据的插入

接着就定位到了这段代码,想想是不是哪里没有处理错误的数据信息,所以没有打印出来,果然发现了BulkResponse 这个类,是可以处理每个错误信息的,接着就优化了代码如下

其实只需要修改afterBulk 方法,遍历出现的异常就能够打印出导入不进去的错误信息

 @Override
public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
 // 统计条数并输出信息
// int count = response.getItems().length;
// totalCount += count;
//  log.info("批量操作 [{}] 成功执行了{}条请求,共处理了{}条数据", executionId, count, totalCount);
 if (response.hasFailures()){
     for (BulkItemResponse itemResponse : response) {
         if (itemResponse.isFailed()) {
             log.info("数据写入失败:错误信息为:{}",itemResponse.getFailureMessage());
         }
     }
// log.info("数据写入失败:{}",response.buildFailureMessage());
 }
}

解决方案:

接着修改代码后把新的包放上去,执行,终于找到了错误信息

下面是错误信息的截图
在这里插入图片描述

报错 Limit of total fields 1000 这里就能看出来,是字段数量大于1000了,因为我的是宽表,而之前创建的索引字段数量都是小于1000的,新的索引结构数量大于1000,找到问题就好办了

在kibana执行下面脚本修改字段限制,根据实际情况来,没有kibana就写出curl 请求
在这里插入图片描述

PUT 你的索引名/_settings
{
  "index": {
    "mapping.total_fields.limit": 2000
  }
}

总结

  1. 没有测试好宽表字段比较多的情况
  2. 写代码的时候以为很简单不会出现问题,所以日志也比较随便。
  3. 日常开发要打印好日志,它能够在出现错误的情况下,很快的帮我们定位出问题所在。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1164364.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

泰坦陨落2报错找不到msvcr120/msvcp120丢失实用解决方法

在计算机使用过程中&#xff0c;我们可能会遇到各种问题&#xff0c;其中之一就是msvcr120.dll缺失。msvcr120.dll是Microsoft Visual C Redistributable的一个组件&#xff0c;它包含了运行许多基于Windows的应用程序所必需的库文件。当这个文件丢失或损坏时&#xff0c;可能会…

【JAVA学习笔记】59 - JUnit框架使用、本章作业

项目代码 https://github.com/yinhai1114/Java_Learning_Code/tree/main/IDEA_Chapter15/src/com/yinhai/homework JUnit测试框架 1.基本介绍 1. JUnit是一个Java语言的单元测试框架 2.多数Java的开发环境都已经集成了JUnit作为单元测试的工具 2.如何使用 创建方法后&#x…

LeetCode 70.爬楼梯 + 记忆化搜索 + 递推 + 动态规划 + 空间优化

关于此题的我的往期文章&#xff1a; leetCode 70.爬楼梯 动态规划_呵呵哒(&#xffe3;▽&#xffe3;)"的博客-CSDN博客https://blog.csdn.net/weixin_41987016/article/details/133325224?spm1001.2014.3001.5501 上i-1层楼梯&#xff0c;有 dfs(i-1) 种方法&#x…

Sqlyog 无法连接 8 版本的mysql caching_sha2_password could not be loaded

Sqlyog 无法连接 8 版本的mysql caching_sha2_password could not be loaded 1.问题背景 近期系统对Mysql 版本进行了升级&#xff0c;由原来的 5.7升至 8版本&#xff0c;在现场使用Sqlyog 作为数据库连接软件时&#xff0c;发现连接失败。 2.问题现象 使用Sqlyog配置完连…

Oracle 19c 可插拔数据库PDB的创建方式

多租户容器数据库架构图总览 多租户容器数据库组成部分&#xff1a; 1.有且仅有一个CDB Root(CDB$ROOT)&#xff0c;它包含了Root和所有PDB数据库的元数据和数据字典信息。 2.有且仅有一个Seed PDB(PDB$SEED),它的作用是创建其他PDB的模板&#xff0c;它是只读库&#xff0c;…

C语言实现俄罗斯方块游戏

文章目录 1 前言2 游戏截图3 源代码 1 前言 本文介绍的是我空闲时间用C语言写的一个俄罗斯方块游戏&#xff0c;整个程序只有一个文件&#xff0c;实现了基本的游戏功能&#xff0c;但还是有些缺陷&#xff0c;希望有心之士能够继续完善&#xff0c;感谢各位&#xff01; 2 游戏…

redis rdb aof

appendonly yes # appendfsync always appendfsync everysec # appendfsync no E:\Document_Redis_Windows\redis-2.4.5-win32-win64\64bit appendonly.aof

如何实现异步通知的重试机制

工作中经常要和第三方做对接&#xff0c;比如支付、电子合同等系统。操作成功之后&#xff0c;第三方会发送异步的通知&#xff0c;返回最终的处理结果&#xff0c;使用异步而不是使用同步通知&#xff0c;是为了加快系统响应速度&#xff0c;防止线程阻塞。任务处理完成后通过…

力扣 upper_bound 和 lower_bound

&#x1f468;‍&#x1f3eb; 34. 在排序数组中查找元素的第一个和最后一个位置 &#x1f338; AC code 2023版 class Solution {public int[] searchRange(int[] nums, int target) {int[] res { -1, -1 };if(nums.length 0)return res;int l 0;int r nums.length - 1;…

GD32F303RCT6不小心将读保护开启,导致后续程序烧不进去的解决办法

这里写自定义目录标题 错误现象判断读保护开启的方法用JLink-commander查看选项字节地址处的值 解锁读保护 错误现象 用j-flash v7.68b软件通过ARM仿真器设置接口为SWD烧录编译好的目标.bin文件&#xff0c;第一次烧录成功&#xff0c;后面再也烧录不进&#xff0c;出先现象 如…

【Unity实战】最全面的库存系统(一)

文章目录 先来看看最终效果前言定义物品定义人物背包物品插槽数据拾取物品物品堆叠绘制UI移动拖拽物品选中物品跟随鼠标移动背包物品交换物品拆分物品物品堆叠完结先来看看最终效果 前言 它又来了,库存系统我前面其实一句做过很多次了,但是这次的与以往的不太一样,这个将是…

UI设计一定不能错过的4款常用工具

虽然设计审美很重要&#xff0c;但软件只是一种工具&#xff0c;但就像走楼梯和坐电梯到达顶层一样&#xff0c;电梯的效率显然更高&#xff0c;易于使用的设计工具也是如此。让我们了解一下UI设计的主流软件&#xff0c;以及如何选择合适的设计软件。 即时设计 软件介绍 即…

蓝桥白皮书16.0版——1、STEMA 考试综述

目 录 ​​​​​​​STEMA 考试综述 STEMA 考试组别 STEMA 试题数量与计分点 STEMA 考试时长 STEMA 考试成绩计算与发布 STEMA 考试成绩计算 STEMA 考试分数区间 STEMA 考试成绩百分比 STEMA 考试命题原则 STEMA 考试范围 科技素养组考试范围 推荐初级考生阅读 &#xff1…

leetcode刷题 - SQL - 简单

目录 1. 175组合两个表 左外连接 2. 181. 超过经理收入的员工 3. 182. 查找重复的电子邮箱 4. 196. 删除重复的电子邮箱 5. 197. 上升的温度 日期作差 6. 511. 游戏玩法分析 I 7. 577. 员工奖金 null条件运算 8. 584. 寻找用户推荐人 9. 586. 订单最多的客户 10. 595. 大的国家…

频谱仪超外差和零中频架构

文章目录 超外差结构零中频结构接收机结构发射机结构 优缺点对比附录相关词汇多次变频的形象解释 参考文献 频谱仪的本质就是一个超宽带、超宽调谐范围、高动态范围的通信接收机&#xff0c; 频谱仪的原理即通信接收机的原理。 遇到高频率高带宽谐波成分复杂的通信信号的话&am…

uniapp:人脸识别功能,已测试,直接复制源码,支持H5,APP安卓。不依赖任何js,SDK。

先看效果图H5: APP效果图: H5:H5端代码用.html实现,uniapp打包H5拉相机有问题,不多赘述。 时间原因没有适配,直接用的px单位, 注意:本地无法测试,必须传到线上之后,通过https访问才能正常开启摄像头!!! <!DOCTYPE html> <html lang="en">…

python二进制文件转nc(以PIOMAS海冰厚度数据为例)

一、数据下载 数据网址Polar Science Center PIOMAS Variables on Model Grid (uw.edu) 以其中的海冰厚度数据为例进行转化 点击下载需要的年份&#xff1a; 首先要明白&#xff0c;二进制文件是4个字节按顺序依次存储所有数据&#xff0c;因此heff.H1979是没有记录对应的地…

CN考研真题知识点二轮归纳(5)

本轮的最后一贴&#xff0c;真题中涉及计网的部分彻底总结完&#xff01;后期的3轮总结可能会上一些大题&#xff0c;比如路由转发、子网划分什么的&#xff0c;以及重点的背诵内容~ 上期目录&#xff1a; CN考研真题知识点二轮归纳&#xff08;4&#xff09;https://jslhyh32…

19.10 Boost Asio 同步文件传输

在原生套接字编程中我们介绍了利用文件长度来控制文件传输的方法&#xff0c;本节我们将采用另一种传输方式&#xff0c;我们通过判断字符串是否包含goodbye lyshark关键词来验证文件是否传输结束了&#xff0c;当然了这种传输方式明显没有根据长度传输严谨&#xff0c;但使用这…

创新工具箱!重塑手机页面原型设计体验

在2024年&#xff0c;随着移动设备的普及和用户对移动体验的要求不断提升&#xff0c;手机页面原型设计工具变得越来越重要。在这篇文章中&#xff0c;我将为您推荐几款在2024年非常流行且值得一试的手机页面原型设计工具。 Pixso Pixso是一款基于云端的协作设计工具&#xf…