达梦数据库自定义插件
达梦8的依赖引入
定义reader module
定义writer module
修改核心配置数据库类型支持
打包插件
测试
以mysql到dm数据库为例
配置mysql2dm.json
执行任务
查询下结果
DataX二次开发之达梦数据库插件
DataX 是阿里巴巴集团内被广泛使用的离线数据同步工具/平台,支持大部分主流的数据库之间的数据同步,也提供了数据库插件的自定义开发。
达梦数据库自定义插件
插件自定义开发详细看官网dataxPluginDev.md说明文档
达梦8的依赖引入
<!-- dm driver -->
<!-- https://mvnrepository.com/artifact/com.dameng/DmJdbcDriver18 -->
<dependency>
<groupId>com.dameng</groupId>
<artifactId>DmJdbcDriver18</artifactId>
<version>8.1.3.140</version>
</dependency>
定义reader module
定义writer module
修改核心配置数据库类型支持
public enum DataBaseType {
Dm("dm", "dm.jdbc.driver.DmDriver"),
//...省略其他
private String typeName;
private String driverClassName;
DataBaseType(String typeName, String driverClassName) {
this.typeName = typeName;
this.driverClassName = driverClassName;
}
public String getDriverClassName() {
return this.driverClassName;
}
public String appendJDBCSuffixForReader(String jdbc) {
case Oracle:
break;
case Dm:
//...省略其他
default:
throw DataXException.asDataXException(DBUtilErrorCode.UNSUPPORTED_TYPE, "unsupported database type.");
}
return result;
}
public String appendJDBCSuffixForWriter(String jdbc) {
String result = jdbc;
String suffix = null;
switch (this) {
case Oracle:
break;
case Dm:
break;
//...省略其他
default:
throw DataXException.asDataXException(DBUtilErrorCode.UNSUPPORTED_TYPE, "unsupported database type.");
}
return result;
}
public String formatPk(String splitPk) {
String result = splitPk;
switch (this) {
case MySql:
case Dm:
if (splitPk.length() >= 2 && splitPk.startsWith("`") && splitPk.endsWith("`")) {
result = splitPk.substring(1, splitPk.length() - 1).toLowerCase();
}
break;
//...省略其他
default:
throw DataXException.asDataXException(DBUtilErrorCode.UNSUPPORTED_TYPE, "unsupported database type.");
}
return result;
}
public String quoteColumnName(String columnName) {
String result = columnName;
switch (this) {
case Dm:
//...省略其他
default:
throw DataXException.asDataXException(DBUtilErrorCode.UNSUPPORTED_TYPE, "unsupported database type");
}
return result;
}
public String quoteTableName(String tableName) {
String result = tableName;
switch (this) {
case MySql:
result = "`" + tableName.replace("`", "``") + "`";
break;
case Oracle:
break;
case Dm:
break;
//...省略其他
default:
throw DataXException.asDataXException(DBUtilErrorCode.UNSUPPORTED_TYPE, "unsupported database type");
}
return result;
}
private static Pattern mysqlPattern = Pattern.compile("jdbc:mysql://(.+):\\d+/.+");
private static Pattern oraclePattern = Pattern.compile("jdbc:oracle:thin:@(.+):\\d+:.+");
private static Pattern dmPattern = Pattern.compile("jdbc:dm://:@(.+):\\d+:.+");
/**
* 注意:目前只实现了从 mysql/oracle 中识别出ip 信息.未识别到则返回 null.
*/
public static String parseIpFromJdbcUrl(String jdbcUrl) {
Matcher mysql = mysqlPattern.matcher(jdbcUrl);
if (mysql.matches()) {
return mysql.group(1);
}
Matcher oracle = oraclePattern.matcher(jdbcUrl);
if (oracle.matches()) {
return oracle.group(1);
}
Matcher dm = dmPattern.matcher(jdbcUrl);
if (dm.matches()) {
return dm.group(1);
}
return null;
}
//...省略其他
}
打包插件
需要在父类的插件里边配置模块以及插件打包配置
maven执行以下命令
mvn -U clean package assembly:assembly -Dmaven.test.skip=true
测试
以mysql到dm数据库为例
mysql建立表并插入数据
-- test.psn definition
CREATE TABLE `psn` (
`id` int(11) NOT NULL,
`name` text,
`address` text
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
INSERT INTO psn
(id, name, address)
VALUES(1, 'elite', 'gz');
INSERT INTO psn
(id, name, address)
VALUES(2, 'tom', 'bj');
INSERT INTO psn
(id, name, address)
VALUES(3, 'jack', 'sz');
INSERT INTO psn
(id, name, address)
VALUES(4, 'json', 'sh');
在达梦数据库里边创建一个表psn
CREATE TABLE test.psn
(
id int NOT NULL,
name VARCHAR(40) NULL,
address VARCHAR(100)
);
配置mysql2dm.json
关系型数据库插件都差不多的配置
{
"job": {
"setting": {
"speed": {
"channel":2
}
},
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"username": "root",
"password": "123456",
"splitPk": "id",
"column": ["id","name","address"],
"connection": [
{
"jdbcUrl": ["jdbc:mysql://mysqlip:3306/test?useUnicode=true&characterEncoding=UTF-8&useSSL=false"],
"table": ["psn"]
}
]
}
},
"writer": {
"name": "dmwriter",
"parameter": {
"username": "test",
"password": "123456@dm",
"column": ["id","name","address"],
"connection": [
{
"table": [
"psn"
],
"jdbcUrl": "jdbc:dm://ip:5236?schema=TEST"
}
]
}
}
}
]
}
}
执行任务
执行任务可以用命令测试,详细可以参考官网,或者用java,本例以java为例