Datax同步MySQL到ES

news2025/1/13 13:43:44

Datax同步MySQL到ES

    • 1、在MySQL中建表
    • 2、在ES建立索引
    • 3、构建从MySQL到ES的Datax的Json任务
    • 4、运行mysql2es.json脚本
    • 以下是工作中做过的ETL,如有需要,可以私信沟通交流,互相学习,一起进步

1、在MySQL中建表

  • 建表语句

    • CREATE TABLE `user` (
        `id` int(11) NOT NULL,
        `name` varchar(255) DEFAULT NULL,
        `age` varchar(255) DEFAULT NULL,
        `create_date` datetime DEFAULT NULL,
        PRIMARY KEY (`id`)
      ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
      
  • 插入数据

    • INSERT INTO `user`(`id`, `name`, `age`,`create_date`) VALUES (1, '小明1', '22','2023-06-02 11:26:04');
      INSERT INTO `user`(`id`, `name`, `age`,`create_date`) VALUES (2, '小明2', '22','2023-06-02 11:26:04');
      INSERT INTO `user`(`id`, `name`, `age`,`create_date`) VALUES (3, '小明3', '22','2023-06-02 11:26:04');
      INSERT INTO `user`(`id`, `name`, `age`,`create_date`) VALUES (4, '小明4', '22','2023-06-02 11:26:04');
      INSERT INTO `user`(`id`, `name`, `age`,`create_date`) VALUES (5, '小明5', '23','2023-06-02 11:26:04');
      INSERT INTO `user`(`id`, `name`, `age`,`create_date`) VALUES (6, '小明6', '23','2023-06-02 11:26:05');
      INSERT INTO `user`(`id`, `name`, `age`,`create_date`) VALUES (7, '小明7', '23','2023-06-02 11:26:05');
      INSERT INTO `user`(`id`, `name`, `age`,`create_date`) VALUES (8, '小明8', '23','2023-06-02 11:26:05');
      INSERT INTO `user`(`id`, `name`, `age`,`create_date`) VALUES (9, '小明9', '23','2023-06-02 11:26:05');
      

2、在ES建立索引

  • 建立索引语句

    • 我这里使用Kibana工具连接ES进行操作的,也可以使用Postman进行操作

    • Kibana操作语句

      # 创建索引
      PUT /user
      {
          "mappings" : {
            "properties" : {
              "id" : {
                "type" : "keyword"
              },
              "name" : {
                "type" : "text"
              },
              "age" : {
                "type" : "keyword"
              },
              "create_date" : {
                "type" : "date",
                "format": "yyyy-MM-dd HH:mm:ss"
              }
      	}	
        }
      }
      
    • Postman操作语句

      • 地址输入
      http://localhost:9200/user
      

请添加图片描述
Json文本输入

    {
        "mappings" : {
          "properties" : {
            "id" : {
              "type" : "keyword"
            },
            "name" : {
              "type" : "text"
            },
            "age" : {
              "type" : "keyword"
            },
            "create_date" : {
              "type" : "date",
              "format": "yyyy-MM-dd HH:mm:ss"
            }
    	}	
      }
    }

  • 当出现以下信息代表创建索引成功
    {
        "acknowledged": true,
        "shards_acknowledged": true,
        "index": "user"
    }
    

3、构建从MySQL到ES的Datax的Json任务

[root@hadoop101 ~]# vim mysql2es.json
# 添加以下内容
{
  "job": {
    "setting": {
      "speed": {
        "channel": 8
      }
    },
    "content": [
      {
        "reader": {
          "name": "mysqlreader",
          "parameter": {
            "column": [
              "id",
              "id",
              "name",
              "age",
              "date_format(create_date,'%Y-%m-%d %H:%I:%s')"
            ],
            "connection": [
              {
                "jdbcUrl": [
                  "jdbc:mysql://192.168.xx.xxx:3306/bigdata"
                ],
                "table": [
                  "user"
                ]
              }
            ],
            "password": "xxxxxx",
            "username": "root",
            "where": "",
            "splitPk": "id"
          }
        },
        "writer": {
          "name": "elasticsearchwriter",
          "parameter": {
            "endpoint": "http://192.168.xx.xxx:9200",
            "accessId": "root",
            "accessKey": "root",
            "index": "user",
            "type": "_doc",
            "settings": {"index" :{"number_of_shards": 5, "number_of_replicas": 1}},
            "batchSize": 5000,
            "splitter": ",",
            "column": [
              {
                "name": "pk",
                "type": "id"
              },
              {
                "name": "id",
                "type": "keyword"
              },
              {
                "name": "name",
                "type": "text"
              },
              {
                "name": "age",
                "type": "keyword"
              },
              {
                "name": "create_date",
                "type": "date",
                "format": "yyyy-MM-dd HH:mm:ss"
              }
            ]
          }
        }
      }
    ]
  }
}
  • 参数介绍

    • reader:datax的source(来源)端
    • reader.cloumn::读取mysql的字段名
    • reader.connection.jdbcUrl:MySQL连接的url
    • reader.connection.table:读取MySQL的表名
    • reader.password:连接MySQL的用户名
    • reader.username:连接MySQL的密码
    • reader.where:读取MySQL的过滤条件
    • reader.splitPk:读取MySQL时按照哪个字段进行切分
    • writer:datax的sink(去处)端
    • writer.endpoint:ElasticSearch的连接地址
    • writer.accessId:http auth中的user
    • writer.accessKey:http auth中的password

    注意:假如Elasticsearch没有设置用户名,也需要给accessId和accessKey值,不然就报错了,可以给赋值root,root

    • writer.index:Elasticsearch中的index名

    • writer.type:Elasticsearch中index的type名

    • writer.settings:创建index时候的settings, 与Elasticsearch官方相同

    • writer.batchSize:每次批量数据的条数

    • writer.splitter:如果插入数据是array,就使用指定分隔符

    • writer.column:Elasticsearch所支持的字段类型

      • 下面是column样例字段类型
      "column": [
                    # 使用数据库id作为es中记录的_id,业务主键(pk):_id 的值指定为某一个字段。
                    {"name": "pk", "type": "id"}, 
                    { "name": "col_ip","type": "ip" },
                    { "name": "col_double","type": "double" },
                    { "name": "col_long","type": "long" },
                    { "name": "col_integer","type": "integer" },
                    { "name": "col_keyword", "type": "keyword" },
                    { "name": "col_text", "type": "text", "analyzer": "ik_max_word"},
                    { "name": "col_geo_point", "type": "geo_point" },
                    # ES日期字段创建需指定格式 yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis
                    { "name": "col_date", "type": "date", "format": "yyyy-MM-dd HH:mm:ss"},
                    { "name": "col_nested1", "type": "nested" },
                    { "name": "col_nested2", "type": "nested" },
                    { "name": "col_object1", "type": "object" },
                    { "name": "col_object2", "type": "object" },
                    { "name": "col_integer_array", "type":"integer", "array":true},
                    { "name": "col_geo_shape", "type":"geo_shape", "tree": "quadtree", "precision": "10m"}
                  ]
      
    • 如果时间类型需要精细到yyyy-MM-dd HH:mm:ss.SSSSSS,比如时间为:2023-06-02 13:39:57.000000

      • MySQL的cloumn端填写
      "date_format(create_date,'%Y-%m-%d %H:%I:%s.%f')"
      
      • Elasticsearch的cloumn填写
      { "name": "create_date", "type": "text"}
      
      • Elasticsearch创建索引时修改如下
      { "name": "create_date","type": "text"}
      

4、运行mysql2es.json脚本

  • 问题1: ConfigParser - 插件[mysqlreader,elasticsearchwriter]加载失败
    请添加图片描述

  • 解决问题:

    下面是编译好的elasticsearchwriter插件,放在DATAX_HOME/plugin/writer目录下面,就可以运行成功了,也可以自己从官网下载,然后自己编译好也可以用的

    • elasticsearchwriter百度网盘资源下载链接:
    链接:https://pan.baidu.com/s/1C_OeXWf_t5iVNiLquvEhjw 
    提取码:tutu 
    
  • 问题2:在运行从MySQL抽取500万条数据到Elasticsearch时,出现了datax传输200多万条数据卡住的情况,报错日志:I/O Exception … Broken pipe (write failed)

请添加图片描述

  • 解决问题:

    datax传输mysql到es卡在200多万这个问题和channel: 8和es的oom有一定的关系

    • 测试样例

    • 1、es batchsize为5000,不开启mysql切分"splitPk": “id”,卡住

    • 2、es batchsize为1000,不开启mysql切分"splitPk": “id”,不卡住

    • 3、es batchsize为5000,开启mysql切分"splitPk": “id”,不卡住

  • 总结:这个问题和mysql读取速率,es 写入速率有关,开启切分提高一下读取速率就不会卡住了

  • 成功运行截图:
    请添加图片描述

以下是工作中做过的ETL,如有需要,可以私信沟通交流,互相学习,一起进步

请添加图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/601695.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

如何在Flutter 中启用空安全

目录 Step 1: 开启空安全设置 Step 2:检查可升级的三方库 Step 3:升级三方库 Step 4:代码适配支持空安全 Step 1: 开启空安全设置 Flutter 2默认启用了空安全,所以通过Flutter 2创建的项目是已经开启了空安全的检查的,不知道自己是什么版本的可以通…

【MCS-51单片机汇编语言】期末复习总结①——常见指令汇总(题型一)

文章目录 七大寻址方式重要指令转移堆栈加减乘除位操作跳转条件转移子程序常考题型 七大寻址方式 立即寻址:通过一个立即数来指定存储单元的地址,例如#41H;寄存器寻址:Rn(n0~7),A,B&#xff0c…

自学软件测试,一般人我劝你还是算了吧

软件测试自学是完全可以的,但是这句话并不代表人人都可以自学。 想转行软件测试,纯自学会遇到以下8个问题: 1、自学需要很高的自律性,你能做到吗? 2、自学在学习过程中会碰到很多困难,你都能解决吗&#xf…

直播入门手册

直播除了带货,现在越来越成了分享知识,增近主播和粉近距离互动的平台。最近看到抖音上越来越多的主播进行编程经验的分享,这是一个很好的传播知识的方式,以前我们学习编程技术一般看视频,其实视频的互动性没有直播那么…

智能家居数据分析:语音交互为用户偏好模式,使用最高达72%

哈喽大家好,近些年来,智能家居行业的发展无比迅速,同时,最近两年来人工智能技术的不断突破,对智能家居有着无与伦比的推动力。本期将为大家介绍下智能家居的行业形势。 通过数据可视化平台把报表数据处理后展示出来的…

(4)NUC980 RootFs

Buildroot是一个开源的嵌入式Linux系统构建工具,它可以帮助用户自动化构建Linux系统的各个部分,包括Linux内核、库、文件系统、应用程序等。通过Buildroot,用户可以根据需要选择和配置所需的软件包,从而构建一个定制化的Linux系统…

Nginx网站服务——编译安装及系统服务添加

Nginx网站服务——编译安装及系统服务添加 一、Nginx的相关知识1.Nginx的简介2. Apache与Nginx的区别3.Apache的优势4.Nginx的优势 二、Nginx编译安装1.关闭防火墙,将安装nginx所需软件包传到/opt目录下2.安装相关的依赖包3.创建运行用户、组4.编译安装Nginx5.检查、…

【Rust学习】web框架 Axum,提供REST API

cargo-watch:有修改就重启服务器,类似java web的热部署 安装:cargo install cargo-watch 使用:cargo watch -x run 这样每次有修改就会自动重启web服务 vscode插件Thunder Client(类似postman) hello,world 建议用cargo add的方式添加 […

视频压缩存储解决方案

一、背景介绍 随着视频技术的不断发展,人们对视频质量和存储需求的要求也越来越高。而视频文件的大小往往会给存储和传输带来诸多困扰。因此,如何有效地压缩视频文件成为了一个非常重要的问题。 二、压缩算法简介 有损压缩:通过去除视频中的…

SpringBoot3整合SpringSecurity,实现自定义接口权限过滤

接口权限过滤是指对于某些接口或功能,系统通过设定一定的权限规则,只允许经过身份认证且拥有相应权限的用户或应用程序进行访问和操作。这种技术可以有效地保护系统资源和数据安全,防止未授权的用户或程序进行恶意操作或非法访问。通常情况下…

广州华锐互动:智能虚拟人运用到短视频行业能带来哪些价值?

随着科学技术的不断发展,虚拟数字人呈现飞跃式发展,各式各样的虚拟数字人进入到我们的视野,而同样作为新风口的短视频行业,也成为了人们日常生活中不可或缺的一部分。那么,将这两者结合起来会带来哪些变化呢&#xff1…

C#开发串口调试助手实现modbusRTU通信

今天给大家搞个项目,跟我来,走过路过不要错过,看大V如何玩C#工业软件开发,搞事的目的是:掌握MODBUSRTU通信协议,掌握简单型串口调试助手开发,掌握串口通信过程 硬件产品:串口RS232温…

中移链资源管理介绍

中移链是基于EOS底层框架,在满足我国信息化监管需求、合规可控的前提下,打造的中国移动区块链服务平台。在中移链中主要包括CPU、RAM和NET三种资源。CPU资源是用于执行智能合约的计算能力,RAM资源用于存储智能合约和其它数据,NET资…

【京东API】京东app获得JD商品详情原数据接口

京东是中国最大的综合型电商网站之一,其app端是用户购买商品的主要途径之一。为了更好地满足用户的需求,开发人员提供了商品原数据接口,让第三方开发者可以获取京东商品的详细信息。 开发背景: 随着移动设备用户数量的不断增加&a…

2023最新 如何修改appstroe的开发者名称?

1、输入账号密码登录 http://itunesconnect.apple.com 2、点击app 3、点击我的账户account 4、页面往下滑动,点击更新信息 5、点击提供更新信息 6、根据需要修改新的信息 Hello, I want to change the company name in Chinese, according to changed to “xxx…

MySQL内存

结构 xtradb-innodb-internals-in-drawing InnoDB存储引擎体系结构 内存结构与磁盘结构 InnoDB存储结构 内存相关参数 在MySQL中,可以通过一些参数来控制内存的使用和管理。以下是一些常用的控制内存的参数: innodb_buffer_pool_size: 这是控制InnoD…

商场室内导航制作,商场导览图怎么做的?

商场导览图怎么做的?现在很多商场都比较大,往往需要借助地图才能快速找到想要去的店铺,比如在商场大厅展示商场楼层规划以及楼层具体商户等,让消费者了解商场的整体结构,有逛下去的欲望。重点标记出逃生通道、厕所、进…

是时候搭建一个自己的ChatGPT 了!

ChatGPT客户端-ChatBox https://github.com/Bin-Huang/chatbox 开源的 ChatGPT API (OpenAI API) 跨平台桌面客户端,Prompt 的调试与管理工具,也可以用作 ChatGPT Plus 平替。 如需找不到下载地址,可以私信留言。 ChatGPT Next Web https…

MySQL 自增列使用上的一些 “坑”

文章目录 前言1. 自增列空洞1.1 手动指定2.2 分配未使用 2. 自增列监控2.1 sys 库监控2.2 通用查询 3. 一些 BUG3.1 重启失效3.2 冲突问题 前言 MySQL 的规范中,一般都会建议表要有主键,常使用自增列作为主键字段,这和 MySQL 属于聚簇索引表…

【FlatpanelsHD】HDR生态系统追踪器

Dolby Vision被称为Profile 8.4,与基于pq(杜比实验室开发的感知量化技术,也是无处不在的HDR10的基础)的所有其他口味不同,它基于HLG或Hybrid Log Gamma,由BBC和NHK开发,主要用于电视直播。 用HLG捕捉HDR视频的相机并不…