Elastic Search(ES)Java 入门实操(3)数据同步

news2024/12/28 21:56:45

基本概念和数据查询代码:

Elastic Search (ES)Java 入门实操(1)下载安装、概念-CSDN博客

Elastic Search(ES)Java 入门实操(2)搜索代码-CSDN博客

想要使用 ES 来查询数据,首先得要 ES 里有数据,但是如果是后来引入的 ES,数据库上万条的数据肯定不能通过手动进行同步,需要使用其他方法进行同步。

数据同步分为全量同步和增量同步。

所谓全量同步,就是引入 ES 时将 MySQL 里的数据全部同步到 ES 里。增量同步就是当数据库的数据发生变化时,将变化的数据同步到 ES 里。

同步方法

定时任务

通过定时任务的方式,每隔一段时间进行同步。比如每一分钟同步一次。

优点:简单,占用资源少,不用引入第三方中间件

缺点:有时间差,数据一致性要求高的场景不适用

全量同步通过实现 CommandLineRunner 接口,在程序启动时执行。

/**
 * CommandLineRunner 接口,当spring启动时就执行方法
 */
@Component
public class FullSycnToEs implements CommandLineRunner {

    @Resource
    private ArticleService articleService;

    @Resource
    private ArticleEsDao articleEsDao;
    @Override
    public void run(String... args) throws Exception {
        //spring 启动就执行方法进行全量同步
        //1.从MySQL获取数据
        List<Article> articleList = articleService.list();
        if(CollectionUtils.isEmpty(articleList)){
            return;
        }
        //2.将数据转换为DTO
        List<ArticleEsDto> articleDtoList = articleList.stream().map(ArticleEsDto::toDto).collect(Collectors.toList());
        //3.将数据同步到ES
        articleEsDao.saveAll(articleDtoList);
        System.out.println("全量同步完成");
    }
}

增量同步使用 @ Scheduled 定时任务监控更新时间

注意启动类要加上注解 @EnableScheduling

/**
 * 定时任务执行数据同步
 */
@Component
public class InSyncToEs {


    @Resource
    private ArticleMapper articleMapper;

    @Resource
    private ArticleEsDao articleEsDao;

    @Scheduled(fixedRate = 100)
    public void run(){
        // 定时任务,将数据同步到es,根据更新时间来判断
        //假定3分钟内,如果更新时间大于3分钟之前的时间,就是更新了,获取这个数据存入到ES 中
        Date minUpdateTime = new Date(new Date().getTime() - 5* 60*1000L);
        List<Article> newArticles = articleMapper.getNewArticles(minUpdateTime);
        //判断是否有数据更新
        if(CollectionUtils.isEmpty(newArticles)){
            //没有数据更新
            System.out.println("没有数据更新");
            return;
        }
        //有数据更新,将数据转换成dto格式
        List<ArticleEsDto> articleEsDtoList = newArticles.stream().map(ArticleEsDto::toDto).collect(Collectors.toList());
        //将数据存入到ES中
        articleEsDao.saveAll(articleEsDtoList);
        System.out.println("数据同步完成");
    }
}

双写

写入数据库时同时同步到 ES 中,需要考虑 ES 同步失败了怎么办。

使用事务来保证一致性,如果 ES 同步失败了,可以通过定时任务 + 日志 + 告警进行检测和修复(补偿)

Logstash 数据同步管道

传输和处理数据的管道

下载地址:https://artifacts.elastic.co/downloads/logstash/logstash-7.17.21-windows-x86_64.zip

官方文档:Jdbc input plugin | Logstash Reference [7.17] | Elastic

同样的,需要注意版本,下载解压之后在 config 文件夹创建新的同步文件,建议不同的同步脚本创建不同的文件,不要在同一个文件下配置。

文件配置根据官方文档修改,MySQL jar包使用绝对路径即可,否则可能找不到 jar 包,jar 包可以自行准备,也可以从项目的 maven 仓库获取。

# Sample Logstash configuration for creating a simple
# Beats -> Logstash -> Elasticsearch pipeline.

input {
  jdbc {
    // MySQL jar包路径
    jdbc_driver_library => "D:\software\logstash-7.17.21\config\mysql-connector-java-8.0.29.jar"
    // MySQL 驱动
    jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
    // MySQL 连接地址
    jdbc_connection_string => "jdbc:mysql://localhost:3306/mydb"
    //账号密码
    jdbc_user => "root"
    jdbc_password => "1234"
    //动态 SQL
    statement => "SELECT * from article where 1=1"
    parameters => { "favorite_artist" => "Beethoven" }
    //定时执行,core 表达式
    schedule => "*/5 * * * * *"
  }
}

output {
    stdout { codec => rubydebug }
}

 配置好之后在 logstash 目录下执行下面的命令,完成初步从数据库获取数据

.\bin\logstash.bat -f .\config\my-task.conf

 成功获取数据

增量同步配置,使用 updateTime 来进行同步更新的数据。

完整 input 配置如下。

input {
  jdbc {
    jdbc_driver_library => "D:\software\logstash-7.17.21\config\mysql-connector-java-8.0.29.jar"
    jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://localhost:3306/mydb"
    jdbc_user => "root"
    jdbc_password => "1234"
    // 动态查询语句,保证最后一条是最大的
    statement => "SELECT * from article where updateTime > :sql_last_value and updateTime < now() order by updateTime asc"
    // 查询参数的 hash,不用更改
    parameters => { "favorite_artist" => "Beethoven" }
    // 查询参数的类型,updatetime 是 timestamp 类型的
    tracking_column_type => "timestamp"
    // 查询参数
    tracking_column => "updatetime"
    // 设置为 true 时,将定义的查询参数值用作动态 SQL 中sql_last_value,false 时:sql_last_value 是上次查询时间
    use_column_value => true
    // 时区设置为上海,否则存在 8小时时差
    jdbc_default_timezone => "Asia/Shanghai"
    // core 表达式
    schedule => "*/5 * * * * *"
  }
}

配置好从 MySQL 获取的数据之后,就可以同步到 ES 中了。同样需要书写配置。

官方文档:Elasticsearch output plugin | Logstash Reference [7.17] | Elastic

output {
    stdout { codec => rubydebug }
    elasticsearch {
        //访问地址,就是本地 ES 端口
        hosts => "127.0.0.1:9200"
        // ES 索引
	    index =>"article_1"
        // 数据 id,从数据库获取
	    document_id => "%{id}"
    }

最终配置

# Sample Logstash configuration for creating a simple
# Beats -> Logstash -> Elasticsearch pipeline.

input {
  jdbc {
    jdbc_driver_library => "D:\software\logstash-7.17.21\config\mysql-connector-java-8.0.29.jar"
    jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://localhost:3306/mydb"
    jdbc_user => "root"
    jdbc_password => "1234"
    statement => "SELECT * from article where updateTime > :sql_last_value and updateTime < now() order by updateTime asc "
    parameters => { "favorite_artist" => "Beethoven" }
    tracking_column_type => "timestamp"
    tracking_column => "updatetime"
    use_column_value => true
    jdbc_default_timezone => "Asia/Shanghai"
    schedule => "*/5 * * * * *"
  }
}
// 筛选
filter{
	mutate{
        //重命名
		rename => {
			"updatetime" =>"updateTime"
			"createtime" => "createTime"
			"isdetele" => "isDetele"
		}
	}
}

output {
    stdout { codec => rubydebug }
    elasticsearch {
        hosts => "127.0.0.1:9200"
	index =>"article_1"
	document_id => "%{id}"
    }
}

同步成功! 

logstash 的优点:配置完成后使用比较方便,插件多

                缺点:要多维护组件,一般需要配合其他中间件,比如(kafka)

Canal

下载地址:Releases · alibaba/canal (github.com)

文档:QuickStart · alibaba/canal Wiki (github.com)

实时同步数据,通过监控 MySQL 的 binlog,当数据库发生修改时,会修改 binlog 文件,然后 canal 监听到就可以同步到 ES 中。

对于自建 MySQL , 需要先开启 Binlog 写入功能,配置 binlog-format 为 ROW 模式,在 MySQL 目录下新建一个my.ini,配置如下

[mysqld]
log-bin=mysql-bin # 开启 binlog
binlog-format=ROW # 选择 ROW 模式
server_id=1 # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复

授权 canal 链接 MySQL 账号具有作为 MySQL slave 的权限, 如果已有账户可直接 grant,直接在查询控制台执行如下命令

CREATE USER canal IDENTIFIED BY 'canal';  
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
FLUSH PRIVILEGES;

bin 目录下 startup 启动即可。

然后 Java 需要一个客户端,首先引入依赖

<dependency>
    <groupId>com.alibaba.otter</groupId>
    <artifactId>canal.client</artifactId>
    <version>1.1.0</version>
</dependency>

客户端代码

import java.net.InetSocketAddress;
import java.util.List;


import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.common.utils.AddressUtils;
import com.alibaba.otter.canal.protocol.Message;
import com.alibaba.otter.canal.protocol.CanalEntry.Column;
import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;
import com.alibaba.otter.canal.protocol.CanalEntry.EventType;
import com.alibaba.otter.canal.protocol.CanalEntry.RowChange;
import com.alibaba.otter.canal.protocol.CanalEntry.RowData;


public class SimpleCanalClientExample {


public static void main(String args[]) {
    // 创建链接
    CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(AddressUtils.getHostIp(),
                                                                                        11111), "example", "", "");
    int batchSize = 1000;
    int emptyCount = 0;
    try {
        connector.connect();
        connector.subscribe(".*\\..*");
        connector.rollback();
        int totalEmptyCount = 120;
        while (emptyCount < totalEmptyCount) {
            Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据
            long batchId = message.getId();
            int size = message.getEntries().size();
            if (batchId == -1 || size == 0) {
                emptyCount++;
                System.out.println("empty count : " + emptyCount);
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                }
            } else {
                emptyCount = 0;
                // System.out.printf("message[batchId=%s,size=%s] \n", batchId, size);
                printEntry(message.getEntries());
            }

            connector.ack(batchId); // 提交确认
            // connector.rollback(batchId); // 处理失败, 回滚数据
        }

        System.out.println("empty too many times, exit");
    } finally {
        connector.disconnect();
    }
}

private static void printEntry(List<Entry> entrys) {
    for (Entry entry : entrys) {
        if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {
            continue;
        }

        RowChange rowChage = null;
        try {
            rowChage = RowChange.parseFrom(entry.getStoreValue());
        } catch (Exception e) {
            throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),
                                       e);
        }

        EventType eventType = rowChage.getEventType();
        System.out.println(String.format("================&gt; binlog[%s:%s] , name[%s,%s] , eventType : %s",
                                         entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
                                         entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
                                         eventType));

        for (RowData rowData : rowChage.getRowDatasList()) {
            if (eventType == EventType.DELETE) {
                printColumn(rowData.getBeforeColumnsList());
            } else if (eventType == EventType.INSERT) {
                printColumn(rowData.getAfterColumnsList());
            } else {
                System.out.println("-------&gt; before");
                printColumn(rowData.getBeforeColumnsList());
                System.out.println("-------&gt; after");
                printColumn(rowData.getAfterColumnsList());
            }
        }
    }
}

private static void printColumn(List<Column> columns) {
    for (Column column : columns) {
        System.out.println(column.getName() + " : " + column.getValue() + "    update=" + column.getUpdated());
    }
}

    
  

}

 

过程出现的问题

1. 在执行命令.\bin\logstash.bat -f .\config\my-task.conf  时报错

只需要更改 bin 目录下的 setup.bat 文件中的双引号去掉即可。 

2. canal 启动 报错

修改变量或者修改启动项

编辑 startup.bat,在文件中添加如下配置:

// 自己的 jdk 路径
set JAVA_HOME=C:\Users\p'b\.jdks\corretto-1.8.0_392
// 覆盖环境变量
set PATH=%JAVA_HOME%\bin;%PATH%

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1800675.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

MAVEN:自定义模板Archetype的创建

目录 一、简介 二、具体步骤 三、 vscode通过模板创建项目 四、通过IDEA创建 一、简介 有时候MAVEN自带的模板库并不能满足我们创建项目的需求&#xff0c;为了能够快速创建项目&#xff0c;免去每次复杂的配置&#xff0c;所以我们需要自定义模板库&#xff0c;本次操作基于…

控制台输入javac命令输出的结果中的中文乱码解决方式

默认字符编码UTF-8无法解析中文。设置环境变量中 “JAVA_TOOL_OPTIONS” 的值为"UTF-8" 即可。 具体配置步骤&#xff1a; 桌面右键"我的电脑" --> 属性 高级系统设置 环境变量 用户变量中添加 JAVA_TOOL_OPTIONS 然后确定&#xff0c;保存即可。

Windows 更新根文件夹的修改时间

简介&#xff1a; Win10 系统不会根据深层目录文件更新主目录的修改时间. 一般解决办法是关闭 Winodws 搜索引擎。 win10文件夹不能自动更新了怎么办&#xff1f;_百度知道 本脚本通过递归遍历子目录和子文件&#xff0c;来更新根目录的时间。 使用内层目录和当前目录下的最新…

(杂交版)植物大战僵尸

1.为什么我老是闪退&#xff1f; 答&#xff1a;主页控制台把“后台运行”点开&#xff0c;尽量避免全屏就会好很多。 2.哪里下载&#xff1f; 答&#xff1a;夸克https://pan.quark.cn/s/973efb326f81 3.为啥我没有14个卡槽&#xff1f; 答&#xff1a;冒险没打&#xff0c;怪…

课时149:项目发布_基础知识_项目交付

1.1.1 项目交付 学习目标 这一节&#xff0c;我们从 基础知识、代码发布、小结 三个方面来学习 基础知识 简介 项目交付是一个涉及到多团队共同协作的事情&#xff0c;它包括 产品团队设计产品、研发团队开发产品、测试团队测试代码、运维团队发布代码和维护站点等工作。项…

运维 之 DNS域名解析

前言 我们每天打开的网站&#xff0c;他是如何来解析&#xff0c;并且我们怎么能得到网站的内容反馈的界面呢&#xff1f;那什么是DNS呢&#xff08;DNS&#xff08;DomainNameservice&#xff0c;域名服务&#xff0c;主要用于因特网上作为域名和IP地址相互映射&#xff09;那…

【Python】读取文件夹中所有excel文件拼接成一个excel表格 的方法

我们平常会遇到下载了一些Excel文件放在一个文件夹下&#xff0c;而这些Excel文件的格式都一样&#xff0c;这时候需要批量这些文件合并成一个excel 文件里。 在Python中&#xff0c;我们可以使用pandas库来读取文件夹中的所有Excel文件&#xff0c;并将它们拼接成一个Excel表…

【UE5 刺客信条动态地面复刻】实现无界地面01:动态生成

2024.6.4更新 昨天半夜意识到生成Cube的方案不合适&#xff0c;又开始到处找动态地面的方法&#xff0c;发现了我想要的效果直接可以用nigara实现&#xff01;&#xff01;&#xff01;&#xff01; 于是这个部分就暂时告一段落&#xff0c;今季开始新的方向的学习。 为了快速…

Flink⼤状态作业调优实践指南:状态报错与启停慢篇

摘要&#xff1a;本文整理自俞航翔、陈婧敏、黄鹏程老师所撰写的大状态作业调优实践指南。由于内容丰富&#xff0c;本文分享终篇状态报错与启停慢篇&#xff0c;主要分为以下四个部分&#xff1a; 检查点和快照超时的诊断与调优 作业快速启动和扩缩容方案 总结 阿里云企业级…

1.vue2.x-初识及环境搭建

目录 1.下载nodejs v16.x 2.设置淘宝镜像源 3.安装脚手架 4.创建一个项目 5.项目修改 代码地址&#xff1a;source-code: 源码笔记 1.下载nodejs v16.x 下载地址&#xff1a;Node.js — Download Node.js 2.设置淘宝镜像源 npm config set registry https://registry.…

如何删除电脑端口映射?

在使用电脑进行网络连接时&#xff0c;有时需要进行端口映射以实现不同设备之间的信息远程通信。当这些端口映射不再需要时&#xff0c;我们需要及时删除它们以确保网络的安全和稳定。本文将介绍如何删除电脑端口映射的方法。 操作系统自带的工具 大多数操作系统都提供了自带…

【docker】centos7配置docker镜像阿里云加速

国内从 DockerHub 拉取镜像有时会遇到困难&#xff0c;由于网络原因&#xff0c;下载一个Docker官方镜像可能会需要很长的时间&#xff0c;甚至下载失败。此时可以配置镜像加速器。Docker 官方和国内很多云服务商都提供了国内加速器服务。 测试了几次阿里云的加速是最快的。 …

OpenCV学习(4.6) 图像梯度

1.目标 在本教程中&#xff1a; 你会学到如何找到图像的梯度&#xff0c;边缘等。你会学到如下函数&#xff1a;**cv.Sobel()&#xff0c;cv.Scharr()&#xff0c;cv.Laplacian()** 等。 图像梯度是图像处理中的一个基本概念&#xff0c;它用于测量图像亮度变化的强度和方向…

springCloud中将redis共用到common模块

一、 springCloud作为公共模块搭建框架 springCloud 微服务模块中将redis作为公共模块进行的搭建结构图&#xff0c;如下&#xff1a; 二、redis 公共模块的搭建框架 如上架构&#xff0c;代码如下pom.xml 关键代码&#xff1a; <dependencies><!-- SpringBoot Boo…

【JMeter接口测试工具】第二节.JMeter基本功能介绍(中)【入门篇】

文章目录 前言四、信息头管理器五、Jmeter参数化 5.1 用户自定义的变量 5.2 csv批量添加 5.3 用户参数 5.4 随机数函数 5.5 计数器函数 5.6 时间函数六、Jmeter断言 6.0 断言介绍 6.1 响应断言 6.2 大小断言 6.3 持续时间断…

AI炒股:用Kimi获取美股的历史成交价格并画出股价走势图

在Kimi中输入提示词&#xff1a; 你是一个Python编程专家&#xff0c;要完成一个编写Python脚本的任务&#xff0c;具体步骤如下&#xff1a; 用akshare库获取谷歌(股票代码&#xff1a;105.GOOG)、亚马逊(股票代码&#xff1a;105.AMZN )、苹果(股票代码&#xff1a;105.AAP…

内存经验分享

目录 内存统计工具 /proc/meminfo Buddy ​​​​​​​​​​​​​​Slub ​​​​​​​Procrank /proc/pid/smaps ​​​​​​​Dumpsys meminfo 内存评估 内存泄漏 Lmk 水位调整 内存统计工具 /proc/meminfo 可以提供整体内存信息&#xff0c;各字段表示的意思如…

【论文速读】| BIOCODER:一个具有上下文实用知识的生物信息学代码生成基准测试

本次分享论文&#xff1a;BIOCODER: A Benchmark for Bioinformatics Code Generation with Contextual Pragmatic Knowledge 基本信息 原文作者&#xff1a;Xiangru Tang, Bill Qian, Rick Gao, Jiakang Chen, Xinyun Chen, Mark Gerstein 作者单位&#xff1a;耶鲁大学, Go…

Vitis HLS 学习笔记--MAXI位宽拓展

目录 1. 简介 2. 用法解析 2.1 命令语法 2.2 实例展示 3. 注意事项 4. 总结 1. 简介 在 Vitis 工具流程中&#xff0c;Vitis HLS 能够自动将 m_axi 接口端口的大小调整为 512 位&#xff0c;以改善突发访问能力。但是&#xff0c;端口宽度自动调整仅支持标准 C 语言数据…

2024年自然语言处理科学与信息检索技术国际会议(ICNLPSIRT 2024)

2024年自然语言处理科学与信息检索技术国际会议(ICNLPSIRT 2024) 2024 International Conference on Natural Language Processing Science and Information Retrieval Technology (ICNLPSIRT 2024) 会议地点&#xff1a;武汉&#xff0c;中国 网址&#xff1a;http://www.i…