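1. PostgreSQL supports insert-or-update (upsert) operations, although its syntax differs somewhat from MySQL's.
See the following article for reference:
MySQL + PostgreSQL batch insert/update (insertOrUpdate), CSDN blog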
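To make the syntax difference concrete, here is a minimal sketch that builds the same upsert in both dialects. The function names and the `%s` placeholder style are assumptions for illustration, not part of DataX; the sketch also assumes at least one non-key column. MySQL uses `ON DUPLICATE KEY UPDATE` and never names the conflicting key, while PostgreSQL uses `ON CONFLICT (...) DO UPDATE SET` and refers to the incoming row as `EXCLUDED`.

```python
def mysql_upsert(table, columns, key_columns):
    """Build a MySQL-style upsert; key_columns only decides which columns get updated."""
    placeholders = ", ".join(["%s"] * len(columns))
    updates = ", ".join(f"{c}=VALUES({c})" for c in columns if c not in key_columns)
    return (
        f"INSERT INTO {table} ({', '.join(columns)}) VALUES ({placeholders}) "
        f"ON DUPLICATE KEY UPDATE {updates}"
    )


def postgres_upsert(table, columns, key_columns):
    """Build a PostgreSQL-style upsert; the conflict target must be named explicitly."""
    placeholders = ", ".join(["%s"] * len(columns))
    updates = ", ".join(f"{c}=EXCLUDED.{c}" for c in columns if c not in key_columns)
    return (
        f"INSERT INTO {table} ({', '.join(columns)}) VALUES ({placeholders}) "
        f"ON CONFLICT ({', '.join(key_columns)}) DO UPDATE SET {updates}"
    )


if __name__ == "__main__":
    print(mysql_upsert("test_datax", ["id", "name"], ["id"]))
    print(postgres_upsert("test_datax", ["id", "name"], ["id"]))
```

In PostgreSQL the columns named in `ON CONFLICT (...)` must be covered by a unique index or constraint, otherwise the statement is rejected.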
2. In DataX, this can be achieved by modifying the source code.
Reference:
https://juejin.cn/post/7124899170615296013
3. Notes on modifying the source
DataX version:
Download the source, modify the following classes, then build and package it yourself with IDEA:
com.alibaba.datax.plugin.writer.postgresqlwriter.PostgresqlWriter
com.alibaba.datax.plugin.rdbms.writer.util.WriterUtil
After compiling, replace these jar files:
postgresqlwriter-0.0.1-SNAPSHOT.jar
plugin-rdbms-util-0.0.1-SNAPSHOT.jar
The directory tree looks like this (see plugin/writer/postgresqlwriter), listed with:
find <directory path> | sed -e 's/[^-][^\/]*\//--/g' -e 's/--/|-/'
|-lib
|-bin
|-job
|-conf
|-log
|-log_perf
|-tmp
|-script
|-plugin
|---writer
|-----postgresqlwriter
|-------plugin_job_template.json
|-------plugin.json
|-------libs
|---------checker-qual-3.5.0.jar
|---------postgresql-42.3.3.jar
|---------commons-collections-3.0.jar
|---------druid-1.0.15.jar
|---------commons-lang3-3.3.2.jar
|---------logback-core-1.0.13.jar
|---------commons-io-2.4.jar
|---------datax-common-0.0.1-SNAPSHOT.jar
|---------guava-r05.jar
|---------plugin-rdbms-util-0.0.1-SNAPSHOT.jar
|---------hamcrest-core-1.3.jar
|---------logback-classic-1.0.13.jar
|---------commons-math3-3.1.1.jar
|---------slf4j-api-1.7.10.jar
|---------fastjson2-2.0.23.jar
|-------postgresqlwriter-0.0.1-SNAPSHOT.jar
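As a quick sanity check after copying the rebuilt jars, a small script can confirm that both files sit where the tree above shows them. This is only a sketch: `missing_jars` and `EXPECTED_JARS` are hypothetical names, the DataX home directory is passed in by the caller, and the check only tests for presence, not that a jar is actually the rebuilt one.

```python
from pathlib import Path

# Locations taken from the directory tree above, relative to the DataX home.
PLUGIN_DIR = Path("plugin") / "writer" / "postgresqlwriter"
EXPECTED_JARS = [
    PLUGIN_DIR / "postgresqlwriter-0.0.1-SNAPSHOT.jar",
    PLUGIN_DIR / "libs" / "plugin-rdbms-util-0.0.1-SNAPSHOT.jar",
]


def missing_jars(datax_home):
    """Return the expected jar paths (relative, POSIX style) that are not present."""
    home = Path(datax_home)
    return [rel.as_posix() for rel in EXPECTED_JARS if not (home / rel).is_file()]


if __name__ == "__main__":
    import sys

    # Usage: python check_jars.py /path/to/datax
    for path in missing_jars(sys.argv[1] if len(sys.argv) > 1 else "."):
        print("missing:", path)
```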
4. Usage
4.1. Insert or update on tables that have a unique index
{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "mysqlreader",
                    "parameter": {
                        "username": "root",
                        "password": "database password",
                        "connection": [
                            {
                                "jdbcUrl": [
                                    "jdbc:mysql://192.168.5.180:3306/xxxx?useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&serverTimezone=GMT%2B8"
                                ],
                                "querySql": [
                                    "SELECT * from sys_test_copy1"
                                ]
                            }
                        ]
                    }
                },
                "writer": {
                    "name": "postgresqlwriter",
                    "parameter": {
                        "writeMode": "update!@#(id)!@#(name)",
                        "column": [
                            "id",
                            "name"
                        ],
                        "connection": [
                            {
                                "jdbcUrl": "jdbc:postgresql://127.0.0.1:5432/postgres",
                                "table": [
                                    "test_datax"
                                ]
                            }
                        ],
                        "password": "xxxx",
                        "username": "postgres"
                    }
                }
            }
        ],
        "setting": {
            "speed": {
                "channel": 6
            }
        }
    }
}
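The writeMode value in the job above packs up to three parts into one string, separated by `!@#`: the mode, the conflict target in parentheses, and (for update) the columns to overwrite in parentheses. A minimal sketch of a helper that assembles such strings follows; `build_write_mode` is a hypothetical name, and the format is inferred from the patched WriterUtil code in this post rather than from any official DataX option.

```python
SEPARATOR = "!@#"  # delimiter the patched WriterUtil splits on


def build_write_mode(mode, conflict_columns, update_columns=None):
    """Assemble a writeMode string such as 'update!@#(id)!@#(name)'."""
    if mode not in ("insert", "update"):
        raise ValueError("mode must be 'insert' or 'update'")
    if not conflict_columns:
        raise ValueError("at least one conflict column is required")
    # No spaces after commas: the patched code compares column names verbatim.
    parts = [mode, "(" + ",".join(conflict_columns) + ")"]
    if mode == "update":
        if not update_columns:
            raise ValueError("update mode needs the columns to overwrite")
        parts.append("(" + ",".join(update_columns) + ")")
    return SEPARATOR.join(parts)


if __name__ == "__main__":
    print(build_write_mode("update", ["id"], ["name"]))
    print(build_write_mode("insert", ["user_id"]))
```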
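4.2. Insert or update by primary key
With the insert mode, a row whose primary key already exists is skipped rather than updated, so the generated statement has this form:
INSERT INTO sys_test_copy1(user_id, email) VALUES (5592, 'xxxx5@hotmail.com') ON CONFLICT (user_id) DO NOTHING;
The corresponding job can be structured as follows: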
{
    "job": {
        "setting": {
            "speed": {
                "channel": 5
            },
            "errorLimit": {
                "record": 0,
                "percentage": 0.02
            }
        },
        "content": [
            {
                "reader": {
                    "name": "mysqlreader",
                    "parameter": {
                        "username": "root",
                        "password": "database password",
                        "connection": [
                            {
                                "jdbcUrl": [
                                    "jdbc:mysql://192.168.5.180:3306/xxxx?useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&serverTimezone=GMT%2B8"
                                ],
                                "querySql": [
                                    "SELECT * from sys_test_copy1"
                                ]
                            }
                        ]
                    }
                },
                "writer": {
                    "name": "postgresqlwriter",
                    "parameter": {
                        "username": "postgres",
                        "password": "database password",
                        "writeMode": "insert!@#(user_id)",
                        "column": [
                            "*"
                        ],
                        "connection": [
                            {
                                "table": [
                                    "sys_test_copy1"
                                ],
                                "jdbcUrl": "jdbc:postgresql://192.168.5.190:5432/xxxx"
                            }
                        ]
                    }
                }
            }
        ]
    }
}
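Before handing a job file to DataX, it can help to check that it is strict JSON and that the writeMode has the expected shape. The sketch below does both with Python's standard `json` module, which rejects things like a trailing comma; `writer_write_mode` is a hypothetical helper, it only looks at the first entry of `content`, and DataX's own parser may be more lenient than this check.

```python
import json

SEPARATOR = "!@#"  # delimiter used inside writeMode


def writer_write_mode(job_text):
    """Parse a job and split the first writer's writeMode into its parts."""
    job = json.loads(job_text)  # raises json.JSONDecodeError on invalid JSON
    writer = job["job"]["content"][0]["writer"]
    parts = writer["parameter"]["writeMode"].split(SEPARATOR)
    return {
        "mode": parts[0],
        "conflict_target": parts[1] if len(parts) > 1 else None,
        "update_columns": parts[2] if len(parts) > 2 else None,
    }


if __name__ == "__main__":
    import sys

    # Usage: python check_job.py ./job/mysql_postgres_job.json
    with open(sys.argv[1], encoding="utf-8") as handle:
        print(writer_write_mode(handle.read()))
```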
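In both cases, what actually gets written is an INSERT INTO ... ON CONFLICT statement, generated in
com.alibaba.datax.plugin.rdbms.writer.util.WriterUtil
The rules in the code below will be refined later: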
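// Builds the PostgreSQL "ON CONFLICT ..." suffix from a writeMode such as
// "insert!@#(user_id)" or "update!@#(id)!@#(name)".
// Needs java.util.ArrayList and java.util.List; StringUtils is assumed to be
// org.apache.commons.lang3.StringUtils (commons-lang3 is among the plugin libs).
private static String onDuplicateKeyUpdateString(String writeMode, List<String> columnHolders) {
    // Parts: [0] mode, [1] conflict target, [2] columns to update (update mode only)
    String[] writeModeArr = writeMode.split("!@#", -1);
    int writeModeArrLen = writeModeArr.length;
    writeMode = writeModeArr[0];
    StringBuilder sb = new StringBuilder();
    if ("insert".equals(writeMode) && writeModeArrLen == 2) {
        // insert mode: skip rows that hit the unique key
        sb.append(" ON CONFLICT ").append(writeModeArr[1]).append(" DO NOTHING");
    }
    if ("update".equals(writeMode) && writeModeArrLen == 3) {
        sb.append(" ON CONFLICT ").append(writeModeArr[1]);
        String[] updateFieldArr = writeModeArr[2].replace("(", "").replace(")", "").split(",", -1);
        List<String> updateSqlList = new ArrayList<>();
        for (String updateField : updateFieldArr) {
            // Ignore fields that are not among the columns being written
            if (!columnHolders.contains(updateField)) {
                continue;
            }
            updateSqlList.add(updateField + "=EXCLUDED." + updateField);
        }
        if (updateSqlList.isEmpty()) {
            sb.append(" DO NOTHING");
        } else {
            sb.append(" DO UPDATE SET ").append(StringUtils.join(updateSqlList, ","));
        }
    }
    // Any other writeMode shape yields an empty suffix, i.e. a plain INSERT
    return sb.toString();
}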
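To preview which clause a given writeMode produces without rebuilding DataX, the method above can be transliterated into a few lines of Python. This is a sketch for experimentation only: `on_conflict_clause` is a hypothetical name, and the port mirrors the Java logic as shown in this post, including its exact string comparisons.

```python
def on_conflict_clause(write_mode, column_holders):
    """Python port of onDuplicateKeyUpdateString, for previewing the SQL suffix."""
    parts = write_mode.split("!@#")
    mode = parts[0]
    clause = ""
    if mode == "insert" and len(parts) == 2:
        clause += " ON CONFLICT " + parts[1] + " do nothing"
    if mode == "update" and len(parts) == 3:
        clause += " ON CONFLICT " + parts[1]
        fields = parts[2].replace("(", "").replace(")", "").split(",")
        # Fields are matched verbatim, so " age" (with a space) is not "age".
        updates = [f + "=EXCLUDED." + f for f in fields if f in column_holders]
        if updates:
            clause += " DO UPDATE SET " + ",".join(updates)
        else:
            clause += " DO NOTHING"
    return clause


if __name__ == "__main__":
    print(on_conflict_clause("update!@#(id)!@#(name)", ["id", "name"]))
    print(on_conflict_clause("insert!@#(user_id)", ["user_id", "email"]))
    # A space after the comma silently drops "age" from the update list:
    print(on_conflict_clause("update!@#(id)!@#(name, age)", ["id", "name", "age"]))
```

One thing the port makes visible: because field names are compared without trimming, the update column list should be written without spaces after the commas, or the affected columns are silently left out of the `DO UPDATE SET` list.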
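Summary:
The PostgreSQL writer plugin does not currently support insert-or-update out of the box; the source has to be modified by hand. The key point when adapting it: whether a write ends up as an insert or an update depends on the unique index configured on the target table.
The next post will briefly cover how to run the script through xxl-job: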
python datax.py ./job/mysql_postgres_job.json