数据库同步 Elasticsearch 后数据不一致,怎么办?

news2024/12/23 5:50:52

1、实战线上问题

  • Q1:Logstash 同步 postgreSQL 到 Elasticsearch 数据不一致。

在使用 Logstash 从 pg 库中将一张表导入到 ES 中时,发现 ES 中的数据量和 PG 库中的这张表的数据量存在较大差距。如何快速比对哪些数据没有插入?导入过程中,Logstash 日志没有异常。PG 中这张表有 7600W。

  • Q2:mq 异步双写数据库、es 的方案中,如何保证数据库数据和 es 数据的一致性?

2、推荐解决方案之一——ID 比较法

如下示例,仅拿问题1举例验证,问题2原理一致。

2.1 方案探讨

要找出哪些数据没有插入到 Elasticsearch 中,可以采用以下方法:

  • 确保 Logstash 配置文件中的 input 插件的 JDBC 驱动程序正确配置,以便从 PostgreSQL 数据库中提取所有数据。注意 statement 参数,确保它选择了所有需要的数据。

  • 检查 Logstash 配置文件的 output 插件,确保正确配置了 Elasticsearch 的连接参数。同时,检查是否有过滤器在导入过程中过滤掉了部分数据。

  • 在 Logstash 配置文件中添加一个 stdout 插件,将从 PostgreSQL 数据库中读取的数据记录到文件中。

例如,可以添加以下内容:

output {
  elasticsearch {
    ...Elasticsearch 配置...
  }
  stdout {
    codec => json_lines
    path => "/path/to/logstash_output.log"
  }
}

将 Logstash 输出文件与 PostgreSQL 数据库中的原始数据进行比较,以找出未导入的数据。可以使用 Python、Shell 脚本或其他编程语言编写一个简单的脚本来执行此操作。

如果 Logstash 输出文件中的记录数与 PostgreSQL 数据库中的记录数一致,但 Elasticsearch 中的记录数不一致,请检查 Elasticsearch 集群的健康状况和日志。确认集群是否在接收和索引数据时遇到问题。

如果问题仍然存在,尝试将批量操作的大小减小,以减轻 Elasticsearch 和 Logstash 的负担。可以通过在 Logstash 配置文件的 output 插件中设置 flush_size 和 idle_flush_time 参数来实现。

处理大量数据时,可能需要调整 Logstash 和 Elasticsearch 的性能和资源配置。根据硬件和网络条件,可能需要优化批量操作、JVM 设置、线程池大小等方面的设置。

2.2 比较脚本的实现

以下是一个简单的 Shell 脚本示例,用于比较 Logstash 输出文件(JSON 格式)和 PostgreSQL 数据库中的数据。该脚本将比较特定字段(如 id)以确定哪些数据可能未导入到 Elasticsearch。

832634d81a787608ec2eec747c428c1e.png

首先,从 PostgreSQL 数据库中导出数据,将其保存为 CSV 文件:

COPY (SELECT id FROM your_table) TO '/path/to/postgres_data.csv' WITH

接下来,创建一个名为 compare.sh 的 Shell 脚本:

#!/bin/bash
# 将 JSON 文件中的 ID 提取到一个文件中
jq '.id' /path/to/logstash_output.log > logstash_ids.txt

# 删除 JSON 中的双引号
sed -i 's/"//g' logstash_ids.txt

# 对 Logstash 和 PostgreSQL 的 ID 文件进行排序
sort -n logstash_ids.txt > logstash_ids_sorted.txt
sort -n /path/to/postgres_data.csv > postgres_ids_sorted.txt

# 使用 comm 比较两个已排序的 ID 文件
comm -23 postgres_ids_sorted.txt logstash_ids_sorted.txt > missing_ids.txt

# 输出结果
echo "以下 ID 在 Logstash 输出文件中未找到:"
cat missing_ids.txt

为脚本添加可执行权限并运行:

chmod +x compare.sh

./compare.sh

此脚本会比较 logstash_output.log 和 postgres_data.csv 文件中的 ID。如果发现缺失的 ID,它们将被保存在 missing_ids.txt 文件中,并输出到控制台。请注意,该脚本假设已经安装了 jq(一个命令行 JSON 处理器)。如果没有,请先安装 jq

3、推荐方案二——Redis 加速对比

在这种情况下,可以使用 Redis 的集合数据类型来存储 PostgreSQL 数据库和 Logstash 输出文件中的 ID。接下来,可以使用 Redis 提供的集合操作来找到缺失的 ID。

9869638a802246d944be657c1bb71feb.png

以下是一个使用 Redis 实现加速比对的示例:

首先,从 PostgreSQL 数据库中导出数据,将其保存为 CSV 文件:

COPY (SELECT id FROM your_table) TO '/path/to/postgres_data.csv' WITH CSV HEADER;

安装并启动 Redis。

使用 Python 脚本将 ID 数据加载到 Redis:

import redis
import csv

# 连接到 Redis

r = redis.StrictRedis(host='localhost', port=6379, db=0)

# 从 PostgreSQL 导出的 CSV 文件中加载数据
with open('/path/to/postgres_data.csv', newline='') as csvfile:
    csv_reader = csv.reader(csvfile)
    next(csv_reader)  # 跳过表头
    for row in csv_reader:
        r.sadd('postgres_ids', row[0])

# 从 Logstash 输出文件中加载数据
with open('/path/to/logstash_output.log', newline='') as logstash_file:
    for line in logstash_file:
        id = line.split('"id":')[1].split(',')[0].strip()
        r.sadd('logstash_ids', id)

# 计算差集
missing_ids = r.sdiff('postgres_ids', 'logstash_ids')

# 输出缺失的 ID
print("以下 ID 在 Logstash 输出文件中未找到:")
for missing_id in missing_ids:
    print(missing_id)

这个 Python 脚本使用 Redis 集合数据类型存储 ID,然后计算它们之间的差集以找到缺失的 ID。需要先安装 Python 的 Redis 库。可以使用以下命令安装:

pip install redis

这个脚本是一个基本示例,可以根据需要修改和扩展它。使用 Redis 的优点是它能在内存中快速处理大量数据,而不需要在磁盘上读取和写入临时文件。

4、小结

方案一:使用 Shell 脚本和 grep 命令

  • 优点:

(1)简单,易于实现。

(2)不需要额外的库或工具。

  • 缺点:

(1)速度较慢,因为它需要在磁盘上读写临时文件。

(2)对于大数据量的情况,可能会导致较高的磁盘 I/O 和内存消耗。

方案二:使用 Redis 实现加速比对

  • 优点:

(1)速度更快,因为 Redis 是基于内存的数据结构存储。

(2)可扩展性较好,可以处理大量数据。

  • 缺点:

(1)实现相对复杂,需要编写额外的脚本。

(2)需要安装和运行 Redis 服务器。

根据需求和数据量,可以选择合适的方案。如果处理的数据量较小,且对速度要求不高,可以选择方案一,使用 Shell 脚本和 grep 命令。这种方法简单易用,但可能在大数据量下表现不佳。

如果需要处理大量数据,建议选择方案二,使用 Redis 实现加速比对。这种方法速度更快,能够有效地处理大数据量。然而,这种方法需要额外的设置和配置,例如安装 Redis 服务器和编写 Python 脚本。

在实际应用中,可能需要根据具体需求进行权衡,以选择最适合的解决方案。

推荐阅读

  1. 全网首发!从 0 到 1 Elasticsearch 8.X 通关视频

  2. 重磅 | 死磕 Elasticsearch 8.X 方法论认知清单

  3. 如何系统的学习 Elasticsearch ?

  4. 2023,做点事

0475a0c3837bfc5e665cc986d8056496.jpeg

更短时间更快习得更多干货!

和全球 近2000+ Elastic 爱好者一起精进!

c3d16daa12c043e4c294b9b8ee545b62.gif

比同事抢先一步学习进阶干货!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/433130.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

六:内存回收

内存回收: 应用程序通过 malloc 函数申请内存的时候,实际上申请的是虚拟内存,此时并不会分配物理内存。 当应用程序读写了这块虚拟内存,CPU 就会去访问这个虚拟内存, 这时会发现这个虚拟内存没有映射到物理内存&…

系统安全及应用

目录 一、账号安全控制 1)系统账号清理 2)密码安全控制 chage命令 示例 3)命令历史限制 4)终端自动注销 总结 账号安全 密码安全 二、系统引导和登录控制 1)使用su命令切换用户 用途及用…

【学习笔记】字节数据和字节字符串(b“ “)那些事

文章目录 0 前言1 先来看看C语言中怎么处理这种字节数据1.1 使用总结 2 再来看看Python当中是怎么处理字节数据的 0 前言 最近在尝试用PyQt做一个上位机,遇到很多关于字节字符串的问题,这里简单总结几个关键点。 1 先来看看C语言中怎么处理这种字节数据…

RK3568平台开发系列讲解(Linux系统篇)共享内存的创建和映射过程

🚀返回专栏总目录 文章目录 一、共享内存的创建和映射过程流程梳理二、如何创建共享内存?三、如何将共享内存映射到虚拟地址空间?沉淀、分享、成长,让自己和他人都能有所收获!😄 📢本篇我们一起学习下共享内存的创建和映射过程。 一、共享内存的创建和映射过程流程梳…

推荐5款重度电脑用户也未必知道的小众软件

作为一个重度电脑用户,你可能会经常使用一些软件来完成各种任务和娱乐。但是你知道有哪些好用的WIN10软件吗?今天我就为你介绍一下我推荐的五款WIN10软件,它们分别是: 1.反恶意软件——Malwarebytes Malwarebytes是一款专业的反…

AOP与SpringBoot使用AOP实例

AOP:Aspect Oriented Programming(面向切面编程、面向方面编程),其实就是面向特定方法编程。 动态代理是面向切面编程最主流的实现。而SpringAOP是Spring框架的高级技术,旨在管理bean对象的过程中,主要通过…

浅谈thrift协议+举例通用mockserver如何实现

目录 简单来说: 举个例子: 简单来说: 1)是一个跨平台跨语言的通信协议,定义和创建跨语言服务,是一个高性能、轻量级RPC框架。 2)开发者无需关注不同语言/相同语言服务间如何通信,…

基于html+css的图片展示15

准备项目 项目开发工具 Visual Studio Code 1.44.2 版本: 1.44.2 提交: ff915844119ce9485abfe8aa9076ec76b5300ddd 日期: 2020-04-16T16:36:23.138Z Electron: 7.1.11 Chrome: 78.0.3904.130 Node.js: 12.8.1 V8: 7.8.279.23-electron.0 OS: Windows_NT x64 10.0.19044 项目…

什么是Python?

目录 Python简介 Python发展史 Python优点与缺点 Python现如今的应用领域 总结 Python简介 Python是一种高级、解释型、面向对象的动态编程语言,由Guido van Rossum在1989年创建,首次发布于1991年。 Python设计的哲学是优雅、清晰、简单&#xff0c…

Windows逆向安全(一)之基础知识(八)

if else嵌套 这次来研究if else嵌套在汇编中的表现形式,本次以获取三个数中最大的数这个函数为例子,分析if else的汇编形式 求三个数中的最大值 首先贴上代码: #include "stdafx.h"int result0; int getMax(int i,int j,int k)…

机器视觉工程师必须知道机器视觉精度要思考哪些

​在和客户交流项目技术要求,这个项目,我要求的精度是0.01mm? 第一个问题:什么是精度? 精度要求0.01mm: 1.视觉重复性极差?静态?动态? 2.视觉与第三方相关性差异?极差?相关性系数? 3.整体系统误差?机械重复性误差? 4.产品尺寸公差? 第二个问题:精度与公差…

完美解决丨TypeError: fun() takes 2 positional arguments but 3 were given

python def fun(a, b): return a b c fun(1, 2, 3) Traceback (most recent call last): File "test.py", line 5, in <module c fun(1, 2, 3) TypeError: fun() takes 2 positional arguments but 3 were given 上面的代码中&#xff0c; fun 函数定义了两…

最通俗的语言解释01背包问题(力扣416题javascript版本)

【声明】以下内容参考了代码随想录&#xff0c;不用作商业用途~ 先来看一个场景&#xff1a;有N件物品&#xff0c;背包最大的重量为W&#xff0c;第i件物品的重量为weight[i]&#xff0c;得到的价值为value[i]&#xff0c;每件物品只用一次&#xff08;即不能重复放进背包&…

【计算方法】正交区域查询---KD-Tree概念

一、说明 kd 树是一种二叉树数据结构&#xff0c;可以用来进行高效的 kNN 计算。kd 树算法偏于复杂&#xff0c;本篇将先介绍以二叉树的形式来记录和索引空间的思路&#xff0c;以便读者更轻松地理解 kd 树。 二、正交区域查找 2.1 定义 对于k维空间的张量数据表格&#xff0…

Unity RenderStreaming 云渲染3.1.0-exp.6 食用手册

Unity云渲染 &#x1f957;资源&#x1f364;兼容性&#x1f367;手机端连接&#x1f969;安装方法&#x1f35b;IP端口设置&#x1f371;官方案例尝鲜&#x1f332;导入案例&#x1f332;添加场景&#x1f332;启动WebApp&#x1f332;打开Menu场景&#x1f332;连接参数设置&…

300左右蓝牙耳机推荐哪个好?300元左右最好的蓝牙耳机

蓝牙耳机如今在我们的生活中太普遍了&#xff0c;记得疫情刚开始天天要戴口罩&#xff0c;口罩的绳子和耳机线相缠十分的不方便&#xff0c;所以更多的人选择蓝牙耳机&#xff0c;下面整理了几款300元左右的蓝牙耳机品牌。 一、南卡小音舱Lite2蓝牙耳机 售价&#xff08;&…

4-数据结构

数据结构&#xff08;data structure&#xff09; 1. 简介 数据结构是在计算机中组织与存储数据的方式 如果想要表示“一排数字”&#xff0c;自然想到使用「数组」数据结构 数组的存储方式可以表示数字的相邻关系、顺序关系&#xff0c;但至于其中存储的是整数int&#xff0c…

【数据结构:线性表】顺序表

⚡线性表 线性表&#xff08;linear list&#xff09;是n个具有相同特性的数据元素的有限序列。 线性表是一种在实际中广泛使 用的数据结构&#xff0c;常见的线性表&#xff1a;顺序表、链表、栈、队列、字符串... 线性表在逻辑上是线性结构&#xff0c;也就说是连续的一条直…

网络请求实战-RESTFUL约定和Postman工具

RESTFUL协议 表现层状态转化&#xff08;Representational state transfer&#xff09; 资源、表示和转换 资源&#xff08;Resource&#xff09; 服务端的一个资源 拥有URL 表示&#xff08;Representation&#xff09; 服务端的资源在客户端的表示 客户端拥有操作服务…

three.js之scene

THREE.Scene对象有时被称为场景图&#xff0c;可以用来保存所有图形场景的必要信息。在Three.js中&#xff0c;这意味着THREE.Scene保存所有对象、光源和渲染所需的其他对象。 本节主要是构建一个基本场景&#xff0c;然后可以通过gui添加&#xff0c;删除场景里的对象等。 效果…