持久化
RDB
1、save会阻塞所有命令。而bgsave则不能与其他持久化命令同时执行
2、自动rdb的发起:serverCron默认每100ms执行一次,根据redisServer里面记录的dirty(上次save后修改的次数)和lastsave(上次save的时间)变量以及配置的saveparams(保存条件)来判断是否符合保存条件,若符合则执行bgsave
3、默认的自动保存条件
save 900 1 (900秒内1次修改)
save 300 10
save 60 10000
数据结构
/* Excerpt of struct redisServer: only the fields these notes reference for
 * deciding when to save an RDB snapshot (other members are elided). */
struct redisServer {
// ...
long long dirty; // number of modifications since the last save
time_t lastsave; // time of the last save
struct saveparam *saveparams; // configured save conditions
// ...
}
源码
serverCron里根据redisServer里面记录的dirty和lastsave变量以及配置的*saveparams参数来判断当前是否需要触发bgsave
save和bgSave伪代码
AOF
数据结构
/* Excerpt of struct redisServer: only the AOF buffer field referenced by
 * these notes (other members are elided). */
struct redisServer {
// ...
sds aof_buf; // AOF command buffer
// ...
}
源码
在每次命令的call函数最终会执行到feedAppendOnlyFile方法,这个方法会将写命令存入aof_buf缓存区。
而后续每次serverCron都会执行flushAppendOnlyFile,判断是否将aof缓存区的内容写入并同步到aof文件,以下是伪代码
不同的appendfsync决定了不同的刷盘策略
flushAppendOnlyFile源码
/* Write the pending AOF buffer (server.aof_buf) to server.aof_fd and then
 * fsync according to the server.aof_fsync policy.
 *
 * force: when non-zero, the write is not postponed under
 * AOF_FSYNC_EVERYSEC even if aofFsyncInProgress() reports a running fsync.
 *
 * Returns nothing. On a write error the function either exits the process
 * (policy AOF_FSYNC_ALWAYS) or sets server.aof_last_write_status to C_ERR,
 * leaves the unwritten data in the buffer and returns so that a later call
 * can retry. */
void flushAppendOnlyFile(int force) {
ssize_t nwritten;
int sync_in_progress = 0;
mstime_t latency;
if (sdslen(server.aof_buf) == 0) {
// Nothing is buffered. Still jump to the fsync step when the policy is
// AOF_FSYNC_EVERYSEC, aof_fsync_offset differs from aof_current_size,
// server.unixtime is past aof_last_fsync, and no fsync is in progress;
// otherwise there is nothing to do.
if (server.aof_fsync == AOF_FSYNC_EVERYSEC &&
server.aof_fsync_offset != server.aof_current_size &&
server.unixtime > server.aof_last_fsync &&
!(sync_in_progress = aofFsyncInProgress())) {
goto try_fsync;
} else {
return;
}
}
if (server.aof_fsync == AOF_FSYNC_EVERYSEC)
sync_in_progress = aofFsyncInProgress();
if (server.aof_fsync == AOF_FSYNC_EVERYSEC && !force) {
// If an fsync is in progress and this call is not forced, decide whether
// the write should be postponed.
if (sync_in_progress) {
if (server.aof_flush_postponed_start == 0) {
// First postponement: record when it started and return.
server.aof_flush_postponed_start = server.unixtime;
return;
} else if (server.unixtime - server.aof_flush_postponed_start < 2) {
// Keep postponing for at most two seconds (server.unixtime is
// presumably in seconds).
return;
}
// Stopped waiting: count the delayed fsync and go on to write.
server.aof_delayed_fsync++;
serverLog(LL_NOTICE,"Asynchronous AOF fsync is taking too long (disk is busy?). Writing the AOF buffer without waiting for fsync to complete, this may slow down Redis.");
}
}
latencyStartMonitor(latency);
nwritten = aofWrite(server.aof_fd,server.aof_buf,sdslen(server.aof_buf)); // write into the system cache; the fsync step comes later
latencyEndMonitor(latency);
// File the write latency under an event name that reflects what else was
// going on: a pending fsync, a running child process, or neither.
if (sync_in_progress) {
latencyAddSampleIfNeeded("aof-write-pending-fsync",latency);
} else if (server.aof_child_pid != -1 || server.rdb_child_pid != -1) {
latencyAddSampleIfNeeded("aof-write-active-child",latency);
} else {
latencyAddSampleIfNeeded("aof-write-alone",latency);
}
latencyAddSampleIfNeeded("aof-write",latency);
// The write has been issued, so reset the postponement start time.
server.aof_flush_postponed_start = 0;
// When fewer bytes were written than the buffer holds, handle the error
// and log it.
if (nwritten != (ssize_t)sdslen(server.aof_buf)) {
static time_t last_write_error_log = 0;
int can_log = 0;
/* Limit logging rate to 1 line per AOF_WRITE_LOG_ERROR_RATE seconds. */
if ((server.unixtime - last_write_error_log) > AOF_WRITE_LOG_ERROR_RATE) {
can_log = 1;
last_write_error_log = server.unixtime;
}
/* Log the AOF write error and record the error code. */
if (nwritten == -1) {
if (can_log) {
serverLog(LL_WARNING,"Error writing to the AOF file: %s",
strerror(errno));
// NOTE(review): errno is recorded only when can_log is set, and only
// after serverLog()/strerror() have run - confirm this is intended.
server.aof_last_write_errno = errno;
}
} else {
if (can_log) {
serverLog(LL_WARNING,"Short write while writing to "
"the AOF file: (nwritten=%lld, "
"expected=%lld)",
(long long)nwritten,
(long long)sdslen(server.aof_buf));
}
// Try to cut the file back to aof_current_size to drop the partial write.
if (ftruncate(server.aof_fd, server.aof_current_size) == -1) {
if (can_log) {
serverLog(LL_WARNING, "Could not remove short write "
"from the append-only file. Redis may refuse "
"to load the AOF the next time it starts. "
"ftruncate: %s", strerror(errno));
}
} else {
/* If the ftruncate() succeeded we can set nwritten to
* -1 since there is no longer partial data into the AOF. */
nwritten = -1;
}
// A short write is always recorded as ENOSPC.
server.aof_last_write_errno = ENOSPC;
}
/* Handle the AOF write error. */
if (server.aof_fsync == AOF_FSYNC_ALWAYS) {
/* We can't recover when the fsync policy is ALWAYS since the
* reply for the client is already in the output buffers, and we
* have the contract with the user that on acknowledged write data
* is synced on disk. */
serverLog(LL_WARNING,"Can't recover from AOF write error when the AOF fsync policy is 'always'. Exiting...");
exit(1);
} else {
/* Recover from failed write leaving data into the buffer. However
* set an error to stop accepting writes as long as the error
* condition is not cleared. */
server.aof_last_write_status = C_ERR;
/* Trim the sds buffer if there was a partial write, and there
* was no way to undo it with ftruncate(2). */
if (nwritten > 0) {
server.aof_current_size += nwritten;
sdsrange(server.aof_buf,nwritten,-1);
}
return; /* We'll try again on the next call... */
}
} else {
// Full write succeeded: clear a previously recorded write error, if any.
if (server.aof_last_write_status == C_ERR) {
serverLog(LL_WARNING,
"AOF write error looks solved, Redis can write again.");
server.aof_last_write_status = C_OK;
}
}
server.aof_current_size += nwritten;
// Reuse the buffer when it is small (len + avail < 4000); otherwise free
// it and allocate a fresh empty one.
if ((sdslen(server.aof_buf)+sdsavail(server.aof_buf)) < 4000) {
sdsclear(server.aof_buf);
} else {
sdsfree(server.aof_buf);
server.aof_buf = sdsempty();
}
// Actual fsync step.
try_fsync:
// If aof_no_fsync_on_rewrite is set and a child process (AOF rewrite or
// RDB save) is currently running, skip the fsync for now.
if (server.aof_no_fsync_on_rewrite &&
(server.aof_child_pid != -1 || server.rdb_child_pid != -1))
return;
// AOF_FSYNC_ALWAYS: fsync synchronously.
if (server.aof_fsync == AOF_FSYNC_ALWAYS) {
latencyStartMonitor(latency);
redis_fsync(server.aof_fd);
latencyEndMonitor(latency);
latencyAddSampleIfNeeded("aof-fsync-always",latency);
server.aof_fsync_offset = server.aof_current_size;
server.aof_last_fsync = server.unixtime;
}
// AOF_FSYNC_EVERYSEC: submit a job so the fsync runs asynchronously.
else if ((server.aof_fsync == AOF_FSYNC_EVERYSEC &&
server.unixtime > server.aof_last_fsync)) {
// Only schedule a new background fsync when none is already running.
if (!sync_in_progress) {
aof_background_fsync(server.aof_fd);
server.aof_fsync_offset = server.aof_current_size;
}
server.aof_last_fsync = server.unixtime;
}
}
其实不止serverCron会执行flushAppendOnlyFile,像beforeSleep、prepareForShutdown、stopAppendOnly时也会执行
总结:aof核心方法是 feedAppendOnlyFile、flushAppendOnlyFile
混合持久化(aof重写)
混合持久化本质是通过 AOF 后台重写(bgrewriteaof 命令)完成的,不同的是当开启混合持久化时,fork 出的子进程先将当前全量数据以 RDB 方式写入新的 AOF 文件,然后再将 AOF 重写缓冲区(aof_rewrite_buf_blocks)的增量命令以 AOF 方式写入到文件,写入完成后通知主进程将新的含有 RDB 格式和 AOF 格式的 AOF 文件替换旧的 AOF 文件。
数据结构
/* Excerpt of struct redisServer: only the fields these notes reference for
 * AOF rewriting (other members are elided). */
struct redisServer {
// ...
int aof_rewrite_perc; // rewrite condition: growth percentage
off_t aof_rewrite_min_size; // rewrite condition: AOF file size
list *aof_rewrite_buf_blocks; // AOF rewrite buffer
int aof_pipe_write_data_to_child; // pipe: sending end
int aof_pipe_read_data_from_parent; // pipe: receiving end
// ...
}
源码
除主动调用 rewriteaof/bgrewriteaof 外,在serverCron会根据配置的参数判断是否需要重写。具体判断逻辑如下
serverCron里的判断是否需要aof重写的源码:
rewriteAppendOnlyFileBackground 重写aof
思考
aof重写会阻塞主线程吗?如果不会,那如何保证重写过程中的数据一致性?
不会阻塞,也是fork一个子进程。并且会有一个aof_rewrite_buf_blocks用于存储重写过程中的命令。所以当正在进行重写时一条命令的执行会同时写入aof_buf和aof_rewrite_buf_blocks缓存区,再通过aof_pipe_write_data_to_child管道传给重写子进程,子进程重写的过程中会尽可能的从管道中获取数据(因为aof重写缓存区写到aof文件是主线程执行的,为了防止长时间阻塞,每次等待可读取事件1ms,如果一直能读取到数据,则这个过程最多执行1000次,也就是1秒。如果连续20次没有读取到数据,则结束这个过程。),当重写进程结束了,会在主进程(阻塞)将剩余的新命令写入aof文件。
其实难点就在于aof重写是子进程异步的,主进程新加入的写命令如何传递给重写子进程。具体原理用到了管道传递(共有3个管道:2个控制管道,1个数据管道),具体实现后面有机会再详细独立一篇吧,这里只大概说一下数据发送端和接收端。
数据发送端:
写命令写入aof_buf的同时会判断是否正在aof重写,如果是则同时写入aof重写缓存区并创建管道读事件
数据接收端:
rewriteAppendOnlyFileBackground方法fork的子进程中rewriteAppendOnlyFile方法里的第1405行-1414行
过期键对rdb和aof的影响
rdb:过期键不会写入rdb文件,载入rdb文件时,主服务器也不会载入,只有从服务器会载入过期键。不过因为主服务器会同步至从,所以最终从服务器也没有过期键。(在复制模式下从服务器不会删除过期键,只能以主服务器同步del,在主服务器统一的删除,以确保数据一致性)
aof:在真正删除时才会追加删除命令到aof,同时重写aof文件时也会去掉过期键。
过期删除策略
1、定时:aeMain(时间事件)
2、定期:serverCron
3、惰性:expireIfNeeded
持久化文件的加载
在redis启动的main函数里第4373行的loadDataFromDisk加载持久化文件,而aeMain的监听在main函数的最后。所以先加载完持久化文件后才会提供redis服务。
参考文献:《Redis设计与实现》黄健宏著、redis-5.0.14源码