【Redis】AOF 基础

news2024/12/24 9:34:03

因为 Redis AOF 的实现有些绕, 就分成 2 篇进行分析, 本篇主要是介绍一下 AOF 的一些特性和依赖的其他函数的逻辑,为下一篇 (Redis AOF 源码) 源码分析做一些铺垫。

AOF 全称: Append Only File, 是 Redis 提供了一种数据保存模式, Redis 默认不开启。
AOF 采用日志的形式来记录每个写操作, 并追加到文件。开启后, 执行更改 Redis 数据的命令时, 就会把命令写入到 AOF 文件中。
Redis 重启时会根据日志文件的内容把写指令从前到后执行一次以完成数据的恢复工作。

1 AOF 相关的配置

appendonly no                   # AOF 开关, 默认为关闭
appendfilename "appendonly.aof" # 保存的文件名
appendfsync everysec            # AOF 持久化策略 (硬盘缓存写入到硬盘) 

AOF 简单使用的话就这 3 个配置, 前 2 个就是字面的意思, 理解其他比较简单。
我们说明一下第三个配置的使用。

开启 AOF 后, 每次修改的命令都会存到 Redis 的一个缓存区
缓存区的数据最终是需要写入到磁盘的, 而 Redis 是通过 write 函数, 将缓存中的数据写入到磁盘中。
但是 write 函数实际是先将数据先保存到系统层级的缓存, 后续由系统自身将数据保存到磁盘, 系统默认为 30 秒保存一次。这样的话, 可能有风险, 如果系统直接宕机了
可能会丢失 30 秒左右的数据, 所以系统提供了一个 fsync 函数, 可以把系统层级的缓存立即写入到磁盘中, 但是这是一个阻塞且缓慢的操作, 会影响到执行的线程。

所以上面的配置的第 3 项就是控制这个 Redis 缓存到磁盘的行为

  1. everysec: AOF 默认的持久化策略。每秒执行一次 fsync, 可能导致丢失 1s 数据, 这种策略兼顾了安全性和效率
  2. no: 表示不执行 fsync, 由操作系统保证数据同步到磁盘, 速度最快, 但是不太安全
  3. always: 表示每次写入到执行 fsync, 保证数据同步到磁盘, 效率很低

除了上面的 3 个基础配置, 还有几个关于 AOF 执行中的行为配置

# 默认为 100
# 当目前 AOF 文件大小超过上次重写的 AOF 文件的百分之多少进行重写 (重写的含义可以看下面的重写机制), 即当 AOF 文件增长到一定大小的时候, Redis 能够调用 bgrewriteaof 对日志文件进行重写
auto-aof-rewrite-percentag  100

# 默认为 64m
# 设置允许重写的最小 AOF 文件大小, 避免达到约定百分比但占用的容量仍然很小的情况就重写
auto-aof-rewrite-min-size  64mb

# 默认为 no
# 在 AOF 重写时, 是否不要执行 fsync, 将缓存写入到磁盘, 默认为 no。
# 如果对低延迟要求很高的应用, 这里可以设置为 yes, 否则设置为 no, 这样对持久化特性来说这是更安全的选择
# 设置为 yes 表示重写期间对新的写操作不 fsync, 暂时存在内存中, 等重新操作完成后再写入
# 默认为 no, 建议改为 yes, 因为 Linux 的默认 fsync 策略为 30 秒, 所以可能丢失 30 秒数据
no-appendfsync-on-rewrite  no

# 默认为 yes
# 当 Redis 启动的时候, AOF 文件的数据会被重新载入内存
# 但是 AOF 文件可能在尾部是不完整的, 比如突然的断电宕机什么的, 可能导致 AOF 文件数据不完整
# 对于不完整的 AOF 文件如何处理
# 配置为 yes, 当截断的 AOF 文件被导入的时候, 会自动发布一个 log 给客户端, 然后继续加载文件中的数据
# 配置为 no, 用户必须手动 redis-check-aof 修复 AOF 文件才可以
aof-load-truncated yes

2 AOF 重写机制

上面的配置中有好几个提示到重写的概念, 那么什么是重写呢?

由于 AOF 持久化是 Redis 不断将写命令记录到 AOF 文件中, 随着 Redis 不断的运行, AOF 文件将会越来越大, 占用服务器磁盘越来越大, 同时 AOF 恢复要求时间越长。

为了解决这个问题, Redis 新增了重写机制, 当 AOF 文件的大小超过了所设定的阈值时, Redis 就会自动启动 AOF 文件的内容压缩, 只保留可以恢复数据的最小指令集。
AOF 文件不是对原文件进行整理, 而是直接读取服务器现有的键值对, 然后用一条命令去代替之前记录这个键值对的多条命令, 生成一个新的文件替换原来的 AOF 文件。

用户可以通过 bgrewriteaof 命令来手动触发 AOF 文件的重写, 这个重写的过程也是通过子进程实现的。
在子进程进行 AOF 重写时, 主线程需要保证

  1. 处理客户端的请求
  2. 将新增和更新命令追加到现有的 AOF 文件中
  3. 将新增和更新命令追加到 AOF 重写缓存中

3 AOF 文件的优势和劣势

优势

  1. AOF 持久化的方法提供了多种的同步频率, 即使使用默认的同步频率每秒同步一次, Redis 最多也就丢失 1 秒的数据而已
  2. AOF 日志文件以 append-only 模式写入, 所以没有任何磁盘寻址的开销, 写入性能非常高, 而且文件不容易受损, 即使文件尾部受损, 也能很容易恢复, 打开文件, 把后面损坏的数据删除即可

劣势

  1. 对于具有相同数据的的 Redis, AOF 文件通常会比 RDF 文件体积更大 (RDB 存的是数据快照)
  2. 虽然 AOF 提供了多种同步的频率, 默认情况下, 每秒同步一次的频率也具有较高的性能。但是在高并发的情况下, RDB 比 AOF 具好更好的性能保证

4 AOF 和 RDB 两种方案比较

如果可以忍受一小段时间内数据的丢失, 使用 RDB 是最好的, 定时生成 RDB 快照 (snapshot) 非常便于进行数据库备份, 并且 RDB 恢复数据集的速度也要比 AOF 恢复的速度要快, 否则使用 AOF。

但是一般情况下建议不要单独使用某一种持久化机制, 而是应该两种一起用, 在这种情况下, 当 Redis 重启的时候会优先载入 AOF 文件来恢复原始的数据, 因为在通常情况下 AOF 文件保存的数据集要比 RDB 文件保存的数据集要完整。

在 Redis 4.0 带来了一个新的持久化选项 —— 混合持久化。将 RDB 文件的内容和增量的 AOF 日志文件存在一起。
这里的 AOF 日志不再是全量的日志, 而是自持久化开始到持久化结束的这段时间发生的增量 AOF 日志, 通常这部分 AOF 日志很小。

在 Redis 重启的时候, 可以先加载 RDB 的内容, 然后再重放增量 AOF 日志就可以完全替代之前的 AOF 全量文件重放, 重启效率因此大幅得到提升。

5 AOF 的过程

高度概括如下:

  1. 所有的修改命令会追加到 Redis 的一个 AOF 缓存区
  2. AOF 缓存区根据配置的策略向硬盘做同步操作
  3. 随着 AOF 文件越来越大, 达到配置的条件, 对 AOF 文件进行重写, 达到压缩的目的

到此, AOF 的理论知识就没了, 下面是介绍几个比较重要的函数的逻辑。

6 AOF 文件结构

如果现在向 Redis 中写入一个 key 为 redis-key, value 为 redis-value 的字符串键值对后, 这对键值对会以下面的格式保存在 AOF 文件中:

*3\r\n$3\r\nset\r\n$9\r\nredis-key\r\n$11\r\nredis-value\r\n

*数字 的格式开始, 表示后面的命令的参数个数, 然后通过 $数字 表示后面参数的长度, 然后各个分隔之间通过 \r\n 进行分隔。

整体的格式就是 Redis 自定义的 RESP 协议, 具体的 RESP 介绍, 可以看一下这篇文章。

可以看到这种文本格式具有很高的可读性, 同时可以直接进行修改。

注: Redis 中有多个数据库, 写入的数据是保存在哪个数据库的?
在写入对应的数据库数据时, 内部会自动插入一条 select 数据库的编号 的命令到 AOF 文件, 表明对应的数据库, 解析时也是通过这条命令切换到对应的数据库。

源码中将 key 和 value 转换为上面的文件格式的实现是由 2 个函数实现的: catAppendOnlyGenericCommand 和 catAppendOnlyExpireAtCommand, 前者处理的是正常的命令, 而后者处理的是命令的过期时间。

6.1 catAppendOnlyGenericCommand - 没有过期时间的命令

/**
 * 将入参的参数转为 RESP 格式写入到入参的 dst
 * @param dst 当前未写入到文件的命令文本, 新的命令会追加到这个的后面
 * @param argc 命令参数的个数
 * @param argv 命令参数, 比如 set key value
 */
sds catAppendOnlyGenericCommand(sds dst, int argc, robj **argv) {

    char buf[32];
    int len, j;
    robj *o;

    // 上面的 set redis-key redis-value, 按照 RESP 协议转换的内容如下
    // *3\r\n$3\r\nset\r\n$9\r\nredis-key\r\n$11\r\nredis-value\r\n
    // 这里面可以拆为 2 部分处理 
    // 1. *3\r\n  --> 命令的参数个数
    // 2. $3\r\nset\r\n$9\r\nredis-key\r\n$11\r\nredis-value\r\n  --> 具体的命令

    // 1. 处理命令的参数个数部分
    
    // 命令开始的前缀为 *
    buf[0] = '*';
    // argc 表示的是写入命令的个数, 经过这一步 buf = *参数个数
    len = 1+ll2string(buf+1,sizeof(buf)-1,argc);
    // 追加 \r\n, 到了这一步 经过这一步 buf = *参数个数\r\n
    buf[len++] = '\r';
    buf[len++] = '\n';

    // 先将处理的文本第一步拼接到 dst 的后面, 此时 dst = *参数个数\r\n
    dst = sdscatlen(dst,buf,len);

    // 2. 处理具体的命令

    // 拼接参数列表
    for (j = 0; j < argc; j++) {
        
        // 将对应的参数转为字符串类型
        o = getDecodedObject(argv[j]);
        buf[0] = '$';
        // 将命令的长度写入到 buf 中
        len = 1+ll2string(buf+1,sizeof(buf)-1,sdslen(o->ptr));
        // 继续在后面拼接 \r\n, 到这一步 buf = $命令的长度\r\n
        buf[len++] = '\r';
        buf[len++] = '\n';

        // 同样将 $命令的长度\r\n 写入到 dst
        dst = sdscatlen(dst,buf,len);
        // 将当前的命令具体的内容 写入到 dst
        dst = sdscatlen(dst,o->ptr,sdslen(o->ptr));
        // 在 dist 后面追加一个 \r\n
        dst = sdscatlen(dst,"\r\n",2);
        // 经过一次循环, dist 里面多了一段 $命令的长度\r\n命令\r\n 的内容

        // 引用此数 - 1
        decrRefCount(o);
    }
    return dst;
}

/**
 * 如果入参的对象是 raw 或者 embstr 编码, 引用次数 + 1
 * 如果为 int 编码, 根据这个整数创建出一个字符串, 同时返回这个字符串
 * 其他类型不会处理
 */
robj *getDecodedObject(robj *o) {
    robj *dec;

    // 判断一个对象的编码是否为 OBJ_ENCODING_EMBSTR 或者 OBJ_ENCODING_RAW
    if (sdsEncodedObject(o)) {
        // 对象的引用次数还没达到最大值时, 进行引用次数 + 1
        incrRefCount(o);
        return o;
    }
    // 是字符串类型同时编码为 OBJ_ENCODING_INT
    if (o->type == OBJ_STRING && o->encoding == OBJ_ENCODING_INT) {

        char buf[32];
        // 整形转为 char 数组
        ll2string(buf,32,(long)o->ptr);
        // 转为字符串
        dec = createStringObject(buf,strlen(buf));
        return dec;
    } else {
        serverPanic("Unknown encoding type");
    }
}

2.7.2 catAppendOnlyExpireAtCommand - 带过期时间的命令

/**
 * 将入参的过期时间转为 RESP 格式的字符串并存入 buf
 * @param buf 当前未写入到文件的命令文本, 新的命令会追加到这个的后面, 同时将命令修改为 pexpireat 的格式
 * @param cmd 执行的命令
 * @param key redis 的 key 值
 * @param second 过期的时间, 单位秒
 */
sds catAppendOnlyExpireAtCommand(sds buf, struct redisCommand *cmd, robj *key, robj *seconds) {

    long long when;
    robj *argv[3];

    // 转为字符串类型, 以便使用 strtoll 函数
    seconds = getDecodedObject(seconds);
    // 根据指定的进制将入参的 char 数组转为一个整数, 10 --> 10 进制
    // 得到过期的时间
    when = strtoll(seconds->ptr,NULL,10);

    // 当前执行的命令为 expire, setex expireat 将参数的秒转换成毫秒
    if (cmd->proc == expireCommand || cmd->proc == setexCommand || cmd->proc == expireatCommand) {
        when *= 1000;
    }

    // 将 expire, setex expireat 命令的参数,从相对时间设置为绝对时间
    // 以前可能是 10s 后过期, 经过这一步,得到的是 xxx 年 yyy 月 的具体时间
    if (cmd->proc == expireCommand || cmd->proc == pexpireCommand || cmd->proc == setexCommand || cmd->proc == psetexCommand) {
        when += mstime();
    }

    // 减少 second 的引用次数, 便于回收
    decrRefCount(seconds);

    // 拼接为 pexpireat key 超时时间 的命令格式
    argv[0] = createStringObject("PEXPIREAT",9);
    argv[1] = key;
    argv[2] = createStringObjectFromLongLong(when);

    // 将上面的 pexpireat key 过期时间 的命令通过 catAppendOnlyGenericCommand 转为 RESP 格式的字符串
    buf = catAppendOnlyGenericCommand(buf, 3, argv);
    decrRefCount(argv[0]);
    decrRefCount(argv[2]);
    // 返回buf
    return buf;

}

7 代码中涉及的几个模块

在 Redis 的 AOF 中涉及了几个模块功能, 这些功能辅助着整个 AOF 的功能, 这里对这些功能进行一个简单的讲解, 需要说明的时, 这些功能具体的实现可以不用去深入理解, 到了 AOF 源码时, 知道对应的函数的功能就行了。

这里也只是简单的介绍一下, 感兴趣了解一下大体的思路而已, 可以在后面 AOF 源码分析后, 再回来看一下。

7.1 延迟统计

Redis 中会对一些比较耗时的操作做一下统计, 便于后面的性能分析。
而在 AOF 中在调用 write 函数等操作也会进行延迟操作的统计。
大体的延迟统计实现如下:

统计延迟信息的配置


#define CONFIG_DEFAULT_LATENCY_MONITOR_THRESHOLD 0

struct redisServer {

    // 延迟监视的阈值, 默认值为 0, 如果配置为大于 0 的值, 表示开启延迟监控, 同时超过了这个时间就进行延迟记录
    long long latency_monitor_threshold;

    // 字典, 也就是延迟记录的保存地方, 保存的格式是 延迟记录的事件名 和 latencyTimeSeries (一个数组)
    dict *latency_events;
}

统计延迟样本对象的定义

struct latencyTimeSeries {

    // 用于记录的下一个延迟样本的位置, 超过了数组的长度, 会重新被赋值为 0 
    int idx;

    // 最大的延时
    uint32_t max;

    // 最近的延时记录样本数组
    struct latencySample samples[LATENCY_TS_LEN];
}

struct latencySample {

    // 延时样本创建的时间
    int32_t time; 

    // 延迟样本的延迟时间, 单位毫秒
    uint32_t latency;
}

统计延迟样本的函数定义

// 下面的函数做了小改动, 逻辑一样的

// 获取延迟事件的时间
void latencyStartMonitor(var) { 
    // mstime() 获取到当前的时间
    var = server.latency_monitor_threshold ? mstime() : 0;
}

void latencyEndMonitor(var) {
    if (server.latency_monitor_threshold) {
        var = mstime() - var;
    }
}

/**
 * 判断是否需要记录延迟时间 
 * @param event 事件名
 * @param var 延迟事件的耗时时间
 */
void latencyAddSampleIfNeeded(event,var) {
    if (server.latency_monitor_threshold &&  (var) >= server.latency_monitor_threshold)
        latencyAddSample((event),(var));
}

/**
 * 添加延迟事件到 redisServer 的 latency_events 字典
 */ 
void latencyAddSample(char *event, mstime_t latency) {

    // 找出 event 对应的延时事件记录结构体
    struct latencyTimeSeries *ts = dictFetchValue(server.latency_events,event);

    time_t now = time(NULL);
    int prev;

    // 没有对应事件的 latencyTimeSeries, 添加一个
    if (ts == NULL) {
        ts = zmalloc(sizeof(*ts));
        ts->idx = 0;
        ts->max = 0;
        memset(ts->samples,0,sizeof(ts->samples));
        dictAdd(server.latency_events,zstrdup(event),ts);
    }

    // 获取存储的位置
    prev = (ts->idx + LATENCY_TS_LEN - 1) % LATENCY_TS_LEN;

    // 数组对应位置的样本的创建时间等于当前时间
    if (ts->samples[prev].time == now) {
        // 当前的延迟时间大于样本里面的延迟时间, 更新为当前时间
        if (latency > ts->samples[prev].latency)
            ts->samples[prev].latency = latency;
        return;
    }

    // 修改对应位置的样本的时间信息
    ts->samples[ts->idx].time = time(NULL);
    ts->samples[ts->idx].latency = latency;

    // 如果大于当前所有样本的时间, 更新最大延迟时间为当前的延迟时间
    if (latency > ts->max) 
        ts->max = latency;

    ts->idx++;
    // 超过了上限, 重新设置为 0 
    if (ts->idx == LATENCY_TS_LEN) 
        ts->idx = 0;    

}

上面就是延迟事件的创建和保存, 至于在哪里使用的, 如何汇总分析, AOF 这里没有涉及, 就跳过了, 如果需要继续研究可以查看 latency.hlatency.c 这 2 个文件。

7.2 BIO - 后台线程

在上面的介绍中可以知道 fsync 是一个很耗时的过程, 如果把这个过程同样放在 Redis 的主线程中, 那么可能影响到整个 Redis 的性能, 所以 Redis 将 fsync 的过程交给了后台的线程处理。
Reids 将后台相关耗时的操作封装为了一个 BIO 的功能, 可以看出是一个线程池, 线程池在启动时初始了几个线程, 然后生产者向这个池中添加任务。
而 Redis 主线程在执行到 fsync 时, 会提交一个 fsync 的任务到 BIO 中, 完成结束。真正的 fsync 由后台线程处理。

大体的实现如下:

任务类型定义

// 执行 close 函数, 也就是文件的关闭
#define BIO_CLOSE_FILE    0 

// 执行 redis_fsync 函数, 也就是 fsync 函数
#define BIO_AOF_FSYNC     1

// 延迟对象释放
#define BIO_LAZY_FREE     2 

// 任务类型的总数
#define BIO_NUM_OPS       3

BIO 的初始化

// 存放声明的线程数组
static pthread_t bio_threads[BIO_NUM_OPS];

// 线程锁, 这里是多线程场景了, 所以有并发问题
static pthread_mutex_t bio_mutex[BIO_NUM_OPS];

// 添加任务相关的 condition, 简单的理解就是, 线程会被阻塞在这个 condition, 另一个线程可以唤醒这个 condition 上的线程
static pthread_cond_t bio_newjob_cond[BIO_NUM_OPS];

// 执行过程相关的 condition
static pthread_cond_t bio_step_cond[BIO_NUM_OPS];

// 任务列表
static list *bio_jobs[BIO_NUM_OPS];

// 存放对应的任务类型还有多少个任务等待执行
static unsigned long long bio_pending[BIO_NUM_OPS];


void bioInit(void) {

    pthread_attr_t attr;
    pthread_t thread;
    size_t stacksize;
    int j;

    // 初始锁, condition, 任务列表
    for (j = 0; j < BIO_NUM_OPS; j++) {
        pthread_mutex_init(&bio_mutex[j],NULL);
        pthread_cond_init(&bio_newjob_cond[j],NULL);
        pthread_cond_init(&bio_step_cond[j],NULL);
        bio_jobs[j] = listCreate();
        bio_pending[j] = 0;
    }

    //设置线程栈空间
    pthread_attr_init(&attr);
    pthread_attr_getstacksize(&attr,&stacksize); 
    if (!stacksize) 
        stacksize = 1; 
    while (stacksize < REDIS_THREAD_STACK_SIZE) 
        stacksize *= 2;
    pthread_attr_setstacksize(&attr, stacksize);

    // 创建线程, 并存放到 bio_threads 这个线程数组
    for (j = 0; j < BIO_NUM_OPS; j++) {
        void *arg = (void*)(unsigned long) j;

        // 创建线程, 线程执行的逻辑为 bioProcessBackgroundJobs
        if (pthread_create(&thread,&attr,bioProcessBackgroundJobs,arg) != 0) {
            serverLog(LL_WARNING,"Fatal: Can't initialize Background Jobs.");
            exit(1);
        }
        bio_threads[j] = thread;
    }
}

线程执行的逻辑

void *bioProcessBackgroundJobs(void *arg) {

    struct bio_job *job;
    unsigned long type = (unsigned long) arg;
    sigset_t sigset;

    // 任务类型校验
    if (type >= BIO_NUM_OPS) {
        serverLog(LL_WARNING, "Warning: bio thread started with wrong type %lu",type);
        return NULL;
    }

    // 配置 thread 能够在任何时候被杀掉
    pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
    pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, NULL);

    // 获取锁
    pthread_mutex_lock(&bio_mutex[type]);

    sigemptyset(&sigset);
    sigaddset(&sigset, SIGALRM);
    if (pthread_sigmask(SIG_BLOCK, &sigset, NULL))
        serverLog(LL_WARNING, "Warning: can't mask SIGALRM in bio.c thread: %s", strerror(errno));

    // 线程主逻辑
    while(1) {

        listNode *ln; 

        // 没有任务, 进行等待, 
        if (listLength(bio_jobs[type]) == 0) {
            pthread_cond_wait(&bio_newjob_cond[type],&bio_mutex[type]);
            continue;
        }  

        // 获取列表的第一个任务
        ln = listFirst(bio_jobs[type]);
        job = ln->value; 

        // 释放锁, 这个锁的功能主要是为了确保任务的获取
        pthread_mutex_unlock(&bio_mutex[type]);

        if (type == BIO_CLOSE_FILE) {
            close((long)job->arg1);
        } else if (type == BIO_AOF_FSYNC) {
            redis_fsync((long)job->arg1);
        } else if (type == BIO_LAZY_FREE) {
            if (job->arg1)
                lazyfreeFreeObjectFromBioThread(job->arg1);
            else if (job->arg2 && job->arg3)
                lazyfreeFreeDatabaseFromBioThread(job->arg2,job->arg3);
            else if (job->arg3)
                lazyfreeFreeSlotsMapFromBioThread(job->arg3);
        } else {
            serverPanic("Wrong job type in bioProcessBackgroundJobs().");
        }
        zfree(job);

        // 获取锁
        pthread_mutex_lock(&bio_mutex[type]);
        // 删除任务
        listDelNode(bio_jobs[type],ln);
        // 需要执行的任务减 1
        bio_pending[type]--;

        // 唤醒所有等待在 bio_step_cond 上的线程
        pthread_cond_broadcast(&bio_step_cond[type]);
    }      
}

添加任务

void bioCreateBackgroundJob(int type, void *arg1, void *arg2, void *arg3) {

    struct bio_job *job = zmalloc(sizeof(*job));

    job->time = time(NULL);
    job->arg1 = arg1;
    job->arg2 = arg2;
    job->arg3 = arg3;
    
    // 获取锁
    pthread_mutex_lock(&bio_mutex[type]);
    // 添加任务
    listAddNodeTail(bio_jobs[type],job);
    bio_pending[type]++;

    // 通知等待在 bio_newjob_cond 的线程
    pthread_cond_signal(&bio_newjob_cond[type]);
    // 释放锁
    pthread_mutex_unlock(&bio_mutex[type]);
}

7.3 pipe - 父子进程通信

Redis 的 AOF 重写机制和 RDB 类型, 也是通过 fork 创建子进程, 将整个 AOF 重写过程交给子进程处理。
不同的时: AOF 重写过程中会不断和父进程通信获取父进程的命令缓存。 父子进程之间就是通过 pipe 进行通讯的。

这里只做一下简单的了解。

#include <unistd.h>

int Pipe(int pipefd[2]);

通过 pipe 的函数可以在 2 个文件描述符之间建立一个通道, 第一个用来读 read(fd[0]), 第二个用来写 write(fd[1])

具体的分析可以看一下这篇文章 APUE读书笔记—进程间通信(IPC)之管道和有名管道(FIFO)

而 Redis 中建立了 3 套通道

int aofCreatePipes(void) {

    int fds[6] = {-1, -1, -1, -1, -1, -1};
    int j;

    //  parent -> children data
    if (pipe(fds) == -1) goto error;

    // children -> parent ack
    if (pipe(fds+2) == -1) goto error; 

    // parent -> children ack
    if (pipe(fds+4) == -1) goto error; 

    // 同步非阻塞
    if (anetNonBlock(NULL,fds[0]) != ANET_OK) goto error;
    if (anetNonBlock(NULL,fds[1]) != ANET_OK) goto error;

    // 注册一个事件, 执行的函数为 aofChildPipeReadable, 里面的逻辑就是读取 aof_pipe_read_ack_from_child 的数据到 aof_pipe_write_ack_to_child
    if (aeCreateFileEvent(server.el, fds[2], AE_READABLE, aofChildPipeReadable, NULL) == AE_ERR) goto error;

    // 6 个通道
    server.aof_pipe_write_data_to_child = fds[1];
    server.aof_pipe_read_data_from_parent = fds[0];
    server.aof_pipe_write_ack_to_parent = fds[3];
    server.aof_pipe_read_ack_from_child = fds[2];
    server.aof_pipe_write_ack_to_child = fds[5];
    server.aof_pipe_read_ack_from_parent = fds[4];
    server.aof_stop_sending_diff = 0;
    return C_OK;
}

最终形成的效果如下:
Alt 'AOF 创建的通道'

父进程写入到 aof_pipe_read_data_from_parent 的数据会自动同步到子进程的 aof_pipe_read_data_from_parent, 另外 2 个类似。

至此, AOF 的一些概念和源码中相关的一些代码就介绍完了, 下篇开始真正的 AOF 源码分析。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1317813.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

指针必刷题(C语言指针就该这么学)【数据结构基础】【C语言指针必刷题】

前言&#xff1a;必备知识回忆 1.数组名的意义 i.sizeof(数组名&#xff09;&#xff0c;这里的数组名表示整个数组&#xff0c;计算的是整个数组的大小 ii.&数组名&#xff0c;这里的数组名表示整个数组&#xff0c;取出的是整个数组的地址 iii.除此之外&#xff0c;所…

基于urllib库的网页数据爬取

实验名称&#xff1a; 基于urllib库的网页数据爬取 实验目的及要求&#xff1a; 【实验目的】 通过本实验了解和掌握urllib库。 【实验要求】 1. 使用urllib库爬取百度搜索页面。 2. 使用urllib库获取百度搜索的关键字搜索结果&#xff08;关键字任选&#xff09;。 实验原理及…

电子元器件介绍——二极管(四)

电子元器件介绍 文章目录 电子元器件介绍前言一、二极管的基础知识二、二极管的分类三、二极管的应用总结 前言 这一节我们看一下二极管。 一、二极管的基础知识 PN结&#xff1a;是指一块半导体单晶&#xff0c;其中一部分是P型区&#xff0c;其余部分是N型区。 在电场作用…

C++ Qt开发:自定义Dialog对话框组件

Qt 是一个跨平台C图形界面开发库&#xff0c;利用Qt可以快速开发跨平台窗体应用程序&#xff0c;在Qt中我们可以通过拖拽的方式将不同组件放到指定的位置&#xff0c;实现图形化开发极大的方便了开发效率&#xff0c;本章将重点介绍自定义Dialog组件的常用方法及灵活运用。 在…

基于linux系统的Tomcat+Mysql+Jdk环境搭建(四)linux安装Mysql

1.切换到你需要安装mysql的路径 cd /root/usr/ 2.在线安装 安装网上的安装方式都有很多&#xff0c;可以自己百度一下 我们这里是自己搭建测试环境&#xff0c;可以直接选择在线安装&#xff0c;命令如下&#xff1a;yum install mysql-server&#xff0c; 但是我失败了 ┭┮…

在排序数组中查找元素的第一个和最后一个位置(Java详解)

一、题目描述 给你一个按照非递减顺序排列的整数数组 nums&#xff0c;和一个目标值 target。请你找出给定目标值在数组中的开始位置和结束位置。 如果数组中不存在目标值 target&#xff0c;返回 [-1, -1]。 你必须设计并实现时间复杂度为 O(log n) 的算法解决此问题。 示…

verilog语法进阶,时钟原语

概述&#xff1a; 内容 1. 时钟缓冲 2. 输入时钟缓冲 3. ODDR2作为输出时钟缓冲 1. 输入时钟缓冲 BUFGP verilog c代码&#xff0c;clk作为触发器的边沿触发&#xff0c;会自动将clk综合成时钟信号。 module primitive1(input clk,input a,output reg y); always (posed…

深度剖析JavaScript中冒泡和捕获机制、事件代理

JS事件传播的两种机制包括冒泡和捕获&#xff0c;下面将具体剖析它们之间本质的区别。 事件冒泡: 先触发子元素的事件&#xff0c;再触发父元素的事件。 创建一个 ul label 和 li label, 分别绑定一个父id 和 子 id, 再通过创建 script&#xff0c;去绑定各自的点击事件。 <…

Threejs利用着色器编写动态飞线特效

一、导语 动态飞线特效是可视化数据地图中常见的需求之一&#xff0c;鼠标点击的区块作为终点&#xff0c;从其他区块飞线至点击区块&#xff0c;附带颜色变换或者结合粒子动画 二、分析 利用创建3点来构成贝塞尔曲线&#xff0c;形成线段利用着色器材质来按照线段以及时间…

Linux---压缩和解压缩命令

1. 压缩格式的介绍 Linux默认支持的压缩格式: .gz.bz2.zip 说明: .gz和.bz2的压缩包需要使用tar命令来压缩和解压缩.zip的压缩包需要使用zip命令来压缩&#xff0c;使用unzip命令来解压缩 压缩目的: 节省磁盘空间 2. tar命令及选项的使用 命令说明tar压缩和解压缩命令 …

SE考研真题总结(三)

继续更新&#xff0c;今天准备连出两期该系列~ SE考研真题总结&#xff08;二&#xff09;https://blog.csdn.net/jsl123x/article/details/134857052?spm1001.2014.3001.5501 目录 一.简答题 二.代码大题 一.简答题 1.工程和科学的区别 科学是关于事物的基本原理和事实的…

代码随想录算法训练营 | day53 动态规划 1143.最长公共子序列,1035.不相交的线,53.最大子序和

刷题 1143.最长公共子序列 题目链接 | 文章讲解 | 视频讲解 题目&#xff1a;给定两个字符串 text1 和 text2&#xff0c;返回这两个字符串的最长公共子序列的长度。 一个字符串的 子序列 是指这样一个新的字符串&#xff1a;它是由原字符串在不改变字符的相对顺序的情况下…

Android动画(四)——属性动画ValueAnimator的妙用

目录 介绍 效果图 代码实现 xml文件 介绍 ValueAnimator是ObjectAnimator的父类&#xff0c;它继承自Animator。ValueAnimaotor同样提供了ofInt、ofFloat、ofObject等静态方法&#xff0c;传入的参数是动画过程的开始值、中间值、结束值来构造动画对象。可以将ValueAnimator看…

C#深拷贝效率对比

对于浅拷贝和深拷贝&#xff0c;前面的文章已经说明了。 C#浅拷贝和深拷贝数据-CSDN博客 本篇说一下&#xff0c;深拷贝的效率问题&#xff0c;效率一直是程序追求的&#xff0c;效率越高肯定越好&#xff0c;有时候功能是实现了&#xff0c;但是运行以及处理数据的效率非常低…

【算法】bfs与dfs算法解决FloodFill(洪流)问题(C++)

文章目录 1. 什么是FloodFill问题2. 用什么方法解决FloodFill问题3. 具体例题773.图像渲染200.岛屿数量695.岛屿的最大面积130.被围绕的区域 1. 什么是FloodFill问题 一般floodfill问题可以描述为&#xff1a;给定一个二维矩阵&#xff0c;其中每个元素代表一个像素点&#xf…

Python-flask 入门代码

python与pycharm安装 过程略&#xff0c;网上很多&#xff0c;记得为pycharm配置默认解释器 虚拟环境 pipenv # 全局安装虚拟环境 # 可加-U参数&#xff0c;明确全局安装&#xff0c;不加好像也可以? pip3 install pipenv #检查安装情况 pipenv --version # ---控制台输出…

跨域的解决方式(java后端)

文章目录 一、跨域介绍1、什么是跨域2、为什么会产生跨域3、禁止跨域的原因 二、简单请求和非简单请求1、简单请求1.1、什么时简单请求1.2、简单请求基础流程 2、非简单请求2.1、预检请求2.2、预检请求的回应2.3、浏览器的正常请求和回应 3、自定义跨域过滤器 三、解决方式1、C…

Java基础语法之抽象类和接口

抽象类 什么是抽象类 并不是所有的类都是用来描述对象的&#xff0c;这样的类就是抽象类 例如&#xff0c;矩形&#xff0c;三角形都是图形&#xff0c;但图形类无法去描述具体图形&#xff0c;所以它的draw方法无法具体实现&#xff0c;这个方法就可以没设计成抽象方法&…

003 Windows用户与组管理

Windows用户管理 一、用户账户 1、什么是用户账户 不同用户身份拥有不同的权限每个用户包含了一个名称和一个密码每个用户账户具有唯一的安全标识符查看系统中的用户 net user 安全标识符&#xff08;SID&#xff09; whoami /user 使用注册表查看 打开注册表命令regedi…

Sentinel使用详解

组件简介 Sentinel是阿里开源的一套用于服务容错的综合性解决方案。它以流量为切入点&#xff0c;从流量控制、熔断降级、系统负载保护等多个维度来保护服务的稳定性。Sentinel承接了阿里巴巴近10年的双十一大促流量的核心场景&#xff0c;例如秒杀、消息削峰填谷、集群流量控…