🔥 本文将带你深入解析大规模数据迁移的实践方案,从架构设计到代码实现,手把手教你解决数据迁移过程中的各种挑战。
📚博主其他匠心之作,强推专栏:
- 小游戏开发【博主强推 匠心之作 拿来即用无门槛】
文章目录
-
- 一、场景引入
-
- 1. 问题背景
- 2. 场景分析
-
- 为什么需要消息队列?
- 为什么选择Redis?
- 技术选型对比
- 二、技术方案设计
-
- 1. 整体架构
- 2. 核心组件设计
-
- 2.1 任务管理模块
-
- 2.1.1 MigrationStarter
- 2.1.2 MigrationTaskManager
- 2.2 数据读取模块
-
- 2.2.1 DataReader
- 2.2.2 OrderMapper
- 2.3 Redis队列模块
-
- 2.3.1 RedisQueue
- 2.4 消费者模块
-
- 2.4.1 ConsumerManager
- 2.4.2 ConsumerWorker
- 2.4.3 MongoWriter
- 2.4.4 MongoDBMonitor
- 2.4.5 ConsumerProgressManager
- 写在最后
一、场景引入
场景:需要将8000w条历史订单数据从原有MySQL数据库迁移到新的MongoDB集群中,你有什么好的解决方案? 大家可以先暂停,自己思考一下。
1. 问题背景
需要将8000w条历史订单数据从原有MySQL数据库迁移到新的MongoDB集群中,主要面临以下挑战:
- 源库压力:直接读取大量数据会影响线上业务
- 目标库压力:直接写入大量数据可能导致MongoDB性能下降
- 数据一致性:确保迁移过程中数据不丢失、不重复
- 迁移效率:需要在规定时间窗口内完成迁移
- 异常处理:支持断点续传,避免异常导致全量重试
2. 场景分析
为什么需要消息队列?
- 源库保护:
- 通过消息队列控制读取速度
- 避免大量查询影响线上业务
- 任务解耦:
- 将数据读取和写入解耦
- 支持独立扩展读写能力
- 流量控制:
- 控制写入MongoDB的速度
- 避免瞬时高并发
为什么选择Redis?
- 性能考虑:
- Redis的list结构天然支持队列操作
- 单机QPS可达10w级别
- 可靠性:
- 支持持久化,防止数据丢失
- 主从架构保证高可用
- 成本因素:
- 无需额外引入消息队列组件
- 降低系统复杂度
- 实现简单:
- 开发成本低
- 维护成本低
技术选型对比
方案 | 优势 | 劣势 | 是否采用 |
---|---|---|---|
直接迁移 | 实现简单 | 压力大,风险高 | 否 |
Redis队列 | 实现简单,成本低 | 单机容量有限 | 是 |
Kafka | 吞吐量大,持久化好 | 部署复杂,成本高 | 否 |
RabbitMQ | 功能完善 | 过重,维护成本高 | 否 |
二、技术方案设计
1. 整体架构
MySQL(Source) -> Data Reader -> Redis Queue -> Consumer Workers -> MongoDB(Target)
↑ ↑ ↑ ↑
限流控制 队列监控告警 消费进度监控 写入状态监控
整个迁移过程说起来很简单:从MySQL读数据,写到Redis队列,消费者从Redis读取后写入MongoDB。但魔鬼藏在细节里,让我们一步步看看要注意什么:
MySQL数据读取
首先是MySQL这块,我们不能无脑读取。想象一下,如果不控制读取速度,直接大量读取数据,很可能会影响到线上正常业务。所以这里有两个关键点:
- 选择合适的时间。建议在业务低峰期,比如凌晨,这时候可以适当提高读取速率。
- 控制读取速度。通过监控MySQL的负载情况,动态调整读取速率。
读取方式
读取数据时要格外注意顺序问题。我们一般用创建时间和ID来排序:
- 先按创建时间排序
- 如果时间相同,再按ID排序
- 每次读取都记录当前的时间点和ID
- 下次继续读取时,就从这个位置开始
Redis队列控制
数据读出来了,不能直接往Redis里写。Redis也是有容量限制的,需要合理控制:
- 假设一条订单数据1KB(实际可能2-3KB)
- 单实例Redis一般4-8G,按8G算
- 建议只用30%给这个任务,也就是2.4GB
- 差不多能放2400万条数据,超过就要告警
- 如果队列积压严重,要停止写入,等消费者消费一些后再继续
数据写入流程
整个写入过程要严格保证可靠性:
- 读取数据
- 记录任务进度
- 写入Redis
- 这几步要在一个事务里完成
- 如果失败了要记录下来,方便重试
消费者处理
消费这块要特别注意几个问题:
- 从Redis读取数据
- 写入MongoDB
- 确认消费完成
- 记录消费进度
- 失败要支持重试
- 整个过程要保证幂等性,防止重复消费
MongoDB写入控制
最后写MongoDB时也要注意控制速度:
- 监控MongoDB的负载情况
- 根据负载动态调整写入速度
- 避免写入太快导致MongoDB压力过大
通过这样的设计,我们就能实现一个相对可靠的数据迁移方案。当然,实际实现时还需要考虑更多细节,比如异常处理、监控告警等。
2. 核心组件设计
在开始具体的代码实现之前,让我们先理解每个组件的职责和实现思路。
2.1 任务管理模块
首先来看任务管理相关的代码实现。这部分主要包含两个核心类:MigrationStarter
和MigrationTaskManager
。
2.1.1 MigrationStarter
这是整个迁移任务的入口类,负责创建任务并提交给任务管理器。它的主要职责是:
- 生成唯一的任务ID
- 创建迁移任务实例
- 提交任务到管理器
/**
* 数据迁移启动器
* 作为整个迁移任务的入口
*/
@Slf4j
@Component
public class MigrationStarter {
@Autowired
private MigrationTaskManager taskManager; // 迁移任务管理器
/**
* 启动迁移任务
* @param startTime 开始时间
* @param endTime 结束时间
*/
public void startMigration(LocalDateTime startTime, LocalDateTime endTime) {
// 1. 创建迁移任务
String taskId = UUID.randomUUID().toString(); // 生成任务ID
MigrationTask task = new MigrationTask(taskId, startTime, endTime); // 创建迁移任务
// 2. 提交任务
taskManager.submitTask(task); // 提交任务
log.info("迁移任务已提交,taskId={}", taskId); // 日志记录
}
}
2.1.2 MigrationTaskManager
任务管理器负责任务的具体执行和生命周期管理。它实现了:
- 线程池管理
- 任务执行流程控制
- 异常处理和重试机制
/**
* 迁移任务管理器
* 负责任务的调度和管理
*/
@Slf4j
@Component
public class MigrationTaskManager {
@Autowired
private DataReader dataReader;
@Autowired
private MongoWriter mongoWriter;
@Autowired
private RedisQueue redisQueue;
// 线程池配置
private final ExecutorService executorService = new ThreadPoolExecutor(
5, 10, 60L, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(1000),
new ThreadFactoryBuilder().setNameFormat("migration-pool-%d").build()
);
/**
* 提交迁移任务
*/
@Transactional(rollbackFor = Exception.class)
public void submitTask(MigrationTask task) {
executorService.submit(() -> {
try {
// 1. 初始化任务
task.init();
// 2. 执行数据读取
while (task.hasNext()) {
// 读取数据
List<Order> orders = dataReader.readNextBatch(task);
// 在事务中执行更新进度和写入Redis队列
transactionTemplate.execute(status -> {
try {
// 先更新任务进度(持久化)
task.updateProgress(orders);
// 原子性写入Redis队列
redisQueue.pushBatchAtomic(orders);
return true;
} catch (Exception e) {
status.setRollbackOnly();
throw new RuntimeException("处理批次数据失败", e);
}
});
}
// 3. 完成任务
task.complete();
} catch (Exception e) {
log.error("任务执行异常", e);
task.fail(e);
}
});
}
}
2.2 数据读取模块
数据读取是整个迁移过程的起点,需要特别注意读取性能和源库压力控制。这部分包含两个关键类:
2.2.1 DataReader
数据读取器负责从MySQL中批量读取数据,实现了:
- 动态批次大小调整
- 读取速率控制
- 异常处理机制
/**
* 数据读取器
*/
@Slf4j
@Component
public class DataReader {
private final RateLimiter rateLimiter;
@Autowired
private MySQLMonitor mysqlMonitor;
private int currentBatchSize = 1000; // 初始批次大小
private static final int MIN_BATCH_SIZE = 100;
private static final int MAX_BATCH_SIZE = 5000;
public DataReader() {
// 初始速率设置
this.rateLimiter = RateLimiter.create(100); // 每秒100个请求
}
/**
* 动态调整读取速率
*/
private void adjustReadingRate() {
if (!mysqlMonitor.checkMySQLStatus()) {
// 降低速率和批次大小
rateLimiter.setRate(rateLimiter.getRate() * 0.8);
currentBatchSize = Math.</