国产数据库(如达梦、金仓)目前很少有开源数据迁移工具支持,因此手写了工具类来完成数据库脚本的迁移,主要包含以下两个类:
/**
* 模拟flyway进行sql迁移
*/
@Component
@Slf4j
public class SqlInitialize implements InitializingBean, Ordered {
/**
* 初始化SQL文件
*
* @param sqlFileName sql文件名,规范:xxx.sql
*/
@Resource
private JdbcTemplate template;
@Value("${sql.log.path}")
private String sqlLogPath;
private static Map<String, String> map = null;
private String validateSqlFileName = "V3.3.0.0__inti_baseline.sql";
@Override
public void afterPropertiesSet() throws Exception {
log.info("<<=============bengin to init sql==============>>");
DataSource dataSource = template.getDataSource();
assert dataSource != null;
try (Connection connection = dataSource.getConnection()) {
JpaScriptRunner runner = new JpaScriptRunner(connection);
File file = new File(sqlLogPath);
//判断父级目录是否存在
String parentPath = file.getParent();
File folder = new File(parentPath);
if (!folder.exists()) {
if (folder.mkdirs()) {
log.info("文件夹已创建:" + folder.getAbsolutePath());
} else {
log.error("文件夹创建失败:" + folder.getAbsolutePath());
}
} else {
log.debug("文件夹已存在:" + folder.getAbsolutePath());
}
if (!file.exists()) {
log.debug(sqlLogPath);
file.createNewFile();
log.debug("文件已创建:" + file.getAbsolutePath());
}
runner.setErrorLogWriter(new PrintWriter(new BufferedOutputStream(new FileOutputStream(sqlLogPath))));
runner.setLogWriter(null);
ResourcePatternResolver resolver = new PathMatchingResourcePatternResolver();
org.springframework.core.io.Resource[] resources = resolver.getResources(getDatasourceName(runner) + File.separator + "*.sql");
org.springframework.core.io.Resource resourceVa = resolver.getResource(getDatasourceName(runner) + File.separator + validateSqlFileName);
initValideInfo(runner, resourceVa);
if (resources.length == 0) {
log.error("[{}] is not a directory,please check your config of sql file location", getDatasourceName(runner));
} else {
sort(resources);
if (resources == null || resources.length == 0) {
log.error("no sql file found");
} else {
for (org.springframework.core.io.Resource item : resources) {
String sqlFileName = item.getFilename();
if (!StringUtils.equals(sqlFileName, validateSqlFileName)) {
String md5 = SecureUtil.md5(sqlFileName);
if (map != null && map.containsKey(sqlFileName)) {
String hash = map.get(sqlFileName);
if (!StringUtils.equals(hash, md5)) {
throw new Exception("sql validate failed");
} else {
continue;
}
}
runner.runScript(IoUtil.getReader(item.getInputStream(), Charset.forName("UTF-8")));
template.update("insert into \"sql_schema_validate\"(\"name\", \"hash\") values(?, ?);", sqlFileName, md5);
log.info("execute {} sucess", sqlFileName);
}
}
}
}
log.info("<<=============init sql end==============>>");
} catch (Exception e) {
log.error(e.getMessage());
throw e;
} finally {
if (map != null) {
map = null;
}
}
}
@Override
public int getOrder() {
return 0;
}
private void initValideInfo(JpaScriptRunner runner, org.springframework.core.io.Resource item) {
try {
runner.runScript(IoUtil.getReader(item.getInputStream(), Charset.forName("UTF-8")));
String sql = "select aaa.\"name\", aaa.\"hash\" from \"sql_schema_validate\" aaa";
template.query(sql, new RowMapper<Map>() {
@Override
public Map mapRow(ResultSet resultSet, int i) throws SQLException {
String name = resultSet.getString("name");
String hash = resultSet.getString("hash");
if (map == null) {
map = new HashMap<String, String>();
}
map.put(name, hash);
return map;
}
});
} catch (Exception e) {
log.info(e.getMessage());
}
}
private void sort(org.springframework.core.io.Resource[] files) {
for (int i = 0; i < files.length - 1; ++i) {
for (int j = i + 1; j < files.length; ++j) {
String o1 = files[i].getFilename();
String o2 = files[j].getFilename();
int i1 = StringUtils.indexOf(o1, "__");
int i2 = StringUtils.indexOf(o2, "__");
String digtal1 = StringUtils.substring(o1, 7, i1);
String digtal2 = StringUtils.substring(o2, 7, i2);
if (Integer.valueOf(digtal1) > Integer.valueOf(digtal2)) {
org.springframework.core.io.Resource tmp = files[i];
files[i] = files[j];
files[j] = tmp;
}
}
}
}
public static String getDatasourceName(JpaScriptRunner runner) {
String datasourceName = "";
String dbType = "postgreSQL";
if (!Strings.isNullOrEmpty(System.getenv("DB_TYPE"))) {
dbType = System.getenv("DB_TYPE");
}
switch (dbType) {
//多种类数据库适配
case "postgreSQL":
runner.setAutoCommit(true);
runner.setSendFullScript(true);
datasourceName = "postgreSQL";
break;
case "dm":
runner.setAutoCommit(false);
runner.setSendFullScript(false);
datasourceName = "dm";
break;
default:
throw new RuntimeException("不支持的数据库类型" + dbType);
}
return datasourceName;
}
}
JpaScriptRunner(SQL 脚本执行器):
/**
 * Executes SQL scripts on a caller-supplied JDBC {@link Connection}.
 *
 * <p>A script is either sent to the database as one statement
 * ({@link #setSendFullScript(boolean)}) or split into statements at the delimiter, which is
 * {@code ;} unless changed. Output goes to the log writer and failures to the error log
 * writer; setting either of them to {@code null} silences it. The connection is never
 * closed by {@link #runScript(Reader)}.
 *
 * <p>Instances keep mutable settings and are bound to a single connection.
 */
public class JpaScriptRunner {

    private static final String LINE_SEPARATOR = System.lineSeparator();

    private static final String DEFAULT_DELIMITER = ";";

    /** Matches a comment line such as {@code -- @DELIMITER $$}; group 5 is the new delimiter. */
    private static final Pattern DELIMITER_PATTERN = Pattern.compile("^\\s*((--)|(//))?\\s*(//)?\\s*@DELIMITER\\s+([^\\s]+)", Pattern.CASE_INSENSITIVE);

    private final Connection connection;

    private boolean stopOnError;
    private boolean throwWarning;
    private boolean autoCommit;
    private boolean sendFullScript;
    private boolean removeCRs;
    private boolean escapeProcessing = true;

    private PrintWriter logWriter = new PrintWriter(System.out);
    private PrintWriter errorLogWriter = new PrintWriter(System.err);

    private String delimiter = DEFAULT_DELIMITER;
    private boolean fullLineDelimiter;

    /**
     * @param connection the connection every script is executed on
     */
    public JpaScriptRunner(Connection connection) {
        this.connection = connection;
    }

    /**
     * @param stopOnError {@code true} to abort a script on the first failing statement;
     *                    {@code false} (default) to report the failure and continue
     */
    public void setStopOnError(boolean stopOnError) {
        this.stopOnError = stopOnError;
    }

    /**
     * @param throwWarning {@code true} to treat an SQL warning of a statement as an error
     */
    public void setThrowWarning(boolean throwWarning) {
        this.throwWarning = throwWarning;
    }

    /**
     * @param autoCommit the auto-commit mode applied to the connection before each script
     */
    public void setAutoCommit(boolean autoCommit) {
        this.autoCommit = autoCommit;
    }

    /**
     * @param sendFullScript {@code true} to send the whole script as one statement,
     *                       {@code false} to split it at the delimiter
     */
    public void setSendFullScript(boolean sendFullScript) {
        this.sendFullScript = sendFullScript;
    }

    /**
     * @param removeCRs {@code true} to replace {@code \r\n} with {@code \n} before executing
     */
    public void setRemoveCRs(boolean removeCRs) {
        this.removeCRs = removeCRs;
    }

    /**
     * Sets the escape processing.
     *
     * @param escapeProcessing
     *          the new escape processing
     * @since 3.1.1
     */
    public void setEscapeProcessing(boolean escapeProcessing) {
        this.escapeProcessing = escapeProcessing;
    }

    /**
     * @param logWriter receives executed commands and query results; {@code null} disables it
     */
    public void setLogWriter(PrintWriter logWriter) {
        this.logWriter = logWriter;
    }

    /**
     * @param errorLogWriter receives failure reports; {@code null} disables it
     */
    public void setErrorLogWriter(PrintWriter errorLogWriter) {
        this.errorLogWriter = errorLogWriter;
    }

    /**
     * @param delimiter the text that ends a statement in line-by-line mode
     */
    public void setDelimiter(String delimiter) {
        this.delimiter = delimiter;
    }

    /**
     * @param fullLineDelimiter {@code true} if the delimiter only counts when it is the only
     *                          content of a line
     */
    public void setFullLineDelimiter(boolean fullLineDelimiter) {
        this.fullLineDelimiter = fullLineDelimiter;
    }

    /**
     * Executes a script.
     *
     * <p>The reader is consumed but not closed. When the connection is not in auto-commit
     * mode the script is committed at its end, and anything left uncommitted is rolled back.
     *
     * @param reader source of the script text
     * @throws Exception if the connection cannot be configured, read or committed, or if a
     *                   statement fails while stop-on-error or throw-warning is enabled
     */
    public void runScript(Reader reader) throws Exception {
        setAutoCommit();
        try {
            if (sendFullScript) {
                executeFullScript(reader);
            } else {
                executeLineByLine(reader);
            }
        } finally {
            // No-op after a successful commit; undoes partial work after a failure.
            rollbackConnection();
        }
    }

    /** Reads the whole script and executes it as a single statement. */
    private void executeFullScript(Reader reader) throws Exception {
        StringBuilder script = new StringBuilder();
        try {
            BufferedReader lineReader = new BufferedReader(reader);
            String line;
            while ((line = lineReader.readLine()) != null) {
                script.append(line);
                script.append(LINE_SEPARATOR);
            }
            String command = script.toString();
            println(command);
            executeStatement(command);
            commitConnection();
        } catch (Exception e) {
            String message = "Error executing: " + script + ". Cause: " + e;
            printlnError(message);
            throw new Exception(message, e);
        }
    }

    /** Executes the script statement by statement, splitting it at the delimiter. */
    private void executeLineByLine(Reader reader) throws Exception {
        StringBuilder command = new StringBuilder();
        try {
            BufferedReader lineReader = new BufferedReader(reader);
            String line;
            while ((line = lineReader.readLine()) != null) {
                handleLine(command, line);
            }
            commitConnection();
            reportMissingLineTerminator(command);
        } catch (Exception e) {
            String message = "Error executing: " + command + ". Cause: " + e;
            printlnError(message);
            throw new Exception(message, e);
        }
    }

    /**
     * Reports script text that was left over because it never reached a delimiter.
     *
     * <p>Such text is not executed. It is written to the error log so it does not vanish
     * unnoticed, and it fails the script when stop-on-error is enabled.
     *
     * @param command the statement buffer after the last line has been handled
     * @throws SQLException if text is left over and stop-on-error is enabled
     */
    private void reportMissingLineTerminator(StringBuilder command) throws SQLException {
        if (command.toString().trim().length() == 0) {
            return;
        }
        String message = "Line missing end-of-line terminator (" + delimiter + ") => " + command;
        if (stopOnError) {
            throw new SQLException(message);
        }
        printlnError(message);
    }

    /**
     * @deprecated Since 3.5.4, this method is deprecated. Please close the {@link Connection} outside of this class.
     */
    @Deprecated
    public void closeConnection() {
        try {
            connection.close();
        } catch (Exception e) {
            // ignore
        }
    }

    /** Applies the configured auto-commit mode to the connection if it differs. */
    private void setAutoCommit() throws Exception {
        try {
            if (autoCommit != connection.getAutoCommit()) {
                connection.setAutoCommit(autoCommit);
            }
        } catch (Throwable t) {
            throw new Exception("Could not set AutoCommit to " + autoCommit + ". Cause: " + t, t);
        }
    }

    /** Commits the connection unless it is in auto-commit mode. */
    private void commitConnection() throws Exception {
        try {
            if (!connection.getAutoCommit()) {
                connection.commit();
            }
        } catch (Throwable t) {
            throw new Exception("Could not commit transaction. Cause: " + t, t);
        }
    }

    /** Rolls the connection back unless it is in auto-commit mode; failures are ignored. */
    private void rollbackConnection() {
        try {
            if (!connection.getAutoCommit()) {
                connection.rollback();
            }
        } catch (Throwable t) {
            // ignore
        }
    }

    /**
     * Processes one script line: a comment is logged (and may switch the delimiter), a line
     * containing the delimiter completes and executes the buffered statement, and any other
     * non-blank line is appended to the buffer.
     *
     * @param command buffer holding the statement collected so far
     * @param line    the line that was read
     * @throws SQLException if the completed statement fails and the failure is not tolerated
     */
    private void handleLine(StringBuilder command, String line) throws SQLException {
        String trimmedLine = line.trim();
        if (lineIsComment(trimmedLine)) {
            Matcher matcher = DELIMITER_PATTERN.matcher(trimmedLine);
            if (matcher.find()) {
                delimiter = matcher.group(5);
            }
            println(trimmedLine);
        } else if (commandReadyToExecute(trimmedLine)) {
            // Keep the text in front of the last delimiter; whatever follows it is dropped.
            command.append(line, 0, line.lastIndexOf(delimiter));
            command.append(LINE_SEPARATOR);
            println(command);
            executeStatement(command.toString());
            command.setLength(0);
        } else if (trimmedLine.length() > 0) {
            command.append(line);
            command.append(LINE_SEPARATOR);
        }
    }

    private boolean lineIsComment(String trimmedLine) {
        return trimmedLine.startsWith("//") || trimmedLine.startsWith("--");
    }

    private boolean commandReadyToExecute(String trimmedLine) {
        // issue #561 remove anything after the delimiter
        return !fullLineDelimiter && trimmedLine.contains(delimiter) || fullLineDelimiter && trimmedLine.equals(delimiter);
    }

    /**
     * Executes one statement and logs every result it produces.
     *
     * @param command the SQL text
     * @throws SQLException on a warning when throw-warning is enabled, or on a failure when
     *                      stop-on-error is enabled; other failures go to the error log
     */
    private void executeStatement(String command) throws SQLException {
        try (Statement statement = connection.createStatement()) {
            statement.setEscapeProcessing(escapeProcessing);
            String sql = command;
            if (removeCRs) {
                sql = sql.replace("\r\n", "\n");
            }
            try {
                boolean hasResults = statement.execute(sql);
                // JDBC signals "no more results" with hasResults == false and update count -1.
                while (!(!hasResults && statement.getUpdateCount() == -1)) {
                    checkWarnings(statement);
                    printResults(statement, hasResults);
                    hasResults = statement.getMoreResults();
                }
            } catch (SQLWarning e) {
                throw e;
            } catch (SQLException e) {
                if (stopOnError) {
                    throw e;
                } else {
                    String message = "Error executing: " + command + ". Cause: " + e;
                    printlnError(message);
                }
            }
        }
    }

    /** Throws the first warning of the statement when throw-warning is enabled. */
    private void checkWarnings(Statement statement) throws SQLException {
        if (!throwWarning) {
            return;
        }
        // In Oracle, CREATE PROCEDURE, FUNCTION, etc. returns warning
        // instead of throwing exception if there is compilation error.
        SQLWarning warning = statement.getWarnings();
        if (warning != null) {
            throw warning;
        }
    }

    /** Writes the current result set to the log writer as tab-separated text. */
    private void printResults(Statement statement, boolean hasResults) {
        if (!hasResults) {
            return;
        }
        try (ResultSet rs = statement.getResultSet()) {
            ResultSetMetaData md = rs.getMetaData();
            int cols = md.getColumnCount();
            for (int i = 0; i < cols; i++) {
                String name = md.getColumnLabel(i + 1);
                print(name + "\t");
            }
            println("");
            while (rs.next()) {
                for (int i = 0; i < cols; i++) {
                    String value = rs.getString(i + 1);
                    print(value + "\t");
                }
                println("");
            }
        } catch (SQLException e) {
            printlnError("Error printing results: " + e.getMessage());
        }
    }

    private void print(Object o) {
        if (logWriter != null) {
            logWriter.print(o);
            logWriter.flush();
        }
    }

    private void println(Object o) {
        if (logWriter != null) {
            logWriter.println(o);
            logWriter.flush();
        }
    }

    private void printlnError(Object o) {
        if (errorLogWriter != null) {
            errorLogWriter.println(o);
            errorLogWriter.flush();
        }
    }
}
基线表:
-- Bookkeeping table of the migration: one row per executed script file.
CREATE TABLE "sql_schema_validate" (
-- File name of the script, also the primary key.
"name" varchar(256),
-- Hash stored for the script when it was executed.
"hash" varchar(50) NOT NULL,
PRIMARY KEY ("name")
);
数据库脚本文件按 Flyway 的命名规则命名,文件列表如下:
这样便于在不同数据库之间切换。其中 sqlLogPath 是记录执行出错的 SQL 的日志文件路径。这样基本上满足了 SQL 迁移的需求,后续有问题再调整。