一、实测结果
-
业务无感知,无死锁平滑
-
线上800万数据以下 直接使用 alter 新增字段 300ms左右
-
2000万数据,强制使用主键索引,每次查询50万数据 并插入新表 ,耗时 20s ,cpu 占45%
二、整体步骤
-
创建新表 biz_table_new
-
查询当前表first_max_id
- 确定当前同步数据量
37509688
- 确定当前同步数据量
-
将数据同步至新表
insert into biz_table_new(id,app_trade_no,trade_no,trade_sub_no,uid,account_type,relate_uid,relate_account_type,biz_type,biz_subtype,in_out,amount,balance_before,remark,extra,created_at,updated_at) select * from journal_2022 force index(PRIMARY) where id>0 and id <=500000;select sleep(10); insert into biz_table_new(id,app_trade_no,trade_no,trade_sub_no,uid,account_type,relate_uid,relate_account_type,biz_type,biz_subtype,in_out,amount,balance_before,remark,extra,created_at,updated_at) select * from biz_table force index(PRIMARY) where id>500000 and id <=1000000;select sleep(10); ....
- 分批次查询,根据步长设置区间
- 每次执行完毕休眠10s
-
验证数据一致性,截止到first_max_id的数据同步完成
- 根据max_id 验证历史数据是否一致
select count(1) from biz_table where id<=37509688; select count(1) from biz_table_new where id<=37509688;
-
评估1分钟后的increment值
- 查询当前max_id 如:37609688
- 当前业务,增长量 ,计算1分钟内的数据量,如:200000
- 得出预计的increment 值 如:37609688+200000=37809688
-
执行表迁移
- 设置新表的Increment 值
- 重命名 biz_table 为 biz_table_bak
- 重命名 new_table 为 biz_table
alter table biz_table_new AUTO_INCREMENT =37809688; rename table biz_table to biz_table_bak; rename table biz_table_new to biz_table;
-
同步新增数据量
- 计算biz_table_bak 的max_id 为 last_max_id 如
37808688
- 根据first_max_id () 、last_max_id 获取此区间数据,同步至新表
insert into biz_table_new(id,app_trade_no,trade_no,trade_sub_no,uid,account_type,relate_uid,relate_account_type,biz_type,biz_subtype,in_out,amount,balance_before,remark,extra,created_at,updated_at) select * from biz_table force index(PRIMARY) where id>37509688 and id <=37808688
- 计算biz_table_bak 的max_id 为 last_max_id 如
-
数据同步完毕
三、GO脚本分享
- 脚本
package utils
import "fmt"
const diffTmp = `
insert into %s(id,uid,account_type,balance,frozen,status,created_at,updated_at,app_alias)
select *,"lailiao" as app_alias from %s_bak force index(PRIMARY)
where id>%d and id <=%d;select sleep(1);
`
func SyncDataV2(tmp string, oldTable, newTable string, step, minId, maxId int) string {
if tmp == "" {
tmp = defaultTmp
}
var sql string
if minId < 0 {
minId = 0
}
var nextId int
for i := minId; i < maxId; i++ {
nextId = minId + step
if nextId > maxId {
nextId = maxId
}
sql += fmt.Sprintf(tmp, newTable, oldTable, minId, nextId)
minId = nextId
if minId == maxId {
break
}
}
return sql
}
func SyncDiffData(tmp string, oldTable, newTable string, step, minId, maxId int) string {
if tmp == "" {
tmp = diffTmp
}
var sql string
if minId < 0 {
minId = 0
}
var nextId int
for i := minId; i < maxId; i++ {
nextId = minId + step
if nextId > maxId {
nextId = maxId
}
sql += fmt.Sprintf(tmp, newTable, oldTable, minId, nextId)
minId = nextId
if minId == maxId {
break
}
}
return sql
}
func VerifyData(oldTable, newTable string, max int) string {
sql := fmt.Sprintf("select count(1) from %s where id<=%d;", oldTable, max)
sql += fmt.Sprintf("select count(1) from %s where id<=%d;", newTable, max)
return sql
}
func RenameTable(oldTable, newTable string, incrementId int) (sql string) {
sql += fmt.Sprintf("alter table %s AUTO_INCREMENT =%d;", newTable, incrementId)
sql += fmt.Sprintf("rename table %s to %s;", oldTable, oldTable+"_bak")
sql += fmt.Sprintf("rename table %s to %s;", newTable, oldTable)
return sql
}
- 测试用例
func TestAccount(t *testing.T) {
oldTable := "account"
newTable := "account_new"
//同步历史数据
step := 100000
min := 0
max := 952317 //数据表max_id
tmp := `
insert into %s(id,uid,account_type,balance,frozen,status,created_at,updated_at,app_alias)
select *,"xxx" as app_alias from %s force index(PRIMARY)
where id>%d and id <=%d;select sleep(10);
`
template := SyncDataV2(tmp, oldTable, newTable, step, min, max)
t.Log(template)
//验证数据一致性
verify := VerifyData(oldTable, newTable, max)
t.Log("Verify:\n",verify)
//变更表名
increment := max + 20000
sql := RenameTable(oldTable, newTable, increment)
t.Log("SyncData:\n",sql)
//同步新增数据
tmp = `
insert into %s(id,uid,account_type,balance,frozen,status,created_at,updated_at,app_alias)
select *,"lailiao" as app_alias from %s_bak force index(PRIMARY)
where id>%d and id <=%d;select sleep(1);
`
template = SyncDiffData(tmp, oldTable, oldTable, step, max, increment)
t.Log(template)
}