ElasticSearch与MySQL如何进行数据同步?

news2024/10/23 18:44:44

ElasticSearch(ES)与MySQL进行数据同步的需求在实际开发中非常常见,尤其是在需要高效的全文搜索或者数据分析时,通常使用MySQL作为事务数据库,ES作为搜索和查询引擎。那么要实现MySQL与ElasticSearch的数据同步,可以采取多种方式。

常见的同步方式

  1. 手动同步
    • 在每次对MySQL进行增删改操作时,手动将数据更新到ElasticSearch。这种方法适用于小型项目,但在数据量大和频繁更新的场景下不太适用。
  2. 定时同步(全量/增量同步)
    • 定期从MySQL拉取数据(全量或增量),然后将数据同步到ElasticSearch中。例如,使用定时任务每隔一段时间执行同步。
  3. 使用数据库的增量日志(Binlog)进行同步
    • 通过捕获MySQL的Binlog(增量日志),当MySQL的数据发生变化时,实时同步到ElasticSearch。这种方式更加实时,且不需要定时全量更新。

具体的实现方案

方案一:基于消息队列的同步方案
  1. 数据写入MySQL时,发送同步消息到消息队列

    • 当应用向MySQL写入数据时,同时将数据变动的消息发送到消息队列(如RabbitMQ、Kafka等)。
  2. 消费者监听消息并同步数据到ElasticSearch

    • 消费者监听消息队列的变动消息,将数据同步到ElasticSearch。

优点

  • 保证实时性。
  • 能够处理高并发。

实现步骤

  • 应用在插入、更新或删除数据时,发送操作类型(如CREATEUPDATEDELETE)和数据内容到消息队列。
  • 消息消费者从队列中读取消息,根据操作类型将数据插入、更新或删除到ElasticSearch中。

示例代码

1. MySQL插入操作发送消息

// 保存数据到MySQL
orderMapper.insert(order);

// 发送消息到消息队列(以RabbitMQ为例)
rabbitTemplate.convertAndSend("orderSyncQueue", order);

2. 消费者同步数据到ElasticSearch

@RabbitListener(queues = "orderSyncQueue")
public void syncOrderToES(Order order) {
    // 判断操作类型,插入或更新ES中的数据
    IndexRequest indexRequest = new IndexRequest("orders").id(order.getId().toString()).source(order);
    elasticsearchClient.index(indexRequest, RequestOptions.DEFAULT);
}
zhuy
方案二:基于Binlog的实时同步方案

MySQL的Binlog记录了所有的增删改操作,通过解析这些日志,可以实时获取数据变动情况,并同步到ElasticSearch中。

1. 使用Canal进行同步

Canal 是阿里巴巴开源的一个MySQL binlog增量订阅&消费组件,可以用于实时地捕获MySQL的Binlog并同步数据到ElasticSearch。

步骤

  1. 启动MySQL的Binlog功能

    • 在MySQL中开启Binlog功能,并配置server_id(唯一标识),确保MySQL能够产生Binlog。
[mysqld]
log-bin=mysql-bin
binlog-format=ROW
server_id=1

      2. 安装并配置Canal

        安装Canal,配置MySQL的连接信息。

        配置Canal去监听MySQL的表,捕获到数据变动时,获取Binlog日志并解析出增删改的操作。

     3.编写消费者逻辑

        当Canal捕获到数据变化时,将相应的数据同步到ElasticSearch。

示例代码:

配置Canal监听MySQL

canal:
  instance:
    dbUsername: root
    dbPassword: password
    dbHost: localhost
    dbPort: 3306
    dbName: order_db
    table: orders

捕获MySQL的Binlog变化

@EventListener
public void onOrderBinlogChange(CanalEntry.Entry entry) {
    List<CanalEntry.RowData> rowDatasList = entry.getRowChange().getRowDatasList();
    for (CanalEntry.RowData rowData : rowDatasList) {
        if (entry.getEventType() == CanalEntry.EventType.INSERT) {
            // 插入操作
            syncInsertToES(rowData.getAfterColumnsList());
        } else if (entry.getEventType() == CanalEntry.EventType.UPDATE) {
            // 更新操作
            syncUpdateToES(rowData.getAfterColumnsList());
        } else if (entry.getEventType() == CanalEntry.EventType.DELETE) {
            // 删除操作
            syncDeleteFromES(rowData.getBeforeColumnsList());
        }
    }
}

将数据同步到ElasticSearch

private void syncInsertToES(List<CanalEntry.Column> columns) {
    // 将MySQL数据转换成ES文档格式,并插入ElasticSearch
    IndexRequest indexRequest = new IndexRequest("orders").id(getColumnValue(columns, "id")).source(columnsToMap(columns));
    elasticsearchClient.index(indexRequest, RequestOptions.DEFAULT);
}

private void syncUpdateToES(List<CanalEntry.Column> columns) {
    // 更新ElasticSearch中的数据
    UpdateRequest updateRequest = new UpdateRequest("orders", getColumnValue(columns, "id")).doc(columnsToMap(columns));
    elasticsearchClient.update(updateRequest, RequestOptions.DEFAULT);
}

private void syncDeleteFromES(List<CanalEntry.Column> columns) {
    // 删除ElasticSearch中的数据
    DeleteRequest deleteRequest = new DeleteRequest("orders", getColumnValue(columns, "id"));
    elasticsearchClient.delete(deleteRequest, RequestOptions.DEFAULT);
}

注意:

在使用消息中间件(如RabbitMQ、Kafka)实现数据同步时,消息的发送是主动的,由应用程序或服务在执行增删改操作时,主动将消息发送到消息队列。而消息队列本身并不具备监听数据库变化的功能,它的角色是用来存储和传递消息,消息的生产和消费逻辑需要在应用程序中实现。

消息发送的流程:

  1. 生产者(业务逻辑层)主动发送消息: 当应用程序执行数据库的增、删、改操作时,需要主动地将这些操作的信息发送到消息队列中。这通常是在业务代码中,在操作数据库的同时添加发送消息的逻辑。例如,新增一条记录后,会主动发送一个"新增"的消息到队列中。

  2. 消息队列(MQ)接收消息: 消息队列(如RabbitMQ、Kafka)会接收生产者发送的消息,将消息存储在队列中,并根据配置将消息推送给消费者或等待消费者主动拉取。

  3. 消费者监听队列并处理消息: 消费者服务通过监听指定的队列来接收消息,接收到消息后,消费者根据消息类型(新增、修改、删除)来执行相应的操作,比如同步到ElasticSearch或进行其他数据处理操作。

2. 使用Debezium进行同步

Debezium 是一个开源的CDC(Change Data Capture)平台,也可以实时监听MySQL的变化并将数据同步到其他存储系统,包括ElasticSearch。

步骤

        1.安装并配置Debezium连接MySQL。

        2.配置监听的表以及变动捕获逻辑。

        3.实现数据同步逻辑,将数据变动同步到ElasticSearch。

总结

  1. 消息队列同步方案:适用于数据操作频繁的场景,能够保证高并发时的系统稳定性和实时性,常用RabbitMQ或Kafka等消息队列实现。

  2. Binlog同步方案:基于Canal或Debezium的同步可以实现更为实时的同步,能够捕获数据库级别的所有数据变化。Binlog方式不依赖应用层代码改动,适合于对MySQL增删改同步要求较高的场景。

  3. 定时同步方案:适用于不需要实时同步的场景,通过定时任务进行批量同步

不同方案各有优缺点,根据具体项目需求选择合适的同步方式。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2221789.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

每天花2分钟学数字化转型,第四讲:数字化转型

一文看懂&#xff1a;数字化转型是什么&#xff1f;以及数字化转型的根本任务与核心路径。 定义&#xff1a;数字化是人类社会的进化&#xff0c;绝不仅仅是一个企业的问题&#xff0c;也不是某一项技术的问题&#xff0c;而是时代的变迁。数字化转型指的是从当前信息化环境下…

RabbitMQ系列学习笔记(三)--工作队列模式

文章目录 一、工作队列模式原理二、工作队列模式实战1、抽取工具类2、消费者代码3、生产者代码4、查看运行结果 本文参考 尚硅谷RabbitMQ教程丨快速掌握MQ消息中间件rabbitmq RabbitMQ 详解 Centos7环境安装Erlang、RabbitMQ详细过程(配图) 一、工作队列模式原理 与简单模式相…

企业级 接口自动化测试框架:Pytest+Allure+Excel

1. Allure 简介 简介 Allure 框架是一个灵活的、轻量级的、支持多语言的测试报告工具&#xff0c;它不仅以 Web 的方式展示了简介的测试结果&#xff0c;而且允许参与开发过程的每个人可以从日常执行的测试中&#xff0c;最大限度地提取有用信息。 Allure 是由 Java 语言开发…

MySQL 【日期】函数大全(七)

目录 1、UNIX_TIMESTAMP() 将指定的日期/日期时间转为 UNIX 时间戳值。 2、WEEK() 返回给定日期位于当年的第几周。 3、WEEKDAY() 返回给定日期的工作日编号。 4、WEEKOFYEAR() 返回给定日期位于当年的第几周 5、YEAR() 提取日期的年份部分并作为数字返回。 6、YEARWEEK()…

Jmeter 实战 JDBC配置

​ JDBC JDBC&#xff08;Java Database Connectivity&#xff09;是一种用于执行SQL语句的Java API。通过这个API&#xff0c;可以直接连接并执行SQL脚本&#xff0c;与数据库进行交互。 使用JMeter压力测试时&#xff0c;操作数据库的场景 在使用JMeter进行接口压力测试时…

Gin 协程mysql客户端

一、Gin框架 mysql配置 这里选择yaml文件配置 二、配置读取 viper 读取yaml文件中对应配置 三、mysql 的协程客户端 文件位置 package databaseimport ("database/sql""fmt""github.com/spf13/viper""log""net/http"&quo…

JavaWeb 25.Vite

目录 一、Vite的介绍 二、Vite创建Vue3工程化项目 ViteVue3项目的创建、启动、停止 创建 启动 停止 干净感来源于对自我的驯服 —— 24.10.23 一、Vite的介绍 在浏览器支持 ES 模块之前&#xff0c;JavaScript 并没有提供原生机制让开发者以模块化的方式进行开发。这也正是我们…

Missing classes detected while running R8报错解决方案

Android 打包release版本时报错如下&#xff1a; > Task :printlib:minifyReleaseWithR8 FAILED AGPBI: {"kind":"error","text":"Missing classes detected while running R8. Please add the missing classes or apply additional ke…

canvas-editor首行缩进

canvas-editor中渲染部分的源码都在Draw.ts里&#xff0c;能找到computeRowList方法中并没有实现首行缩进相关的逻辑&#xff0c;但是实现了element.type ElementType.TAB的缩进&#xff0c;如图&#xff1a; 因此我们可以基于tab进行首行缩进的逻辑编写&#xff0c;在main.ts…

通过DevTools逃离Chrome沙盒(CVE-2024-6778和CVE-2024-5836)

介绍 这篇博文详细介绍了如何发现CVE-2024-6778和CVE-2024-5836的&#xff0c;这是Chromium web浏览器中的漏洞&#xff0c;允许从浏览器扩展&#xff08;带有一点点用户交互&#xff09;中进行沙盒逃逸。 简而言之&#xff0c;这些漏洞允许恶意的Chrome扩展在你的电脑上运行…

2015年-2017年 计算机技术专业 程序设计题(算法题)实战_c语言程序设计数据结构程序设计分析

文章目录 20151.C语言算法设计部分2.数据结构算法设计部分 20161.C语言算法设计部分2.数据结构算法设计部分 2017年1. C语言算法设计部分2.数据结构算法设计部分 2015 1.C语言算法设计部分 int total(int n) {if(n1) return 1;return total(n-1)n1; } //主函数测试代码已省略…

Android 15 推出新安全功能以保护敏感数据

Android 15 带来了增强的安全功能&#xff0c;可保护您的敏感健康、财务和个人数据免遭盗窃和欺诈。 它还为大屏幕设备带来了生产力改进&#xff0c;并对相机、消息和密钥等应用进行了更新。 Android 防盗保护 Google 开发并严格测试了一套全面的功能&#xff0c;以在盗窃之…

Ubuntu22.04 制作系统ISO镜像

第一步&#xff1a;安装软件-Systemback 1.如果已经添加过ppa&#xff0c;可以删除重新添加或者跳过此步 sudo add-apt-repository --remove ppa:nemh/systemback 2.添加ppa 我是ubuntu20&#xff0c;但这个软件最后支持的是 ubuntu16.04版本&#xff0c;所以加一个16版本…

【Docker】Harbor 私有仓库和管理

目录 一、搭建本地私有仓库 二、harbor简介&#xff08;特性、构成、架构的数据流向&#xff09; 2.1 什么是Harbor 2.2 Harbor的特性 2.3 Harbor的构成 2.4 Harbor的工作原理&#xff08;运行流程&#xff09; 三、harbor部署以及配置文件 1. 部署 Docker-Compose 服…

2010年国赛高教杯数学建模B题上海世博会影响力的定量评估解题全过程文档及程序

2010年国赛高教杯数学建模 B题 上海世博会影响力的定量评估 2010年上海世博会是首次在中国举办的世界博览会。从1851年伦敦的“万国工业博览会”开始&#xff0c;世博会正日益成为各国人民交流历史文化、展示科技成果、体现合作精神、展望未来发展等的重要舞台。请你们选择感兴…

Hadoop生态圈三大组件:HDFS的读写流程、MapReduce计算流程、Yarn资源调度

文章目录 1. HDFS的读写流程1.1 HDFS读流程1.2 HDFS写流程 2. MapReduce计算流程3. Yarn资源调度一、客户端请求资源二、Resource Manager处理请求三、任务资源计算与申请四、Resource Manager分配资源五、Node Manager执行任务六、任务执行与监控 1. HDFS的读写流程 1.1 HDFS…

C++ 中的友元(Friend)用法详解

什么是友元&#xff08;Friend&#xff09;&#xff1f;&#x1f46d; 友元 (C) | Microsoft Learn 在C中&#xff0c;友元&#xff08;Friend&#xff09;是一种机制&#xff0c;允许外部函数或类访问某个类的私有&#xff08;private&#xff09;或保护&#xff08;protecte…

[Unity Demo]从零开始制作空洞骑士Hollow Knight第十五集:制作更多地图,更多敌人,更多可交互对象

提示&#xff1a;文章写完后&#xff0c;目录可以自动生成&#xff0c;如何生成可参考右边的帮助文档 文章目录 前言一、第一个代表性场景 1.制作更多敌人2.制作更多可交互对象二、第二个代表性场景 1.制作更多敌人2.制作更多可交互对象三、第三个代表性场景 1.制作更多敌人2.制…

【计算机网络 - 基础问题】每日 3 题(五十二)

✍个人博客&#xff1a;https://blog.csdn.net/Newin2020?typeblog &#x1f4e3;专栏地址&#xff1a;http://t.csdnimg.cn/fYaBd &#x1f4da;专栏简介&#xff1a;在这个专栏中&#xff0c;我将会分享 C 面试中常见的面试题给大家~ ❤️如果有收获的话&#xff0c;欢迎点赞…

Solr5.5.0单机部署

Solr5.5.0集成Tomcat8部署 1、 准备条件 JDK1.7以上 Solr5.5.0部署包&#xff08;solr-5.5.0.zip&#xff09; Tomcat8部署包 &#xff08;apache-tomcat-8.5.16.tar.gz&#xff09; 上传Solr5.5.0和tomcat8 部署包并解压使用 2、准备部署 将./solr-5.5.0/server/solr-we…