I won't go into how the decision to migrate from MySQL to PostgreSQL was made. This is also my first time using PostgreSQL, so I can't yet say how good it is. The decision has been made; what follows is how the migration was carried out.
1. Basic information about the data
Server: 4-core CPU, 8 GB RAM, 1 TB disk, 8 Mbit/s network.
Database: MySQL 5.5 Community, 492 GB of data, including indexes and logs.
With less than 300 GB of disk space left on the server, there was no way to run MySQL and PostgreSQL side by side on it for the migration, so PostgreSQL runs only on my local machine and the data is first migrated there.
2. Migrating with generic code
Java is what I know, so I decided to do the migration in Java. (To save work, I chose to stand on the shoulders of giants.) A search turned up the article 自己动手写一个Mysql到PostgreSQL数据库迁移工具 ("Write your own MySQL-to-PostgreSQL migration tool"), which looked promising. I copied it locally and made a few adaptations and improvements, migrating tables with an integer primary key incrementally. The code is as follows:
package springDemo;
import java.sql.SQLException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import javax.sql.DataSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.util.Assert;
import com.zaxxer.hikari.HikariDataSource;
public class DataTableMigration{
private static Logger LOG = LoggerFactory.getLogger(DataTableMigration.class);
private final JdbcTemplate targetJdbc;
private final JdbcTemplate sourceJdbc;
private final String tableName;
private final String primaryKey;
private final String[] columnNamesInSourceDB;
private final String[] columnNamesInTargetDB;
private final Map<String, String> columnMappings;
public DataTableMigration(DataSource sourceDataSource, String tableName, DataSource targetDataSource) throws SQLException {
this.tableName = tableName.toLowerCase();
if (sourceDataSource instanceof HikariDataSource) {
HikariDataSource hikariDataSource = (HikariDataSource) sourceDataSource;
hikariDataSource.setMaxLifetime(86400000); // set to 24 hours
hikariDataSource.setConnectionTimeout(600000);
hikariDataSource.setReadOnly(true);
}
if (targetDataSource instanceof HikariDataSource) {
HikariDataSource hikariDataSource = (HikariDataSource) targetDataSource;
hikariDataSource.setMaxLifetime(86400000); // set to 24 hours
hikariDataSource.setConnectionTimeout(600000);
}
this.sourceJdbc = new JdbcTemplate(sourceDataSource);
this.targetJdbc = new JdbcTemplate(targetDataSource);
System.out.println(sourceDataSource);
System.out.println(targetDataSource);
this.primaryKey = MigrationUtils.getPrimaryKeyByTableName(sourceDataSource.getConnection(), this.tableName);
this.columnNamesInSourceDB = MigrationUtils.getColumnsByTableName(sourceDataSource.getConnection(), this.tableName);
Assert.isTrue(this.columnNamesInSourceDB != null && this.columnNamesInSourceDB.length > 0,
"can't find column infor from source db for the table " + this.tableName);
this.columnNamesInTargetDB = MigrationUtils.getColumnsByTableName(targetDataSource.getConnection(), this.tableName);
Assert.isTrue(this.columnNamesInTargetDB != null && this.columnNamesInTargetDB.length > 0,
"can't find column infor from target db for the table " + this.tableName);
this.columnMappings = new HashMap<>();
}
protected JdbcTemplate getSourceJdbc() {
return this.sourceJdbc;
}
protected JdbcTemplate getTargetJdbc() {
return this.targetJdbc;
}
protected List<Map<String, Object>> queryForList(String querySql, long offset, long stepLength) {
return getSourceJdbc().queryForList(querySql, offset, stepLength);
}
private Object[] rowToParam(Map<String, Object> row) {
return Arrays.stream(columnNamesInTargetDB)
.map(colInSource -> columnMappings.getOrDefault(colInSource, colInSource))
.map(row::get)
.toArray();
}
protected String getInsertSQL() {
return String.format("insert into %s (%s) values(%s) ",
this.tableName,
String.join(",", columnNamesInTargetDB),
IntStream.range(0, columnNamesInTargetDB.length)
.mapToObj(n -> "?")
.collect(Collectors.joining(",")));
}
protected String getInsertSQLOnConflict() {
return String.format("insert into %s (%s) values(%s) ON CONFLICT (%s) DO NOTHING",
this.tableName,
String.join(",", columnNamesInTargetDB),
IntStream.range(0, columnNamesInTargetDB.length).mapToObj(n -> "?").collect(Collectors.joining(",")),
this.primaryKey);
}
protected int getStepLength() {
return 1000000;
}
protected long getSourceMaxIndex() {
long count = getSourceJdbc().queryForObject("select max(" + primaryKey + ") from " + tableName, Long.class);
return count;
}
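// Resume point: the max primary key already present in the target table, or (source min - 1) if the target is still empty.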
protected long getTargetMaxIndex() {
long count = getTargetJdbc().queryForObject("select count(1) from " + tableName, Long.class);
if (count > 0)
count = getTargetJdbc().queryForObject("select max(" + primaryKey + ") from " + tableName, Long.class);
else
count = getSourceJdbc().queryForObject("select min(" + primaryKey + ") from " + tableName, Long.class) - 1;
return count;
}
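// Incremental migration for tables with an integer primary key: each pass resumes from the target's current max(PK) and copies the next batch of rows.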
public void migrateIntegerIndexTable() throws Exception {
LOG.info("start to migrate data from source db to target db");
String sql = String.format("select %s from %s where %s > ? order by %s asc limit ?;",
String.join(",", columnNamesInSourceDB), this.tableName, this.primaryKey, this.primaryKey);
long maxRecords = getSourceMaxIndex();
long stepLength = getStepLength();
for (long offset = getTargetMaxIndex(); offset < maxRecords; offset = getTargetMaxIndex()) {
List<Map<String, Object>> rows = queryForList(sql, offset, stepLength);
LOG.info("get records From source");
getTargetJdbc().batchUpdate(getInsertSQL(),
rows.stream().map(this::rowToParam).collect(Collectors.toList()));
LOG.info("moved {} records", offset);
}
}
public void migrateIntegerIndexTableJust1Line(long id) throws Exception {
LOG.info("start to migrate data from source db to target db");
String sql = String.format("select %s from %s where %s = ? limit ?;",
String.join(",", columnNamesInSourceDB), this.tableName, this.primaryKey);
List<Map<String, Object>> rows = queryForList(sql, id, 1);
LOG.info("get records From source");
getTargetJdbc().batchUpdate(getInsertSQL(),
rows.stream().map(this::rowToParam).collect(Collectors.toList()));
LOG.info("moved {} record", id);
}
// Get the total row count from the source database.
protected int getSourceTotalRecords() {
int count = getSourceJdbc().queryForObject("select count(1) from " + tableName, Integer.class);
LOG.info("source db has {} records", count);
return count;
}
// Get the number of rows already stored in the target database.
protected int getTargetTotalRecords() {
int count = getTargetJdbc().queryForObject("select count(1) from " + tableName, Integer.class);
LOG.info("target db has {} records", count);
return count;
}
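// Migration for tables without an integer primary key: page through the source with LIMIT offset, count and let ON CONFLICT DO NOTHING skip rows already copied.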
public void migrateStringIndexTable() throws SQLException {
LOG.info("start to migrate data from source db to target db");
String sql = String.format("select %s from %s order by %s asc limit ?, ?;",
String.join(",", columnNamesInSourceDB), this.tableName, this.primaryKey);
int maxRecords = getSourceTotalRecords();
int stepLength = getStepLength();
for (int offset = 0; offset < maxRecords; offset = offset + stepLength) {
List<Map<String, Object>> rows = queryForList(sql, offset, stepLength);
LOG.info("get records From source, " + rows.size());
getTargetJdbc().batchUpdate(getInsertSQLOnConflict(),
rows.stream().map(this::rowToParam).collect(Collectors.toList()));
LOG.info("moved {} records", offset);
}
}
// Shut down the connection pools. (Closing a single pooled connection would only return it to the pool.)
public void close() {
if (sourceJdbc.getDataSource() instanceof HikariDataSource) {
((HikariDataSource) sourceJdbc.getDataSource()).close();
}
if (targetJdbc.getDataSource() instanceof HikariDataSource) {
((HikariDataSource) targetJdbc.getDataSource()).close();
}
}
public static void main(String[] args) {
LOG.atInfo();
Config cf = new Config();
System.setProperty("spring.jdbc.getParameterType.ignore","true");
try {
DataTableMigration dtmStr = new DataTableMigration(cf.sourceDataSource(), "target", cf.targetDataSource());
dtmStr.migrateStringIndexTable();
dtmStr.close();
String[] tableNames = { "dailyexchange", "movingavg", "stats" };
for (int i = 0; i < tableNames.length; i++) {
DataTableMigration dtmInt = new DataTableMigration(cf.sourceDataSource(), tableNames[i], cf.targetDataSource());
dtmInt.migrateIntegerIndexTable();
dtmInt.close();
}
// DataTableMigration dtmInt = new DataTableMigration(cf.sourceDataSource(), "min1", cf.targetDataSource());
// dtmInt.migrateIntegerIndexTable();
// dtmInt.close();
} catch (SQLException e) {
e.printStackTrace();
} catch (Exception e) {
e.printStackTrace();
}
}
}
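The Config class used in main() is not shown in the article I adapted; a minimal sketch of what it could look like, assuming HikariCP data sources (the hosts, database names and credentials below are placeholders, not my real settings):

package springDemo;

import javax.sql.DataSource;
import com.zaxxer.hikari.HikariDataSource;

// Hypothetical Config: the migration class above only needs a source and a target DataSource.
public class Config {
    public DataSource sourceDataSource() {
        HikariDataSource ds = new HikariDataSource();
        ds.setJdbcUrl("jdbc:mysql://SOURCE_HOST:3306/SOURCE_DB");
        ds.setUsername("USER");
        ds.setPassword("PASSWORD");
        return ds;
    }
    public DataSource targetDataSource() {
        HikariDataSource ds = new HikariDataSource();
        ds.setJdbcUrl("jdbc:postgresql://localhost:5432/TARGET_DB");
        ds.setUsername("USER");
        ds.setPassword("PASSWORD");
        return ds;
    }
}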
The first few tables, being small at only a few million rows each, were migrated within a few hours. Then came the largest table, min1, with 3.4 billion rows, and at that point the speed became unacceptable. Since every round trip costs time, I enlarged the batch as much as possible. After raising the batch size to 1,000,000 rows per pass (the maximum is 1,048,576), throughput improved slightly, to about 10 minutes per million rows. For example:
HikariDataSource (null)
HikariDataSource (null)
2023-04-12T07:31:49.370+08:00 INFO --- [ main] com.zaxxer.hikari.HikariDataSource : HikariPool-1 - Starting...
2023-04-12T07:31:50.701+08:00 INFO --- [ main] com.zaxxer.hikari.pool.HikariPool : HikariPool-1 - Added connection com.mysql.cj.jdbc.ConnectionImpl@3cce5371
2023-04-12T07:31:50.704+08:00 INFO --- [ main] com.zaxxer.hikari.HikariDataSource : HikariPool-1 - Start completed.
2023-04-12T07:31:51.056+08:00 INFO --- [ main] com.zaxxer.hikari.HikariDataSource : HikariPool-2 - Starting...
2023-04-12T07:31:51.148+08:00 INFO --- [ main] com.zaxxer.hikari.pool.HikariPool : HikariPool-2 - Added connection org.postgresql.jdbc.PgConnection@19b93fa8
2023-04-12T07:31:51.148+08:00 INFO --- [ main] com.zaxxer.hikari.HikariDataSource : HikariPool-2 - Start completed.
2023-04-12T07:31:51.164+08:00 INFO --- [ main] springDemo.DataTableMigration : start to migrate data from source db to target db
2023-04-12T07:40:24.912+08:00 WARN --- [ main] com.zaxxer.hikari.pool.PoolBase : HikariPool-1 - Failed to validate connection com.mysql.cj.jdbc.ConnectionImpl@3016fd5e (No operations allowed after connection closed.). Possibly consider using a shorter maxLifetime value.
2023-04-12T07:40:29.923+08:00 WARN --- [ main] com.zaxxer.hikari.pool.PoolBase : HikariPool-1 - Failed to validate connection com.mysql.cj.jdbc.ConnectionImpl@6c45ee6e (No operations allowed after connection closed.). Possibly consider using a shorter maxLifetime value.
2023-04-12T07:40:34.928+08:00 WARN --- [ main] com.zaxxer.hikari.pool.PoolBase : HikariPool-1 - Failed to validate connection com.mysql.cj.jdbc.ConnectionImpl@6b3e12b5 (No operations allowed after connection closed.). Possibly consider using a shorter maxLifetime value.
2023-04-12T07:40:39.933+08:00 WARN --- [ main] com.zaxxer.hikari.pool.PoolBase : HikariPool-1 - Failed to validate connection com.mysql.cj.jdbc.ConnectionImpl@5aac4250 (No operations allowed after connection closed.). Possibly consider using a shorter maxLifetime value.
2023-04-12T07:40:44.936+08:00 WARN --- [ main] com.zaxxer.hikari.pool.PoolBase : HikariPool-1 - Failed to validate connection com.mysql.cj.jdbc.ConnectionImpl@1338fb5 (No operations allowed after connection closed.). Possibly consider using a shorter maxLifetime value.
2023-04-12T07:40:49.938+08:00 WARN --- [ main] com.zaxxer.hikari.pool.PoolBase : HikariPool-1 - Failed to validate connection com.mysql.cj.jdbc.ConnectionImpl@42463763 (No operations allowed after connection closed.). Possibly consider using a shorter maxLifetime value.
2023-04-12T07:40:54.941+08:00 WARN --- [ main] com.zaxxer.hikari.pool.PoolBase : HikariPool-1 - Failed to validate connection com.mysql.cj.jdbc.ConnectionImpl@59f63e24 (No operations allowed after connection closed.). Possibly consider using a shorter maxLifetime value.
2023-04-12T07:40:59.947+08:00 WARN --- [ main] com.zaxxer.hikari.pool.PoolBase : HikariPool-1 - Failed to validate connection com.mysql.cj.jdbc.ConnectionImpl@7ca33c24 (No operations allowed after connection closed.). Possibly consider using a shorter maxLifetime value.
2023-04-12T07:41:20.733+08:00 INFO --- [ main] springDemo.DataTableMigration : get records From source
2023-04-12T07:41:33.743+08:00 INFO --- [ main] springDemo.DataTableMigration : moved 2990509187 records
At this rate, moving all 3.4 billion rows would take roughly 24 days ((3.4 billion ÷ 1 million) × 10 minutes ≈ 34,000 minutes ≈ 23.6 days), which is still unacceptable. From a related article (which I can no longer find) I learned that with generic migration code, a large share of the time goes into building the List<Map<String, Object>> row mappings.
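Staying with JdbcTemplate, one way to cut that overhead would be to stream the ResultSet straight into the insert batch, reading columns positionally instead of building a map per row. A rough sketch of the idea (not what I ended up doing), reusing the fields and getInsertSQL() of the class above and assuming the source and target tables have the same column order:

// Sketch only: read each row positionally into an Object[] and hand the whole
// list to batchUpdate, avoiding the per-row HashMap that queryForList builds.
protected void copyOneBatchStreaming(long offset, int stepLength) {
    String selectSql = String.format("select %s from %s where %s > ? order by %s asc limit ?",
            String.join(",", columnNamesInSourceDB), tableName, primaryKey, primaryKey);
    List<Object[]> batch = new java.util.ArrayList<>(stepLength);
    org.springframework.jdbc.core.RowCallbackHandler handler = rs -> {
        Object[] params = new Object[columnNamesInSourceDB.length];
        for (int i = 0; i < params.length; i++) {
            params[i] = rs.getObject(i + 1); // positional access, no map lookup
        }
        batch.add(params);
    };
    getSourceJdbc().query(selectSql, handler, offset, stepLength);
    getTargetJdbc().batchUpdate(getInsertSQL(), batch);
}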
3. Migrating with purpose-built code
Being lazy and relying on generic code clearly doesn't work for a very large table, so there was no way around writing dedicated migration code for it. Credit: ChatGPT. The code is as follows:
package pack;
import java.sql.*;
import java.time.LocalDate;
import java.time.LocalTime;
import java.io.InputStream;
import java.util.Properties;
import org.apache.log4j.Logger;
import org.apache.log4j.PropertyConfigurator;
public class MysqlToPostgres {
/*
 * To migrate a MySQL table into PostgreSQL with batched inserts, write a Java program based on JDBC.
 *
 * The migration proceeds in these steps:
 *
 * 1. Connect to the MySQL and PostgreSQL databases via JDBC.
 *
 * 2. Select the table to migrate from the MySQL database.
 *
 * 3. Read the rows of that MySQL table in batches.
 *
 * 4. Batch-insert the rows into the corresponding table in the PostgreSQL database.
 *
 */
private static final Logger logger = Logger.getLogger(Min1.class);
private Connection mysqlConn = null;
private Connection pgConn = null;
public static void main(String args[]) {
System.out.println(System.getProperty("java.class.path"));
PropertyConfigurator.configure("log4j.properties");
MysqlToPostgres M2P = new MysqlToPostgres();
M2P.init();
long flag = M2P.getTargetMaxIndex();
long end = M2P.getSourceMaxIndex();
logger.info("source line count:" + end);
for (; flag < end; flag = M2P.getTargetMaxIndex()) {
logger.info("target line count:" + flag);
M2P.migrate(flag);
// break;
}
M2P.uninit();
}
public void init() {
Properties props = new Properties();
InputStream input = null;
try {
String filename = "consts.properties";
input = MysqlToPostgres.class.getClassLoader().getResourceAsStream(filename);
if (input == null) {
System.out.println("Sorry, unable to find " + filename);
return;
}
// load the properties file
// get the property value and print it out
props.load(input);
String sourceIP = props.getProperty("sourceIP");
String targetIP = props.getProperty("targetIP");
String username = props.getProperty("DBUserName");
String password = props.getProperty("DBPassword");
System.out.println(getMinute() + " " + username);
// Connect to the MySQL database
Class.forName("com.mysql.jdbc.Driver");
mysqlConn = DriverManager.getConnection("jdbc:mysql://" + sourceIP + "/cf_stock?useCompression=true", username, password);
// Connect to the PostgreSQL database
Class.forName("org.postgresql.Driver");
pgConn = DriverManager.getConnection("jdbc:postgresql://" + targetIP + "/cf_stock", username, password);
} catch (Exception e) {
e.printStackTrace();
}
}
protected long getSourceMaxIndex() {
long count = 0;
Statement mysqlStmt = null;
try {
mysqlStmt = mysqlConn.createStatement();
// Get the current max recordID from the MySQL table
ResultSet mysqlRs = mysqlStmt.executeQuery("select max(recordID) from min1;");
if (mysqlRs.next()) {
count = mysqlRs.getLong("max(recordID)");
}
mysqlStmt.close();
} catch (Exception e) {
e.printStackTrace();
}
return count;
}
protected long getTargetMaxIndex() {
long count = 0;
Statement pgStmt = null;
try {
pgStmt = pgConn.createStatement();
// Get the current max recordID from the PostgreSQL table
ResultSet pgRs = pgStmt.executeQuery("select max(recordID) from min1;");
if (pgRs.next()) {
count = pgRs.getLong(1);
}
pgStmt.close();
} catch (Exception e) {
e.printStackTrace();
}
return count;
}
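// Copy one batch: read up to 1,000,000 rows with recordID > flag from MySQL and insert them into PostgreSQL in a single JDBC batch.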
public void migrate(long flag) {
PreparedStatement pgStmt = null;
PreparedStatement mysqlStmt = null;
try {
String sql = "INSERT INTO min1 "
+ "(recordID, dayRecordID, targetID, date, minute, "
+ "open, high, low, close, average, shareVolume, moneyVolume, openInterest) "
+ "VALUES (?,?,?,?,?, ?,?,?,?, ?,?,?,?) ";
pgStmt = pgConn.prepareStatement(sql);
// Read a batch of rows from the MySQL table
String mysqlSql = "select * from min1 where recordID > ? order by recordID asc limit 1000000;";
mysqlStmt = mysqlConn.prepareStatement(mysqlSql);
mysqlStmt.setLong(1, flag);
ResultSet mysqlRs = mysqlStmt.executeQuery();
logger.info(getMinute()+" get records from mysql.");
int i = 0;
while (mysqlRs.next()) {
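// Min1 (not shown) is a simple holder class that maps one ResultSet row to typed fields.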
Min1 m1 = new Min1(mysqlRs);
// Add the row's values to the PostgreSQL batch insert
pgStmt.setLong (1, m1.recordID);
pgStmt.setLong (2, m1.dayRecordID);
pgStmt.setString (3, m1.targetID);
pgStmt.setDate (4, m1.date);
pgStmt.setShort (5, m1.minute);
pgStmt.setFloat (6, m1.open);
pgStmt.setFloat (7, m1.high);
pgStmt.setFloat (8, m1.low);
pgStmt.setFloat (9, m1.close);
pgStmt.setFloat (10, m1.average);
pgStmt.setLong (11, m1.shareVolume);
pgStmt.setLong (12, m1.moneyVolume);
pgStmt.setLong (13, m1.openInterest);
pgStmt.addBatch();
i++;
if (i % 500000 == 0) {
System.out.println(i);
}
}
// Submit the batched insert
logger.info(getMinute() + " combine all sql into a batch.");
pgStmt.executeBatch();
logger.info(getMinute() + " after excute batch.");
pgStmt.clearBatch();
mysqlRs.close();
mysqlStmt.close();
pgStmt.close();
} catch (Exception e) {
e.printStackTrace();
}
}
public void uninit() {
try {
mysqlConn.close();
pgConn.close();
} catch (Exception e) {
e.printStackTrace();
}
}
public String getMinute() {
LocalTime now = LocalTime.now();
return "" + now.getHour() + ":" + now.getMinute() + ":" + now.getSecond();
}
}
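The consts.properties file that init() loads is not shown; it only needs the four keys read above, with placeholder values such as:

# Placeholder values; adjust host names (and ports, if not the defaults) to your environment.
sourceIP=SOURCE_HOST
targetIP=TARGET_HOST
DBUserName=USER
DBPassword=PASSWORD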
It runs reasonably well, migrating about one million rows every 2 minutes; at that rate the whole table takes roughly 5 days ((3.47 billion ÷ 1 million) × 2 minutes ≈ 6,900 minutes ≈ 4.8 days):
[main] INFO pack.Min1 - source line count:3474392405
[main] INFO pack.Min1 - target line count:2991509187
[main] INFO pack.Min1 - 7:44:14 get records from mysql.
500000
1000000
[main] INFO pack.Min1 - 7:44:15 combine all sql into a batch.
[main] INFO pack.Min1 - 7:44:29 after excute batch.
[main] INFO pack.Min1 - target line count:2992509187
[main] INFO pack.Min1 - 7:45:54 get records from mysql.
500000
1000000
[main] INFO pack.Min1 - 7:45:56 combine all sql into a batch.
[main] INFO pack.Min1 - 7:46:10 after excute batch.
[main] INFO pack.Min1 - target line count:2993509187
Done.