工具类
生成的配置
要同步表为:
customer_user.tb_business_user_info
express.route_push_service
请提前自行到doris中建好目标数据库,如果没有会报错
同步的配置文件如下:(将配置内容保存为xxx.yaml文件到flink cdc提交任务)
pipeline:
name: Sync MySQL Tables To Doris
parallelism: 1
source:
type: mysql
hostname: 10.1.0.24
port: 3306
username: root
password: xxxxxxxxx
tables: customer_user.tb_business_user_info,express.route_push_service
server-id: your MYSQL serverId
server-time-zone: UTC+08
sink:
type: doris
fenodes: 10.1.0.27:8030,10.1.0.50:8030,10.1.0.244:8030
benodes: 10.1.0.27:8040,10.1.0.50:8040,10.1.0.244:8040
username: root
password: "xxxxxxxxxxx"
table.create.properties.light_schema_change: true
table.create.properties.replication_num: 3
route:
- source-table: customer_user.tb_business_user_info
sink-table: test.ods_customer_user_tb_business_user_info
description: sync customer_user.tb_business_user_info to test.ods_customer_user_tb_business_user_info
- source-table: express.route_push_service
sink-table: test.ods_express_route_push_service
description: sync express.route_push_service to test.ods_express_route_push_service
import cn.hutool.core.collection.CollectionUtil;
import cn.hutool.core.lang.Assert;
import cn.hutool.core.util.StrUtil;
import com.alibaba.fastjson.JSONObject;
import com.google.common.collect.Lists;
import lombok.Builder;
import lombok.Data;
import java.util.*;
import java.util.stream.Collectors;
/**
* @author humorchen
* date: 2024/8/29
* description: mysql 同步到doris配置生成器
**/
public class MysqlSyncToDorisConfigUtil {
public interface GetConfig{
String getConfig();
}
public interface CheckConfig{
void checkConfig();
}
public interface InitConfig{
void initConfig();
}
@Data
@Builder
public static class MysqlSource implements GetConfig,CheckConfig,InitConfig{
private String host;
private int port;
private String username;
private String password;
private String serverId;
private String timezone;
private List<TableName> tableNames;
private static final String TEMPLATE = "source:\n" +
" type: mysql\n" +
" hostname: #{host}\n" +
" port: #{port}\n" +
" username: #{username}\n" +
" password: #{password}\n" +
" tables: #{tables}\n" +
" server-id: #{serverId}\n" +
" server-time-zone: #{timezone}\n";
@Override
public void initConfig() {
if (port< 1){
port = 3306;
}
if (StrUtil.isBlank(timezone)){
timezone = "UTC+08";
}
if (CollectionUtil.isNotEmpty(tableNames)){
List<String> collect = tableNames.stream().map(TableName::toString).distinct().collect(Collectors.toList());
CollectionUtil.sortByPinyin(collect);
this.tableNames = collect.stream().map(s -> {
String[] split = s.split("\\.");
return TableName.builder().database(split[0]).tableName(split[1]).build();
}).collect(Collectors.toList());
}
}
@Override
public void checkConfig() {
Assert.notBlank(host);
Assert.isTrue(port>100);
Assert.notBlank(username);
Assert.notBlank(password);
Assert.notBlank(serverId);
Assert.notBlank(timezone);
Assert.isTrue(tableNames != null && !tableNames.isEmpty());
}
@Override
public String getConfig() {
StringBuilder builder = new StringBuilder();
if (tableNames != null){
int i = 0;
for (TableName tableName : tableNames) {
if (i > 0){
builder.append(",");
}
i++;
builder.append(tableName.toString());
}
}
String tableNames = builder.toString();
return TEMPLATE.replace("#{host}",host).replace("#{port}",String.valueOf(port)).replace("#{username}",username).replace("#{password}",password).replace("#{tables}",tableNames).replace("#{serverId}",serverId).replace("#{timezone}",timezone);
}
}
@Data
@Builder
public static class TableName{
private String database;
private String tableName;
@Override
public boolean equals(Object o){
if (! (o instanceof TableName)){
return false;
}
TableName target = (TableName) o;
if (!Objects.equals(database,target.database)){
return false;
}
if (!Objects.equals(tableName,target.tableName)){
return false;
}
return true;
}
@Override
public int hashCode(){
int hash = 0;
if (database != null){
hash += database.hashCode();
}
if (tableName != null){
hash += tableName.hashCode();
}
return hash;
}
@Override
public String toString(){
return (database+"."+tableName).replace(" ","");
}
}
@Data
@Builder
public static class DorisSource implements GetConfig,CheckConfig{
private List<DorisFeNode> feNodeList ;
private List<DorisBeNode> beNodeList;
private String username;
private String password;
private String database;
private static final String TEMPLATE = "sink:\n" +
" type: doris\n" +
" fenodes: #{feNodeList}\n" +
" benodes: #{beNodeList}\n" +
" username: #{username}\n" +
" password: \"#{password}\"\n" +
" table.create.properties.light_schema_change: true\n" +
" table.create.properties.replication_num: 3";
@Override
public void checkConfig() {
Assert.isTrue(CollectionUtil.isNotEmpty(feNodeList));
Assert.isTrue(CollectionUtil.isNotEmpty(beNodeList));
Assert.notBlank(username);
Assert.notBlank(password);
Assert.notBlank(database);
}
@Override
public String getConfig() {
StringBuilder feNodeBuilder = new StringBuilder();
if (feNodeList != null){
for (int i = 0; i < feNodeList.size(); i++) {
if (i >0){
feNodeBuilder.append(",");
}
feNodeBuilder.append(feNodeList.get(i).toString());
}
}
StringBuilder beNodeBuilder = new StringBuilder();
if (beNodeList != null){
for (int i = 0; i < beNodeList.size(); i++) {
if (i >0){
beNodeBuilder.append(",");
}
beNodeBuilder.append(beNodeList.get(i).toString());
}
}
return TEMPLATE.replace("#{feNodeList}",feNodeBuilder.toString()).replace("#{beNodeList}",beNodeBuilder.toString()).replace("#{username}",username).replace("#{password}",password);
}
}
@Data
@Builder
public static class DorisFeNode{
private String host;
private int port;
@Override
public String toString(){
return host+":"+port;
}
}
@Data
@Builder
public static class DorisBeNode{
private String host;
private int port;
@Override
public String toString(){
return host+":"+port;
}
}
@Data
@Builder
public static class PipeLineConfig implements GetConfig,CheckConfig,InitConfig{
private String name;
private int parallelism;
private static final String TEMPLATE = "\n" +
"pipeline:\n" +
" name: #{name}\n" +
" parallelism: #{parallelism}\n";
@Override
public void checkConfig() {
Assert.notBlank(name);
Assert.isTrue(parallelism > 0);
}
@Override
public String getConfig() {
return TEMPLATE.replace("#{name}",name).replace("#{parallelism}",String.valueOf(parallelism));
}
@Override
public void initConfig() {
if (name == null){
name = "Sync MySQL Tables To Doris";
}
if (parallelism <1){
parallelism = 1;
}
}
}
@Data
@Builder
public static class RouteConfig implements GetConfig,CheckConfig,InitConfig{
private MysqlSource mysqlSource;
private DorisSource dorisSource;
private String tablePrefix;
private static final String CONFIG_PREFIX = "\nroute:\n";
private static final String TEMPLATE = " - source-table: #{sourceDatabase}.#{sourceTable}\n" +
" sink-table: #{sinkDatabase}.#{tablePrefix}#{sourceDatabase}_#{sourceTable}\n" +
" description: sync #{sourceDatabase}.#{sourceTable} to #{sinkDatabase}.#{tablePrefix}#{sourceDatabase}_#{sourceTable}\n";
@Override
public void initConfig() {
if (tablePrefix == null){
tablePrefix = "ods_";
}
}
@Override
public void checkConfig() {
Assert.notNull(tablePrefix);
}
@Override
public String getConfig() {
StringBuilder builder = new StringBuilder();
builder.append(CONFIG_PREFIX);
List<TableName> tableNames = mysqlSource.tableNames;
for (TableName tableName : tableNames) {
builder.append(TEMPLATE.replace("#{sourceDatabase}",tableName.database).replace("#{sourceTable}",tableName.tableName).replace("#{sinkDatabase}", dorisSource.database)
.replace("#{tablePrefix}",tablePrefix)
);
}
return builder.toString();
}
}
public static String syncTables(MysqlSource mysqlSource,DorisSource dorisSource,RouteConfig routeConfig){
return syncTables(mysqlSource,dorisSource,routeConfig,null);
}
public static String syncTables(MysqlSource mysqlSource,DorisSource dorisSource,RouteConfig routeConfig,PipeLineConfig pipeLineConfig){
if (pipeLineConfig == null){
pipeLineConfig = PipeLineConfig.builder().build();
}
Assert.notNull(mysqlSource);
Assert.notNull(dorisSource);
Assert.notNull(routeConfig);
mysqlSource.initConfig();
routeConfig.initConfig();
pipeLineConfig.initConfig();
pipeLineConfig.checkConfig();
mysqlSource.checkConfig();
dorisSource.checkConfig();
routeConfig.checkConfig();
return pipeLineConfig.getConfig() +
mysqlSource.getConfig() +
dorisSource.getConfig() +
routeConfig.getConfig();
}
public static Set<TableName> getTableNamesFromStr(String str){
String[] split = str.split("\n");
Set<TableName> ret = new HashSet<>();
for (String s : split) {
if (StrUtil.isNotBlank(s)){
s = s.replace(" ","").toLowerCase();
String[] ss = s.split("\\.");
ret.add(TableName.builder().database(ss[0]).tableName(ss[1]).build());
}
}
return ret;
}
public static void main(String[] args) {
String tables = "customer_user.tb_business_user_info\n" +
"express.route_push_service\n";
Set<TableName> tableNameSet = getTableNamesFromStr(tables);
MysqlSource mysqlSource = MysqlSource.builder().host("10.1.0.24").port(3306).username("root").password("xxxxxxxxx").serverId("your MYSQL serverId").tableNames(new ArrayList<>(tableNameSet)).build();
DorisSource dorisSource = DorisSource.builder().
feNodeList(Lists.newArrayList(DorisFeNode.builder().host("10.1.0.27").port(8030).build(),
DorisFeNode.builder().host("10.1.0.50").port(8030).build(),
DorisFeNode.builder().host("10.1.0.244").port(8030).build()
))
.beNodeList(Lists.newArrayList(DorisBeNode.builder().host("10.1.0.27").port(8040).build(),
DorisBeNode.builder().host("10.1.0.50").port(8040).build(),
DorisBeNode.builder().host("10.1.0.244").port(8040).build())
)
.username("root").password("xxxxxxxxxxx").database("test").build();
RouteConfig routeConfig = RouteConfig.builder().mysqlSource(mysqlSource).dorisSource(dorisSource).tablePrefix("ods_").build();
String config = syncTables(mysqlSource,dorisSource,routeConfig);
System.out.println("要同步表为:");
for (TableName tableName : mysqlSource.tableNames) {
System.out.println(tableName.toString());
}
System.out.println("请提前自行到doris中建好目标数据库,如果没有会报错");
System.out.println("同步的配置文件如下:(将配置内容保存为xxx.yaml文件到flink cdc提交任务)");
System.out.println();
System.out.println(config);
}
public static void main12(String[] args){
// 要同步的表
String tables = "customer_user.tb_business_user_info\n" +
"express.route_push_service\n" +
"funds.dunning_order_request_table\n" +
"funds.payout_order_table\n" +
"funds.pdd_settle_table\n" +
"manage.coupon_order_detail\n" +
"manage.fulfillment_info\n" +
"manage.order_report_detail\n" +
"manage.recycle_inspection_report\n" +
"manage.recycle_store_order_review\n" +
"market.device\n" +
"market.device \n" +
"market.order\n" +
"market.order_and_device\n" +
"order.order_finance_detail\n" +
"order.order_info\n" +
"order.order_status\n" +
"order.order_store_daily\n" +
"order.order_store_recycle_detail\n" +
"order.order_trade_in_detail\n" +
"storage.delivery_order\n" +
"storage.device\n" +
"storage.inspect_order\n" +
"storage.receive_order_detail\n" +
"storage.recycle_updoor_inspections \n" +
"storage.send_goods_order_detail\n" +
"storage.storage_order\n" +
"storage.unpack\n" +
"storage.warehouse\n" +
"storage.work_order_result\n";
// 同步成功的表
String now = "ods_customer_user_tb_business_user_info\n" +
"ods_express_route_push_service\n" +
"ods_funds_dunning_order_request_table\n" +
"ods_funds_payout_order_table\n" +
"ods_manage_coupon_order_detail\n" +
"ods_manage_fulfillment_info\n" +
"ods_manage_order_report_detail\n" +
"ods_manage_recycle_inspection_report\n" +
"ods_manage_recycle_store_order_review\n" +
"ods_market_device\n" +
"ods_market_order\n" +
"ods_market_order_and_device\n" +
"ods_order_order_finance_detail\n" +
"ods_order_order_info\n" +
"ods_order_order_status\n" +
"ods_order_order_store_daily\n" +
"ods_order_order_store_recycle_detail\n" +
"ods_order_order_trade_in_detail\n" +
"ods_storage_delivery_order\n" +
"ods_storage_device\n" +
"ods_storage_inspect_order\n" +
"ods_storage_receive_order_detail\n" +
"ods_storage_recycle_updoor_inspections\n" +
"ods_storage_send_goods_order_detail\n" +
"ods_storage_storage_order\n" +
"ods_storage_unpack\n" +
"ods_storage_warehouse\n" +
"ods_storage_work_order_result";
checkLostTables(tables,now);
}
/**
* 检查同步丢失的表,有时候会有报错没有提示是哪个表,这样对比看是是哪个失败了
*
* @param target
* @param now
*/
public static void checkLostTables(String target,String now){
String[] targetSplit = target.split("\n");
ArrayList<String> targetList = new ArrayList<>();
for (int i = 0; i < targetSplit.length; i++) {
targetList.add("ods_"+targetSplit[i].replace(".","_").replace(" ",""));
}
String[] nowSplit = now.split("\n");
ArrayList<String> nowList = new ArrayList<>();
for (String s : nowSplit) {
nowList.add(s.replace(" ",""));
}
CollectionUtil.sortByPinyin(targetList);
CollectionUtil.sortByPinyin(nowList);
System.out.println("target len: "+targetList.size());
System.out.println("now len: "+nowList.size());
Collection<String> subtract = CollectionUtil.subtract(targetList, nowList);
System.out.println(JSONObject.toJSONString(subtract));
for (int i = 0; i < targetList.size(); i++) {
try {
System.out.print(targetList.get(i));
}catch (Exception e){}
System.out.print(" \t ");
try {
System.out.print(nowList.get(i));
}catch (Exception e){}
System.out.println( );
}
}
}