数据同步MySQL -> Elasticsearch

news2024/11/13 9:50:30

大家好我是苏麟,今天聊聊数据同步 .

数据同步

一般情况下,如果做查询搜索功能,使用 ES 来模糊搜索,但是数据是存放在数据库 MySQL 里的,所以说我们需要把 MySQL 中的数据和 ES 进行同步,保证数据一致(以 MySQL 为主)。

MySQL =>ES(单向)

同步方式

首次安装完 ES,把 MySQL 数据全量同步到 ES 里,写一个单次脚本 4 种方式 , 全量同步(首次)+增量同步(新数据)

1.定时任务 (推荐 : 简单)

定时任务 : 比如1分钟1次,找到 MySQL 中过去几分钟内(至少是定时周期的2 倍)发生改变的数据,然后更新到 ES.

  • 优点:简单易懂、占用资源少、不用引入第三方中间件
  • 缺点:有时间差
  • 应用场景:数据短时间内不同步影响不大、或者数据几乎不发生修改
2.双写 

双写 : 写数据的时候,必须也去写 ES;更新删除数据库同理。(事务:建议先保证 MySQL写成功,如果ES 写失败了,可以通过定时任务 + 日志 +告警进行检测和修复(补偿))

  • 优点 : 不知道
  • 缺点 : 繁琐
3. Logstash

用 Logstash 数据同步管道 (一般要配合 kafka 消息队列 + beats 采集器)

  • 优点 : 用起来方便,插件多
  • 缺点 : 成本更大 : 一般要配合其他组件使用 (比如 kafka) , 维护成本 : 多维护一个组件 , 学习成本 : 学习使用
4.Canal (推荐 : 简单 , 实时性非常强)

Canal 监听 MySQL Binlog,实时同步

  • 优点 : 实时同步 , 实时性非常强
  • 缺点 :  忽略不计   (MySQL8版本可能连接失败)

定时任务

找到 MySQL 中过去几分钟内发生改变的数据,然后更新到 ES.

双向写入

写数据的时候,也去写 ES .

这个不推荐!

Logstash

传输 处理 数据的管道

文章 : Getting Started with Logstash | Logstash Reference [7.17] | Elastic

下载 : https://artifacts.elastic.co/downloads/logstash/logstash-7.17.9-windows-x86_64.zip

快速开始 : Running Logstash on Windows | Logstash Reference [7.17] | Elastic 

这里需要学习成本 , 有兴趣的小伙伴自己了解 , 这里不过多赘述 .

订阅数据库流水的同步方式 Canal

地址 : GitHub - alibaba/canal: 阿里巴巴 MySQL binlog 增量订阅&消费组件

原理 : 数据库每次修改时,会修改 binlog 文件,只要监听该文件的修改,就能第一时间得到消息并处理canal : 帮你监听 binlog,并解析 binlog 为你可以理解的内容。 

它伪装成了 MySQL 的从节点,获取主节点给的 binlog,如图:

快速开始 : QuickStart · alibaba/canal Wiki · GitHub

windows 系统,找到你本地的 mysql 安装目录,在根目录下新建 my.ini 文件:

Linux 系统,找到你本地的 mysql 安装目录,在根目录下新建 my.cnf 文件:

[mysqld]
log-bin=mysql-bin # 开启 binlog
binlog-format=ROW # 选择 ROW 模式
server_id=1 # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复

授权 canal 链接 MySQL 账号具有作为 MSQL slave 的权限, 如果已有账户可直接 grant

CREATE USER canal IDENTIFIED BY 'canal';  
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
FLUSH PRIVILEGES;

这里有个报错 java 找不到,修改 startup.bat 脚本为你自己的 java home

set JAVA_HOME=C:\Users\59278\.jdks\corretto-1.8.0_302 (自己MySQL路径)

set PATH=%JAVA_HOME%\bin;%PATH%

Java 中引入依赖

<dependency>
    <groupId>com.alibaba.otter</groupId>
    <artifactId>canal.client</artifactId>
    <version>1.1.0</version>
</dependency>

Demo  

import java.net.InetSocketAddress;
import java.util.List;


import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.common.utils.AddressUtils;
import com.alibaba.otter.canal.protocol.Message;
import com.alibaba.otter.canal.protocol.CanalEntry.Column;
import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;
import com.alibaba.otter.canal.protocol.CanalEntry.EventType;
import com.alibaba.otter.canal.protocol.CanalEntry.RowChange;
import com.alibaba.otter.canal.protocol.CanalEntry.RowData;


public class SimpleCanalClientExample {


public static void main(String args[]) {
    // 创建链接
    CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(AddressUtils.getHostIp(),
                                                                                        11111), "example", "", "");
    int batchSize = 1000;
    int emptyCount = 0;
    try {
        connector.connect();
        connector.subscribe(".*\\..*");
        connector.rollback();
        int totalEmptyCount = 120;
        while (emptyCount < totalEmptyCount) {
            Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据
            long batchId = message.getId();
            int size = message.getEntries().size();
            if (batchId == -1 || size == 0) {
                emptyCount++;
                System.out.println("empty count : " + emptyCount);
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                }
            } else {
                emptyCount = 0;
                // System.out.printf("message[batchId=%s,size=%s] \n", batchId, size);
                printEntry(message.getEntries());
            }

            connector.ack(batchId); // 提交确认
            // connector.rollback(batchId); // 处理失败, 回滚数据
        }

        System.out.println("empty too many times, exit");
    } finally {
        connector.disconnect();
    }
}

private static void printEntry(List<Entry> entrys) {
    for (Entry entry : entrys) {
        if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {
            continue;
        }

        RowChange rowChage = null;
        try {
            rowChage = RowChange.parseFrom(entry.getStoreValue());
        } catch (Exception e) {
            throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),
                                       e);
        }

        EventType eventType = rowChage.getEventType();
        System.out.println(String.format("================&gt; binlog[%s:%s] , name[%s,%s] , eventType : %s",
                                         entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
                                         entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
                                         eventType));

        for (RowData rowData : rowChage.getRowDatasList()) {
            if (eventType == EventType.DELETE) {
                printColumn(rowData.getBeforeColumnsList());
            } else if (eventType == EventType.INSERT) {
                printColumn(rowData.getAfterColumnsList());
            } else {
                System.out.println("-------&gt; before");
                printColumn(rowData.getBeforeColumnsList());
                System.out.println("-------&gt; after");
                printColumn(rowData.getAfterColumnsList());
            }
        }
    }
}

private static void printColumn(List<Column> columns) {
    for (Column column : columns) {
        System.out.println(column.getName() + " : " + column.getValue() + "    update=" + column.getUpdated());
    }
}

    
  

}

在启动之后 修改MySQL中数据 , java控制台就能实时输出 .

这期就到这里 , 下期见 !

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1468371.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

力扣645. 错误的集合(排序,哈希表)

Problem: 645. 错误的集合 文章目录 题目描述思路复杂度Code 题目描述 思路 1.排序 1.对nums数组按从小到大的顺序排序; 2.遍历数组时若判断两个相邻的元素则找到重复元素&#xff1b; 3.记录一个整形变量prev一次置换当前位置元素并与其作差&#xff0c;若差等于2着说明缺失的…

【vue】如何打开别人编译后的vue项目

文件结构如下&#xff0c;编译后的文件放在dist中。 dist的文件结构大约如下&#xff0c;文件名称随项目 1.新建app.js文件 const express require(express);const app express();const port 8080;app.use(express.static(dist));app.listen(port, () > console.log); …

Bluesky数据采集框架-1

Bluesky是一个用于实验控制和科学数据和元数据采集的库。它强调以下特点&#xff1a; 1、实时&#xff0c;流式数据&#xff1a;可用于嵌入可视化和处理。 2、丰富元数据&#xff1a;获取和组织来方便复制性和可检索性。 3、实验通用性&#xff1a;对完全不同的硬件无缝地重…

c#高级——插件开发

案例&#xff1a;WinForm计算器插件开发 1.建立插件库&#xff0c;设置各种自己所需的插件组件 如下图所示&#xff1a;进行了计算器的加减法插件计算组件 Calculator_DLL为总插件父类 Calculator_DLL_ADD 为插件子类的控件对象 Calculator_DLL_Sub Calculator_DLL_Factory 为…

JavaSec 基础之 XXE

文章目录 XMLReaderSAXReaderSAXBuilderDocumentBuilderUnmarshaller**SAXParserFactory**XMLReaderFactoryDigester总结 XMLReader public String XMLReader(RequestBody String content) {try {XMLReader xmlReader XMLReaderFactory.createXMLReader();// 修复&#xff1a…

ESP8266智能家居(4)——开发APP基础篇

1.前期准备 安装好Android studio 开发环境 准备一台完好的安卓手机 手机要处于开发者模式 设置 --->关于手机---> 一直点击版本号 &#xff08;不同手机进入开发者模式的步骤可能不太一样&#xff09; 进入开发者模式后&#xff0c;找到辅助功能&#xff0c;打开开…

2.22日学习打卡----正则表达式

2.22日学习打卡 目录&#xff1a; 2.22日学习打卡正则表达式什么是正则表达式&#xff1f;正则表达式的作用正则表达式特点基础语法表格元字符Java 中正则表达式的使用正则表达式语法规则内容限定单个字符限定范围字符限定取反限定 长度限定长度限定符号预定义字符正则表达式的…

Qt的QFileSystemModel与QTreeView、QTableView、QListView的组合使用

1.相关描述 QFileSystemModel与QTreeView、QTableView、QListView的组合&#xff0c;当QTreeView点击发生改变&#xff0c;QTableView和QListView也会发生变化 2.相关界面 3.相关代码 mainwindow.cpp #include "mainwindow.h" #include "ui_mainwindow.h"…

微服务-微服务Spring Security6实战

1. Spring Security介绍 1.1 Spring Security定义 Spring Security 是一个能够为基于 Spring 的企业应用系统提供声明式的安全访问控制解决方案的安全框 架。 Spring Security 主要实现了 Authentication &#xff08;认证&#xff0c;解决 who are you? &#xff09; 和…

跳槽前应该做好哪些准备?

第一次求职也好&#xff0c;还是换工作也罢&#xff0c;都需要有严谨的考虑。对于已经工作上班的朋友来说&#xff0c;切不可轻易地辞掉工作&#xff0c;想要跳槽&#xff0c;一定要三思而后行&#xff0c;有一个周密的部署。跳槽有好处&#xff0c;也有弊端&#xff0c;频繁的…

黄仁勋最新专访:机器人基础模型可能即将出现,新一代GPU性能超乎想象

最近&#xff0c;《连线》的记者采访了英伟达CEO黄仁勋。 记者表示&#xff0c;与Jensen Huang交流应该带有警告标签&#xff0c;因为这位Nvidia首席执行官对人工智能的发展方向如此投入&#xff0c;以至于在经过近 90 分钟的热烈交谈后&#xff0c;我&#xff08;指代本采访的…

手机单目相机内参标定

使用软件&#xff1a; 参考我之前的文章&#xff1a; 软件地址:https://github.com/DavidGillsjo/VideoIMUCapture-Android/releases 棋盘标定板下载 链接: https://pan.baidu.com/s/1wiPJsEf87Vc0D7KwJnt3GA?pwd1234 提取码: 1234 过程 1.使用下载的软件录制一段视频&am…

重学Java 18.学生管理系统项目

臣无祖母&#xff0c;无以至今日&#xff0c;祖母无臣&#xff0c;无以终余年 母孙二人&#xff0c;更相为命&#xff0c;是以区区不能废远 —— 陈情表.李密 —— 24.2.20 一、编写JavaBean public class Student {//学号private int id;//姓名private String name;//年龄pr…

MySQL死锁产生的原因和解决方法

一.什么是死锁 要想知道MYSQL死锁产生的原因,就要知道什么是死锁?在了解什么是死锁之前,先来看一个概念:线程安全问题 1.线程安全问题 1.1什么是线程安全问题 线程安全问题&#xff0c;指的是在多线程环境当中&#xff0c;线程并发访问某个资源&#xff0c;从而导致的原子性&a…

module ‘json‘ has no attribute ‘dumps‘

如果在使用Python的json模块时遇到AttributeError: module json has no attribute dumps错误&#xff0c;通常是因为在Python环境中json模块不支持dumps方法。这种情况可能是因为Python的json模块被重命名或修改过导致的。 解决方法可以尝试以下几种&#xff1a; 1.检查Pytho…

围剿尚未终止 库迪深陷瑞幸9.9阳谋

文|智能相对论 作者|霖霖 总能被“累了困了”的打工人优先pick的咖啡&#xff0c;刚复工就顺利站上话题C位。 #瑞幸9.9元一杯活动缩水#的话题才爬上新浪微博热搜&#xff0c;“库迪咖啡河北分公司运营总监带头坑害河北联营商”的实名举报帖就出现在了小红书&#xff0c;一时…

复旦大学MBA聚劲联合会:洞见智慧,拓宽思维格局及国际化视野

12月2日&#xff0c;“焕拥时代 俱创未来”聚劲联合会俱创会年度盛典暨俱乐部募新仪式圆满收官。16家复旦MBA俱乐部、200余名同学、校友、各界同仁齐聚复旦管院&#xff0c;一起在精彩纷呈的圆桌论坛里激荡思想&#xff0c;在活力四射的俱乐部风采展示中凝聚力量。      以…

第四十二回 假李逵翦径劫单身 黑旋风沂岭杀四虎-python读写csv和json数据

李逵答应了宋江三件事&#xff1a;不可吃酒&#xff0c;独自前行&#xff0c;不带板斧。李逵痛快答应了&#xff0c;挎一口腰刀&#xff0c;提着朴刀&#xff0c;带了一锭大银子&#xff0c;三五个小银子就下山去了。 宋江放心不下&#xff0c;于是请同乡朱贵也回家一趟&#…

Mybatis总结--传参

MyBatis 传递参数&#xff1a;从 java 代码中把参数传递到 mapper.xml 文件 六、一个简单参数&#xff1a; Dao 接口中方法的参数只有一个简单类型&#xff08; java 基本类型和 String &#xff09;&#xff0c; 占位符 #{ 任意字符 } &#xff0c;和方法的参数名无关…

浅谈SpringMVC

什么是MVC模式 MVC&#xff1a;MVC是一种设计模式 MVC的原理图&#xff1a; 分析&#xff1a; 1&#xff1a;M-Model 模型&#xff08;完成业务逻辑&#xff1a;有javaBean构成&#xff0c;servicedaoentity&#xff09; 2&#xff1a;V-View 视图&#xff08;做界面的展示…