SpringBoot整合DataX数据同步(自动生成job文件)

news2025/1/9 2:02:17

SpringBoot整合Datax数据同步

文章目录

  • SpringBoot整合Datax数据同步
    • 1.简介
        • 设计理念
      • DataX3.0框架设计
      • DataX3.0核心架构
        • 核心模块介绍
        • DataX调度流程
    • 2.DataX3.0插件体系
    • 3.数据同步
      • 1.编写job的json文件
      • 2.进入bin目录下,执行文件
    • 4.SpringBoot整合DataX生成Job文件并执行
        • 1.准备工作
        • 2.文件目录如图
        • 3.Mysql数据同步
        • 4.Elasticsearch写入Mysql数据
    • 5.Job文件参数说明
      • 1.MysqlReader
      • 2.MysqlWriter
      • 3.ElasticsearchWriter

1.简介

DataX 是阿里云 DataWorks数据集成 的开源版本,在阿里巴巴集团内被广泛使用的离线数据同步工具/平台。DataX 实现了包括 MySQL、Oracle、OceanBase、SqlServer、Postgre、HDFS、Hive、ADS、HBase、TableStore(OTS)、MaxCompute(ODPS)、Hologres、DRDS, databend 等各种异构数据源之间高效的数据同步功能。

DataX 是一个异构数据源离线同步工具,致力于实现各种异构数据源之间稳定高效的数据同步功能。

Download DataX下载地址

Github主页地址:https://github.com/alibaba/DataX

请点击:Quick Start

在这里插入图片描述

  • 设计理念

    为了解决异构数据源同步问题,DataX将复杂的网状的同步链路变成了星型数据链路,DataX作为中间传输载体负责连接各种数据源。当需要接入一个新的数据源的时候,只需要将此数据源对接到DataX,便能跟已有的数据源做到无缝数据同步。

DataX3.0框架设计

在这里插入图片描述

DataX本身作为离线数据同步框架,采用Framework + plugin架构构建。将数据源读取和写入抽象成为Reader/Writer插件,纳入到整个同步框架中。

  • Reader:Reader为数据采集模块,负责采集数据源的数据,将数据发送给Framework。
  • Writer: Writer为数据写入模块,负责不断向Framework取数据,并将数据写入到目的端。
  • Framework:Framework用于连接reader和writer,作为两者的数据传输通道,并处理缓冲,流控,并发,数据转换等核心技术问题。

DataX3.0核心架构

DataX 3.0 开源版本支持单机多线程模式完成同步作业运行,本小节按一个DataX作业生命周期的时序图,从整体架构设计非常简要说明DataX各个模块相互关系。

在这里插入图片描述

核心模块介绍
  1. DataX完成单个数据同步的作业,我们称之为Job,DataX接受到一个Job之后,将启动一个进程来完成整个作业同步过程。DataX Job模块是单个作业的中枢管理节点,承担了数据清理、子任务切分(将单一作业计算转化为多个子Task)、TaskGroup管理等功能。
  2. DataXJob启动后,会根据不同的源端切分策略,将Job切分成多个小的Task(子任务),以便于并发执行。Task便是DataX作业的最小单元,每一个Task都会负责一部分数据的同步工作。
  3. 切分多个Task之后,DataX Job会调用Scheduler模块,根据配置的并发数据量,将拆分成的Task重新组合,组装成TaskGroup(任务组)。每一个TaskGroup负责以一定的并发运行完毕分配好的所有Task,默认单个任务组的并发数量为5。
  4. 每一个Task都由TaskGroup负责启动,Task启动后,会固定启动Reader—>Channel—>Writer的线程来完成任务同步工作。
  5. DataX作业运行起来之后, Job监控并等待多个TaskGroup模块任务完成,等待所有TaskGroup任务完成后Job成功退出。否则,异常退出,进程退出值非0
DataX调度流程

举例来说,用户提交了一个DataX作业,并且配置了20个并发,目的是将一个100张分表的mysql数据同步到odps里面。 DataX的调度决策思路是:

  1. DataXJob根据分库分表切分成了100个Task。
  2. 根据20个并发,DataX计算共需要分配4个TaskGroup。
  3. 4个TaskGroup平分切分好的100个Task,每一个TaskGroup负责以5个并发共计运行25个Task。

2.DataX3.0插件体系

DataX目前已经有了比较全面的插件体系,主流的RDBMS数据库、NOSQL、大数据计算系统都已经接入,目前支持数据如下图,详情请点击:DataX数据源参考指南

类型数据源Reader(读)Writer(写)文档
RDBMS 关系型数据库MySQL读 、写
Oracle读 、写
OceanBase读 、写
SQLServer读 、写
PostgreSQL读 、写
DRDS读 、写
Kingbase读 、写
通用RDBMS(支持所有关系型数据库)读 、写
阿里云数仓数据存储ODPS读 、写
ADB
ADS
OSS读 、写
OCS
Hologres
AnalyticDB For PostgreSQL
阿里云中间件datahub读 、写
SLS读 、写
图数据库阿里云 GDB读 、写
Neo4j
NoSQL数据存储OTS读 、写
Hbase0.94读 、写
Hbase1.1读 、写
Phoenix4.x读 、写
Phoenix5.x读 、写
MongoDB读 、写
Cassandra读 、写
数仓数据存储StarRocks读 、写
ApacheDoris
ClickHouse读 、写
Databend
Hive读 、写
kudu
selectdb
无结构化数据存储TxtFile读 、写
FTP读 、写
HDFS读 、写
Elasticsearch
时间序列数据库OpenTSDB
TSDB读 、写
TDengine读 、写

3.数据同步

1.编写job的json文件

mysql数据抽取到本地内存

{
  "job": {
    "content": [
      {
        "reader": {
          "name": "mysqlreader",
          "parameter": {
            "column": [
              "id",
              "name",
              "amount",
            ],
            "connection": [
              {
                "jdbcUrl": [
                  "jdbc:mysql://127.0.0.1:3306/datax?characterEncoding=UTF-8&serverTimezone=Asia/Shanghai"
                ],
                "table": [
                  "user"
                ]
              }
            ],
            "password": "root",
            "username": "root"
          }
        },
        "writer": {
            "name": "streamwriter",
            "parameter": {
                 "print": false,
                 "encoding": "UTF-8"
            }
        }
      }
    ],
    "setting": {
      "speed": {
        "channel": "1"
      }
    }
  }
}

mysqlWriter数据写入

{
    "job": {
        "setting": {
            "speed": {
                "channel": 1
            }
        },
        "content": [
            {
                 "reader": {
                    "name": "streamreader",
                    "parameter": {
                        "column" : [
                            {
                                "value": "DataX",
                                "type": "string"
                            },
                            {
                                "value": 19880808,
                                "type": "long"
                            },
                            {
                                "value": "1988-08-08 08:08:08",
                                "type": "date"
                            },
                            {
                                "value": true,
                                "type": "bool"
                            },
                            {
                                "value": "test",
                                "type": "bytes"
                            }
                        ],
                        "sliceRecordCount": 1000
                    }
                },
                "writer": {
                    "name": "mysqlwriter",
                    "parameter": {
                        "writeMode": "insert",
                        "username": "root",
                        "password": "root",
                        "column": [
                            "id",
                            "name"
                        ],
                        "session": [
                        	"set session sql_mode='ANSI'"
                        ],
                        "preSql": [
                            "delete from test"
                        ],
                        "connection": [
                            {
                                "jdbcUrl": "jdbc:mysql://127.0.0.1:3306/datax?useUnicode=true&characterEncoding=gbk",
                                "table": [
                                    "test"
                                ]
                            }
                        ]
                    }
                }
            }
        ]
    }
}

mysql数据同步

{
  "job": {
    "content": [
      {
        "reader": {
          "name": "mysqlreader",
          "parameter": {
            "column": [
              "id",
              "project_code",
              "category"
            ],
            "connection": [
              {
                "jdbcUrl": [
                  "jdbc:mysql://127.0.0.1:3306/test_local?characterEncoding=UTF-8&serverTimezone=Asia/Shanghai"
                ],
                "table": [
                  "project_index"
                ]
              }
            ],
            "password": "root",
            "username": "root"
          }
        },
        "writer": {
          "name": "mysqlwriter",
          "parameter": {
            "column": [
              "id",
              "project_code",
              "category"
            ],
            "connection": [
              {
                "jdbcUrl": "jdbc:mysql://127.0.0.1:3306/datax?characterEncoding=UTF-8&serverTimezone=Asia/Shanghai",
                "table": [
                  "project_index"
                ]
              }
            ],
            "password": "root",
            "username": "root",
            "writeMode": "update"
          }
        }
      }
    ],
    "setting": {
      "speed": {
        "channel": "1"
      }
    }
  }
}

2.进入bin目录下,执行文件

需python环境

python datax.py {YOUR_JOB.json}

4.SpringBoot整合DataX生成Job文件并执行

1.准备工作

下载datax,安装lib下的datax-commondatax-core的jar到本地maven仓库

依赖

  <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>


        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-jdbc</artifactId>
        </dependency>


        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>


        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.30</version>
        </dependency>

        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.54</version>
        </dependency>

        <dependency>
            <groupId>com.alibaba.datax</groupId>
            <artifactId>datax-core</artifactId>
            <version>0.0.1-SNAPSHOT</version>
        </dependency>

        <dependency>
            <groupId>com.alibaba.datax</groupId>
            <artifactId>datax-common</artifactId>
            <version>0.0.1-SNAPSHOT</version>
        </dependency>

        <dependency>
            <groupId>cn.hutool</groupId>
            <artifactId>hutool-all</artifactId>
            <version>5.8.25</version>
        </dependency>

        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>druid-spring-boot-starter</artifactId>
            <version>1.1.21</version>
        </dependency>


        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>8.0.26</version>
        </dependency>


        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-lang3</artifactId>
            <version>3.12.0</version>
        </dependency>
spring:
  datasource:
    url: jdbc:mysql://127.0.0.1:3306/test_local?characterEncoding=UTF-8&serverTimezone=Asia/Shanghai
    username: root
    password: root
    driver-class-name: com.mysql.cj.jdbc.Driver
    type: com.alibaba.druid.pool.DruidDataSource

server:
  port: 8080

# datax 相关配置,在生成文件时使用
datax:
  home: D:/software/datax/
  # job文件存储位置
  save-path: D:/software/datax/job/

属性配置

/**
 * datax配置
 * @author moshangshang
 */
@Data
@Component
@ConfigurationProperties("datax")
public class DataXProperties {


    private String home;

    private String savePath;

}

公共实体

@Data
@AllArgsConstructor
@NoArgsConstructor
public class Content {

    private Reader reader;

    private Writer writer;

}
@Data
public class DataXJobRoot {
    private Job job;
}

@Data
public class Job {

    private List<Content> content;

    private Setting setting = new Setting();

}
@Data
public class Setting {

    private Speed speed = new Speed();

    @Data
    public static class Speed {
        private String channel = "1";
    }
}
public abstract class Parameter {
}
/**
 *  读取抽象类
 *  @author moshangshang
 */
@Data
public abstract class Reader {

    private String name;

    private Parameter parameter;

}
/**
 * 写入抽象类
 *  @author moshangshang
 */
@Data
public abstract class Writer {

    private String name;
    private Parameter parameter;
}

公共处理接口

public interface DataXInterface {

     /**
      * 获取读对象
      */
     Reader getReader(String table);


     /**
      * 获取写对象
      */
     Writer getWriter(String table);


     /**
      * 同类型读取写入,如mysql到mysql
      */
     String getJobTaskName(String readerTable,String writeTable);

     /**
      * 自定义读取写入
      * @param reader 读处理
      * @param write 写处理
      * @param suffix 文件名
      */
     String getJobTaskName(Reader reader,Writer write, String suffix);

}


/**
 * 接口抽象类
 * @author moshangshang
 */
@Component
public abstract class AbstractDataXHandler implements DataXInterface {

    @Autowired
    private DataXProperties dataXProperties;

    /**
     * 自定义读取写入
     * @param reader 读处理
     * @param write 写处理
     * @param suffix 文件名
     */
    @Override
    public String getJobTaskName(Reader reader, Writer write, String suffix) {
        DataXJobRoot root = new DataXJobRoot();
        Job job = new Job();
        root.setJob(job);
        Content content = new Content(reader,write);
        job.setContent(Collections.singletonList(content));
        String jsonStr = JSONUtil.parse(root).toJSONString(2);
        String fileName = "datax_job_"+ UUID.randomUUID().toString().replaceAll("-","") +"_"+suffix+".json";
        File file = FileUtil.file(dataXProperties.getSavePath(),fileName);
        FileUtil.appendString(jsonStr, file, "utf-8");
        return fileName;
    }



}

工具方法

@Repository
public class DatabaseInfoRepository {
    private final JdbcTemplate jdbcTemplate;

    @Autowired
    public DatabaseInfoRepository(JdbcTemplate jdbcTemplate) {
        this.jdbcTemplate = jdbcTemplate;
    }

    /**
     * 获取所有表名
     */
    public List<String> getAllTableNames() {
        String sql = "SHOW TABLES";
        return jdbcTemplate.queryForList(sql, String.class);
    }

    /**
     * 根据表名获取字段信息
     */
    public List<Map<String, Object>> getTableColumns(String tableName) {
        String sql = "SHOW FULL COLUMNS FROM " + tableName;
        return jdbcTemplate.queryForList(sql);
    }
}
@Slf4j
@Service
public class DatabaseInfoService {

    private final DatabaseInfoRepository databaseInfoRepository;

    @Autowired
    public DatabaseInfoService(DatabaseInfoRepository databaseInfoRepository) {
        this.databaseInfoRepository = databaseInfoRepository;
    }

    public void printAllTablesAndColumns() {
        // 获取所有表名
        List<String> tableNames = databaseInfoRepository.getAllTableNames();

        // 遍历表名,获取并打印每个表的字段信息
        for (String tableName : tableNames) {
            System.out.println("Table: " + tableName);

            // 获取当前表的字段信息
            List<Map<String, Object>> columns = databaseInfoRepository.getTableColumns(tableName);

            // 遍历字段信息并打印
            for (Map<String, Object> column : columns) {
                System.out.println("  Column: " + column.get("Field") + " (Type: " + column.get("Type") + ")" + " (Comment: " + column.get("Comment") + ")");
            }
            // 打印空行作为分隔
            System.out.println();
        }
    }

    /** 查询指定表的所有字段列表 */
    public List<String> getColumns(String tableName) {
        List<String> list = new ArrayList<>();
        // 获取当前表的字段信息
        List<Map<String, Object>> columns = databaseInfoRepository.getTableColumns(tableName);

        // 遍历字段信息并打印
        for (Map<String, Object> column : columns) {
            list.add(column.get("Field").toString());
        }

        return list;
    }

    /** 查询指定表的所有字段列表,封装成HdfsWriter格式 */
    public List<HdfsWriter.Column> getHdfsColumns(String tableName) {
        List<HdfsWriter.Column> list = new ArrayList<>();
        // 获取当前表的字段信息
        List<Map<String, Object>> columns = databaseInfoRepository.getTableColumns(tableName);

        // 遍历字段信息并打印
        for (Map<String, Object> column : columns) {
            String name = column.get("Field").toString();
            String typeDb = column.get("Type").toString();
            String type = "string";
            if (typeDb.equals("bigint")) {
                type = "bigint";
            } else if (typeDb.startsWith("varchar")) {
                type = "string";
            } else if (typeDb.startsWith("date") || typeDb.endsWith("timestamp")) {
                type = "date";
            }
            HdfsWriter.Column columnHdfs = new HdfsWriter.Column();
            columnHdfs.setName(name);
            columnHdfs.setType(type);
            list.add(columnHdfs);
        }

        return list;
    }
}

datax的job任务json执行方法


/**
 * 执行器
 * @author moshangshang
 */
@Slf4j
@Component
public class DataXExecuter {

    @Autowired
    private DataXProperties dataXProperties;

    public void run(String fileName) throws IOException {
        System.setProperty("datax.home", dataXProperties.getHome());
        String filePath = dataXProperties.getSavePath()+fileName;
        String dataxJson = JSONUtil.parse(FileUtils.readFileToString(new File(filePath),"UTF-8")).toJSONString(2);
        log.info("datax log:{}",dataxJson);
        String[] dataxArgs = {"-job", filePath, "-mode", "standalone", "-jobid", "-1"};
        try {
            Engine.entry(dataxArgs);
        }catch (DataXException e){
            log.error("执行失败",e);
        } catch (Throwable throwable) {
            log.error("DataX执行异常,error cause::\n" + ExceptionTracker.trace(throwable));
        }

    }

}
2.文件目录如图

在这里插入图片描述

3.Mysql数据同步

1.编写mysql读写对象,继承读写接口

@Data
public class MysqlReader extends Reader {

    public String getName() {
        return "mysqlreader";
    }


    @Data
    public static class MysqlParameter extends Parameter {
        private List<String> column;
        private List<Connection> connection;
        private String password;
        private String username;
        private String splitPk;
        private String where;
    }

    @Data
    public static class Connection {
        private List<String> jdbcUrl;
        private List<String> table;
        private List<String> querySql;
    }
}

@EqualsAndHashCode(callSuper = true)
@Data
public class MysqlWriter extends Writer {

    public String getName() {
        return "mysqlwriter";
    }
    @EqualsAndHashCode(callSuper = true)
    @Data
    public static class MysqlParameter extends Parameter {
        private List<String> column;
        private List<Connection> connection;
        private String password;
        private String username;
        private String writeMode = "update";
    }

    @Data
    public static class Connection {
        private String jdbcUrl;
        private List<String> table;
    }
}

2.配置mysql读和写的数据库信息

/**
 * mysql读写配置
 * @author moshangshang
 */
@Data
@ConfigurationProperties("datax.mysql.reader")
public class DataXMysqlReaderProperties {


    private String url;

    private String password;

    private String username;


}
/**
 * mysql读写配置
 * @author moshangshang
 */
@Data
@ConfigurationProperties("datax.mysql.writer")
public class DataXMysqlWriterProperties {


    private String url;

    private String password;

    private String username;

}
# datax 相关配置,在生成文件时使用
datax:
  mysql:
    reader:
      url: jdbc:mysql://127.0.0.1:3306/test_local?characterEncoding=UTF-8&serverTimezone=Asia/Shanghai
      username: root
      password: root
    writer:
      url: jdbc:mysql://127.0.0.1:3306/ruoyi_local?characterEncoding=UTF-8&serverTimezone=Asia/Shanghai
      username: root
      password: root

2.编写mysql处理类,继承抽象处理接口。生成job文件

/**
 * mysql读写处理
 * @author moshangshang
 */
@Component
@EnableConfigurationProperties({DataXMysqlReaderProperties.class, DataXMysqlWriterProperties.class})
public class MysqlHandler extends AbstractDataXHandler{

    @Autowired
    private DatabaseInfoService databaseInfoService;

    @Autowired
    private DataXProperties dataXProperties;
    @Autowired
    private DataXMysqlReaderProperties dataXMysqlReaderProperties;
    @Autowired
    private DataXMysqlWriterProperties dataXMysqlWriterProperties;

    @Override
    public Reader getReader(String table) {
        MysqlReader reader = new MysqlReader();
        MysqlReader.MysqlParameter readerParameter = new MysqlReader.MysqlParameter();
        readerParameter.setPassword(dataXMysqlReaderProperties.getPassword());
        readerParameter.setUsername(dataXMysqlReaderProperties.getUsername());
        List<String> readerColumns = databaseInfoService.getColumns(table);
        readerParameter.setColumn(readerColumns);
        MysqlReader.Connection readerConnection = new MysqlReader.Connection();
        readerConnection.setJdbcUrl(Collections.singletonList(dataXMysqlReaderProperties.getUrl()));
        readerConnection.setTable(Collections.singletonList(table));
        readerParameter.setConnection(Collections.singletonList(readerConnection));
        reader.setParameter(readerParameter);
        return reader;
    }

    @Override
    public Writer getWriter(String table) {
        MysqlWriter writer = new MysqlWriter();
        MysqlWriter.MysqlParameter writerParameter = new MysqlWriter.MysqlParameter();
        writerParameter.setPassword(dataXMysqlWriterProperties.getPassword());
        writerParameter.setUsername(dataXMysqlWriterProperties.getUsername());
        List<String> columns = databaseInfoService.getColumns(table);
        writerParameter.setColumn(columns);
        MysqlWriter.Connection connection = new MysqlWriter.Connection();
        connection.setJdbcUrl(dataXMysqlWriterProperties.getUrl());
        connection.setTable(Collections.singletonList(table));
        writerParameter.setConnection(Collections.singletonList(connection));
        writer.setParameter(writerParameter);
        return writer;
    }

    @Override
    public String getJobTaskName(String readerTable,String writeTable) {
        DataXJobRoot root = new DataXJobRoot();
        Job job = new Job();
        root.setJob(job);
        Content content = new Content(getReader(readerTable),getWriter(writeTable));
        job.setContent(Collections.singletonList(content));
        String jsonStr = JSONUtil.parse(root).toJSONString(2);
        String fileName = "datax_job_"+ UUID.randomUUID().toString().replaceAll("-","") +"_h2h.json";
        File file = FileUtil.file(dataXProperties.getSavePath(),fileName);
        FileUtil.appendString(jsonStr, file, "utf-8");
        return fileName;
    }
}

3.调用执行器,执行任务job

@SpringBootTest
public class DataxTest {

    @Autowired
    private MysqlHandler mysqlHandler;


    @Autowired
    private DataXExecuter dataXExecuter;


    /**
     * 读t_user表同步到user
     */
    @Test
    public void test() throws IOException {
        String jobTask = mysqlHandler.getJobTaskName("t_user", "user");
        dataXExecuter.run(jobTask);
    }
    
    
    /**
     * 直接执行json文件
     */
    @Test
    public void test2() throws IOException {
        dataXExecuter.run("datax_job_83798b5f1766406289a44fe681dc8878_m2m.json");
    }
}

4.执行结果

在这里插入图片描述

4.Elasticsearch写入Mysql数据

注意事项:

  • es目前只支持写入不支持读取
  • mysql数据写入es时,需保证es与mysql的列数column相同,不支持类似mysql的部分字段写入
  • 需保证列的顺序相同,写入时不会根据name名称字段去自动对应,如果顺序不一致,则可能会转换错误。如id,name,写入name,id

原理:使用elasticsearch的rest api接口, 批量把从reader读入的数据写入elasticsearch

创建es索引映射

PUT datax_data
{
  "mappings": {
    "properties": {
      "name":{"type": "keyword"},
      "amount":{"type": "long"}
    }
  }
}

1.添加es配置和文件

spring:
  elasticsearch:
    #username:
    #password:
    #path-prefix:
    uris: http://127.0.0.1:9200
    #连接elasticsearch超时时间
    connection-timeout: 60000
    socket-timeout: 30000
# datax 相关配置,在生成文件时使用
datax:
  elasticsearch:
    writer:
      url: http://127.0.0.1:9200
      username:
      password:

/**
 * es写配置
 * @author moshangshang
 */
@Data
@ConfigurationProperties("datax.elasticsearch.writer")
public class DataXElasticSearchProperties {

    private String url;

    private String username;

    private String password;

}

2.编写生成job文件实体类

@EqualsAndHashCode(callSuper = true)
@Data
public class ElasticSearchWriter extends Writer {

    public String getName() {
        return "elasticsearchwriter";
    }

    @EqualsAndHashCode(callSuper = true)
    @Data
    public static class ElasticSearchParameter extends Parameter {
        private List<Column> column;
        private String endpoint;
        private String accessId;
        private String accessKey;
        private String index;
        private Settings settings;
        private String type = "default";
        private boolean cleanup = true;
        private boolean discovery = false;
        private Integer batchSize = 1000;
        private String splitter = ",";
    }

    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    public static class Column {
        private String name;
        private String type;
        private String analyzer;
    }

    @Data
    public static class Settings {
        private Map<String,Object> index;
    }


}

3.es接口扩展

/**
 * es接口扩展
 * @author moshangshang
 */
public interface DataXElasticsearchInterface extends DataXInterface {

    Writer getWriter(String table, Map<String,Object> indexSettings);

}

4.es核心处理类

@Component
@EnableConfigurationProperties({DataXElasticSearchProperties.class})
public class ElasticSearchHandler extends AbstractDataXHandler implements DataXElasticsearchInterface{

    @Autowired
    private ElasticsearchRestTemplate elasticsearchRestTemplate;

    @Autowired
    private DataXElasticSearchProperties dataXElasticSearchProperties;

    @Override
    public Reader getReader(String table) {
        return null;
    }

    /**
     * 普通写入
     * @param index 索引
     * @return Writer
     */
    @Override
    public Writer getWriter(String index) {
        ElasticSearchWriter writer = new ElasticSearchWriter();
        ElasticSearchWriter.ElasticSearchParameter writerParameter = getElasticSearchWriter(index);
        writer.setParameter(writerParameter);
        return writer;
    }

    @Override
    public String getJobTaskName(String readerTable, String writeTable) {
        return null;
    }

    /**
     * es写入,带setting设置
     */
    @Override
    public Writer getWriter(String index,Map<String,Object> map) {
        ElasticSearchWriter writer = new ElasticSearchWriter();
        ElasticSearchWriter.ElasticSearchParameter writerParameter = getElasticSearchWriter(index);
        ElasticSearchWriter.Settings settings = new ElasticSearchWriter.Settings();
        settings.setIndex(map);
        writerParameter.setSettings(settings);
        writer.setParameter(writerParameter);
        return writer;
    }


    /**
     * 获取公共写入参数
     */
    public ElasticSearchWriter.ElasticSearchParameter getElasticSearchWriter(String index){
        ElasticSearchWriter.ElasticSearchParameter writerParameter = new ElasticSearchWriter.ElasticSearchParameter();
        List<Column> columns = getEsColumns(index);
        writerParameter.setColumn(columns);
        writerParameter.setEndpoint(dataXElasticSearchProperties.getUrl());
        writerParameter.setAccessId(dataXElasticSearchProperties.getUsername());
        writerParameter.setAccessKey(dataXElasticSearchProperties.getPassword());
        writerParameter.setIndex(index);
        return writerParameter;
    }



   /**
     * 获取指定索引的映射字段
     * 读取时和创建顺序相反
     */
    public List<ElasticSearchWriter.Column> getEsColumns(String index){
        List<ElasticSearchWriter.Column> columns = new ArrayList<>();
        // 获取操作的索引文档对象
        IndexOperations indexOperations = elasticsearchRestTemplate.indexOps(IndexCoordinates.of(index));
        Map<String, Object> mapping = indexOperations.getMapping();
        mapping.forEach((k,value) ->{
            JSONObject json = JSON.parseObject(JSONObject.toJSONString(value));
            for (Map.Entry<String, Object> entry : json.entrySet()) {
                String key = entry.getKey();
                JSONObject properties = JSON.parseObject(JSONObject.toJSONString(entry.getValue()));
                String type = properties.getString("type");
                String analyzer = properties.getString("analyzer");
                columns.add(new ElasticSearchWriter.Column(key,type,analyzer));
            }
        });
        return columns;
    }

}

5.测试

    @Test
    public void test3() throws IOException {
        Map<String,Object> settings = new HashMap<>();
        settings.put("number_of_shards",1);
        settings.put("number_of_replicas",1);
        String jobTask = elasticSearchHandler.getJobTaskName(mysqlHandler.getReader("t_user")
                , elasticSearchHandler.getWriter("datax_data",settings),"m2e");
        dataXExecuter.run(jobTask);
    }

5.Job文件参数说明

1.MysqlReader

参数名描述必选
jdbcUrl对端数据库的JDBC连接信息并支持一个库填写多个连接地址。之所以使用JSON数组描述连接信息,是因为阿里集团内部支持多个IP探测,如果配置了多个,MysqlReader可以依次探测ip的可连接性,直到选择一个合法的IP,如果全部连接失败,MysqlReader报错。 注意,jdbcUrl必须包含在connection配置单元中。对于阿里外部使用情况,JSON数组填写一个JDBC连接即可。
username数据源的用户名
password数据源指定用户名的密码
table所选取的需要同步的表。支持多张表同时抽取,用户自己需保证多张表是同一schema结构,注意,table必须包含在connection配置单元中。
column所配置的表中需要同步的列名集合,使用JSON的数组描述字段信息。用户使用代表默认使用所有列配置,例如['']。
splitPk分区主键,DataX因此会启动并发任务进行数据同步。推荐splitPk用户使用表主键
where筛选条件,MysqlReader根据指定的column、table、where条件拼接SQL,并根据这个SQL进行数据抽取。注意:limit不是SQL的合法where子句。where条件可以有效地进行业务增量同步。如果不填写where语句,包括不提供where的key或者value,均视作同步全量数据。
querySql查询SQL同步。当用户配置了这一项之后,当用户配置querySql时,MysqlReader直接忽略table、column、where条件的配置,querySql优先级大于table、column、where选项。

2.MysqlWriter

参数名描述必选
jdbcUrl目的数据库的 JDBC 连接信息。作业运行时,DataX 会在你提供的 jdbcUrl 后面追加如下属性:yearIsDateType=false&zeroDateTimeBehavior=convertToNull&rewriteBatchedStatements=true;在一个数据库上只能配置一个 jdbcUrl 值。这与 MysqlReader 支持多个备库探测不同,因为此处不支持同一个数据库存在多个主库的情况(双主导入数据情况)
username数据源的用户名
password数据源指定用户名的密码
table目的表的表名称。支持写入一个或者多个表。当配置为多张表时,必须确保所有表结构保持一致。table 和 jdbcUrl 必须包含在 connection 配置单元中
column目的表需要写入数据的字段,例如: “column”: [“id”,“name”,“age”]。如果要依次写入全部列,使用*表示, 例如: "column": ["*"]
sessionDataX在获取Mysql连接时,执行session指定的SQL语句,修改当前connection session属性
preSql写入数据到目的表前,会先执行这里的标准语句。如果 Sql 中有你需要操作到的表名称,请使用 @table 表示,这样在实际执行 Sql 语句时,会对变量按照实际表名称进行替换。比如你的任务是要写入到目的端的100个同构分表(表名称为:datax_00,datax01, … datax_98,datax_99),并且你希望导入数据前,先对表中数据进行删除操作,那么你可以这样配置:"preSql":["delete from 表名"],效果是:在执行到每个表写入数据前,会先执行对应的 delete from 对应表名称
postSql写入数据到目的表后,会执行这里的标准语句
writeMode控制写入数据到目标表采用 insert into 或者 replace into 或者 ON DUPLICATE KEY UPDATE 语句;可选:insert/replace/update,默认insert
batchSize一次性批量提交的记录数大小,该值可以极大减少DataX与Mysql的网络交互次数,并提升整体吞吐量。但是该值设置过大可能会造成DataX运行进程OOM情况。默认:1024

3.ElasticsearchWriter

参数名描述必选
endpointElasticSearch的连接地址
accessIdhttp auth中的user
accessKeyhttp auth中的password
indexelasticsearch中的index名
typeelasticsearch中index的type名,默认index名
cleanup是否删除原表, 默认值:false
batchSize每次批量数据的条数,默认值:1000
trySize失败后重试的次数, 默认值:30
timeout客户端超时时间,默认值:600000
discovery启用节点发现将(轮询)并定期更新客户机中的服务器列表。默认false
compressionhttp请求,开启压缩,默认true
multiThreadhttp请求,是否有多线程,默认true
ignoreWriteError忽略写入错误,不重试,继续写入,默认false
alias数据导入完成后写入别名
aliasMode数据导入完成后增加别名的模式,append(增加模式), exclusive(只留这一个),默认append
settings创建index时候的settings, 与elasticsearch官方相同
splitter如果插入数据是array,就使用指定分隔符,默认值:-,-
columnelasticsearch所支持的字段类型,样例中包含了全部
dynamic不使用datax的mappings,使用es自己的自动mappings,默认值: false

参考资料:https://blog.csdn.net/wlddhj/article/details/137585979

Github主页地址:https://github.com/alibaba/DataX

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1905470.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

SAP_MM模块-特殊业务场景下的系统实现方案

一、业务背景 目前公司有一种电商业务&#xff0c;卖的是备品配件&#xff0c;是公司先跟供应商采购&#xff0c;然后再销售给客户&#xff0c;系统账就是按照正常业务来流转&#xff0c;公司进行采购订单入库&#xff0c;然后销售订单出库。 不过这种备品配件&#xff0c;实…

【服务器搭建】✈️用自己电脑搭建一个服务器!

目录 &#x1f44b;前言 &#x1f440;一、内网穿透 &#x1f331;二、内网穿透工具 &#x1f49e;️三、本地测试 3.1 环境准备 3.2 nginx 修改启动页面 3.3 神卓互联注册&#xff0c;创建映射规则 &#x1f4eb;四、章末 &#x1f44b;前言 小伙伴们大家好&#xff0c;一…

【算法笔记自学】第 7 章 提高篇(1)——数据结构专题(1)

7.1栈的应用 #include <iostream> #include <string> #include <stack> using namespace std;int main() {int n, x;string action;cin >> n;stack<int> s;for (int i 0; i < n; i) {cin >> action;if (action "push") {ci…

微信小程序毕业设计-社区门诊管理系统项目开发实战(附源码+论文)

大家好&#xff01;我是程序猿老A&#xff0c;感谢您阅读本文&#xff0c;欢迎一键三连哦。 &#x1f49e;当前专栏&#xff1a;微信小程序毕业设计 精彩专栏推荐&#x1f447;&#x1f3fb;&#x1f447;&#x1f3fb;&#x1f447;&#x1f3fb; &#x1f380; Python毕业设计…

从资金管理的角度 谈谈伦敦金投资技巧

刚进入伦敦金市场的时候&#xff0c;笔者认为技术分析是很重要的&#xff0c;所以将学习伦敦金投资技巧的精力全部投入到技术分析的学习中。经过一系列交易的亏损&#xff0c;笔者才发现&#xff0c;其实交易管理才是最重要的。如果管理得好&#xff0c;30%的胜率&#xff0c;投…

Liunx网络配置

文章目录 一、查看网络配置永久修改网卡临时修改网卡 二、查看主机名称 hostname三、查看路由表条目 route四、查看网络连接情况netstat五、获取socket统计信息ss六、查看当前系统中打开的文件和进程的工具lsof七、测试网络连通性ping八、跟踪数据包 traceroute九、域名解析 ns…

一个最简单的comsol斜坡稳定性分析例子——详细步骤

一个最简单的comsol斜坡稳定性分析例子——详细步骤 标准模型例子—详细步骤 线弹性模型下的地应力平衡预应力与预应变、土壤塑性和安全系数求解的辅助扫描

计算机网络之令牌环

1.令牌环工作原理 令牌环&#xff08;Token Ring&#xff09;是一种局域网&#xff08;LAN&#xff09;的通信协议&#xff0c;最初由IBM在1984年开发并标准化为IEEE 802.5标准。在令牌环网络中&#xff0c;所有的计算机或工作站被连接成一个逻辑或物理的环形拓扑结构。网络中…

Kyutai 推出了 Moshi Chat,这是一种既可以实时收听又可以说话的 AI

Kyutai 是一家专注于开放式 AI 研究的非营利性实验室&#xff0c;它推出了开源的 Moshi Chat 项目 Kyutai 是一家致力于推进人工智能 &#xff08;AI&#xff09; 开放研究的非营利性实验室&#xff0c;其最新创新 Moshi Chat 取得了重大进展。这种尖端的实时原生多模态基础模…

STM32-USART

本内容基于江协科技STM32视频学习之后整理而得。 文章目录 1. 串口通信协议1.1 通信接口1.2 串口通信1.3 硬件电路1.4 电平标准1.5 串口参数及时序1.6 串口时序 2. USART串口通信2.1 USART简介2.2 USART框图2.3 USART基本结构2.4 数据帧2.5 数据帧-配置停止位2.6 起始位侦测2.…

dell Vostro 3690安装win11 23h2 方法

下载rufus-4.5.exe刻U盘去除限制 https://www.dell.com/support/home/zh-cn/product-support/product/vostro-3690-desktop/drivers dell官网下载驱动解压到U盘 https://dl.dell.com/FOLDER09572293M/2/Intel-Rapid-Storage-Technology-Driver_88DM9_WIN64_18.7.6.1010_A00_01…

图神经网络dgl和torch-geometric安装

文章目录 搭建环境dgl的安装torch-geometric安装 在跑论文代码过程中&#xff0c;许多小伙伴们可能会遇到一些和我一样的问题&#xff0c;就是文章所需要的一些库的版本比较老&#xff0c;而新版的环境跑代码会报错&#xff0c;这就需要我们手动的下载whl格式的文件来安装相应的…

Django之项目开发(二)

目录 一、安装和使用uWSGI 1.1、安装 1.2、配置文件 1.3、启动与停止uwsgi 二、安装nginx 三、Nginx 配置uWSGI 四、Nginx配置静态文件 五、Nginx配置负载均衡 一、安装和使用uWSGI uWSGI 是一个 Web 服务器,可以用来部署 Python Web 应用。它是一个高性能的通用的 We…

Spring源码十七:Bean实例化入口探索

上一篇Spring源码十六&#xff1a;Bean名称转化我们讨论doGetBean的第一个方法transformedBeanName方法&#xff0c;了解Spring是如何处理特殊的beanName&#xff08;带&符号前缀&#xff09;与Spring的别名机制。今天我们继续往方法下面看&#xff1a; doGetBean 这个方法…

机械键盘如何挑选

机械键盘的选择是一个关键的决策&#xff0c;因为它直接影响到我们每天的打字体验。在选择机械键盘时&#xff0c;有几个关键因素需要考虑。首先是键盘的键轴类型。常见的键轴类型包括蓝轴、红轴、茶轴和黑轴等。不同的键轴类型具有不同的触发力、触发点和声音。蓝轴通常具有明…

Partisia Blockchain 现已完成第一阶段空投,即将在DeFi领域发力

Partisia Blockchain 是以 MPC 方案为基础的 Layer1 生态&#xff0c;其具备可审计的隐私特性&#xff0c;同时还能保持链的可拓展、高迸发、可互操作以及安全等系列特性&#xff0c;Partisia Blockchain 被认为是目前最具潜力的企业级公链&#xff0c;并且估值高达 16 亿美元。…

身边的故事(十四):阿文的故事:再买房

短短的一年多时间里&#xff0c;阿文仿佛从人生低谷完全走出来了。各种眼花缭乱的操作和处理事情方式让人觉得不可思议&#xff0c;是不是一个人大手大脚花钱惯了&#xff0c;让他重新回到艰苦朴素的日子是不是比死都难受呢&#xff1f;又或者像我这种靠勤勤恳恳的打工人是无法…

博客搭建-图床篇

我们的博客难免少不了图片&#xff0c;图片管理是一个不小的难题。如果我们将图片全部放到我们自己的服务器上&#xff0c;那么带宽就基本上会被图片所占满了&#xff0c;这会导致网站加载很慢&#xff08;特别是图片加载很慢&#xff09;。 ‍ 什么是图床 为了解决图片的问…

ansible常见问题配置好了密码还是报错

| FAILED! > { “msg”: “Using a SSH password instead of a key is not possible because Host Key checking is enabled and sshpass does not support this. Please add this host’s fingerprint to your known_hosts file to manage this host.” } 怎么解决&#xf…