来聊聊redis集群数据迁移

news2024/12/24 22:18:35

写在文章开头

本文将是笔者对于redis源码分析的一个阶段的最后一篇,将从源码分析的角度让读者深入了解redis节点迁移的工作流程,希望对你有帮助。

在这里插入图片描述

Hi,我是 sharkChili ,是个不断在硬核技术上作死的 java coder ,是 CSDN的博客专家 ,也是开源项目 Java Guide 的维护者之一,熟悉 Java 也会一点 Go ,偶尔也会在 C源码 边缘徘徊。写过很多有意思的技术博客,也还在研究并输出技术的路上,希望我的文章对你有帮助,非常欢迎你关注我的公众号: 写代码的SharkChili

因为近期收到很多读者的私信,所以也专门创建了一个交流群,感兴趣的读者可以通过上方的公众号获取笔者的联系方式完成好友添加,点击备注 “加群” 即可和笔者和笔者的朋友们进行深入交流。

在这里插入图片描述

详解redis cluster数据迁移过程

节点基本结构定义

redis集群提供16384slot,我们可以按需分配给节点上,后续进行键值对存储时,我们就可以按照算法将键值对存到对应slot上的redis服务器上:
在这里插入图片描述

集群节点本质就是通过slots这个数组记录当前节点的所管理的情况,这里我们可以看到slots是一个char 数组,长度为REDIS_CLUSTER_SLOTS(16384)除8,这样做的原因是因为:

  1. char占1个字节,每个字节8位。
  2. 每个char可以记录8个slot的情况,如果是自己的slot则对应char的某一个位置记录为1:

我们以node-1为例,因为它负责0-5460的节点,所以它的slots0-5460都为1,对应的图解如下所示,可以看到笔者这里省略了后半部分,仅仅表示了0-15位置为1:

在这里插入图片描述

对此我们也给出这段redis中节点的定义,即位于cluster.h中的clusterNode这个结构体中,可以看slots这段定义:

typedef struct clusterNode {
  //......
    //记录集群负责的槽,总的为16384
    unsigned char slots[REDIS_CLUSTER_SLOTS/8]; 
    //......
}

设置slot后续节点走向

以本文示例为例,我们希望后续节点2的数据全部存到节点1中,那么我们首先需要键入如下两条配置:

# 在节点1上执行,将节点2数据导入到节点1上
 CLUSTER SETSLOT 3 IMPORTING node2
 # 在节点2上执行,将自己的数据迁移到节点1
 CLUSTER SETSLOT 3 MIGRATING node1

这两条指最终都会被各自的服务端解析,并调用clusterCommand执行,我们以节点1导入为例,假设我们执行clusterCommand解析到setslot 关键字和importing关键字,即知晓要导入其他节点的数据。对应的节点1就会通过importing_slots_from数组标记自己将导入这个slot的数据,而节点2也会通过migrating_slots_to数组标记自己要将数据导出给其他节点的slot:

在这里插入图片描述

对此我们给出clusterCommand的执行流程,可以看到该函数解析出migrating或者importing关键字时就会将对的migrating_slots_to或者importing_slots_from数组对应slot位置的索引位置设置为当前上述命令传入的node id

void clusterCommand(redisClient *c) {
   		//......

        if (!strcasecmp(c->argv[3]->ptr,"migrating") && c->argc == 5) {//处理迁出的逻辑
            //看看自己是否有迁出的slot,没有则报错
            if (server.cluster->slots[slot] != myself) {
                addReplyErrorFormat(c,"I'm not the owner of hash slot %u",slot);
                return;
            }
            //查看自己是否知晓这个node id,如果没有则报错
            if ((n = clusterLookupNode(c->argv[4]->ptr)) == NULL) {
                addReplyErrorFormat(c,"I don't know about node %s",
                    (char*)c->argv[4]->ptr);
                return;
            }
            //标记迁出到slot为传入的node
            server.cluster->migrating_slots_to[slot] = n;
        } else if (!strcasecmp(c->argv[3]->ptr,"importing") && c->argc == 5) {//处理迁入的逻辑
            //查看迁入的slot是否已经配置,如果有则报错
            if (server.cluster->slots[slot] == myself) {
                addReplyErrorFormat(c,
                    "I'm already the owner of hash slot %u",slot);
                return;
            }
            //查看自己是否知晓要迁入数据的node的信息,如果不知道则报错
            if ((n = clusterLookupNode(c->argv[4]->ptr)) == NULL) {
                addReplyErrorFormat(c,"I don't know about node %s",
                    (char*)c->argv[3]->ptr);
                return;
            }
            //标记迁入slot位置为传入的nodeid
            server.cluster->importing_slots_from[slot] = n;
        } //......
}

后续的我们假设还是将set key value请求发送到节点2,因为上述命令的原因,节点会返回move/ask告知客户端这个键值对现在要存到节点1上。对应节点1收到这个key请求时,通过key计算得slot正是自己,它就会将这个键值对存储到自己的数据库中:

在这里插入图片描述

这里我们以节点1的角度查看这个问题,当客户端收到move指令后,继续向节点1发送指令,节点1通过收到指令调用processCommand,其内部调用getNodeByQuery获取当前key对应的slot,发现是自己则直接存储数据到当前节点的内存数据库中:

int processCommand(redisClient *c) {
    //......
    //如果开启了集群模式,且发送者不是master且参数带key则进入逻辑
    if (server.cluster_enabled &&
        !(c->flags & REDIS_MASTER) &&
        !(c->flags & REDIS_LUA_CLIENT &&
          server.lua_caller->flags & REDIS_MASTER) &&
        !(c->cmd->getkeys_proc == NULL && c->cmd->firstkey == 0))
    {
        int hashslot;

        if (server.cluster->state != REDIS_CLUSTER_OK) {
           //......
        } else {
            int error_code;
            //查找键值对对应的slot和这个slot负责的节点
            clusterNode *n = getNodeByQuery(c,c->cmd,c->argv,c->argc,&hashslot,&error_code);
            //如果为空且或者非自己,则转交出去给别人处理
            if (n == NULL || n != server.cluster->myself) {
                flagTransaction(c);
                clusterRedirectClient(c,n,hashslot,error_code);
                return REDIS_OK;
            }
        }
    }
	//......
	//将键值对存储到当前数据库中
}

我们以节点的视角再次直接步入getNodeByQuery查看这段逻辑,可以看到其内部会基于key计算slot然后将得到对应的node,如果发现这个node是自己且属于importing_slots_from,即说明是客户端通过move或者ask请求找到自己的,则进行进一步是否是多条指令执行且存在key找不到存储位置的情况,若存在则返回空,反之都是直接返回当前节点信息,即node2的新数据直接迁移过来:

clusterNode *getNodeByQuery(redisClient *c, struct redisCommand *cmd, robj **argv, int argc, int *hashslot, int *error_code) {
    //......
    //遍历命令
    for (i = 0; i < ms->count; i++) {
       //.....
		//获取指令、参数个数、参数
        mcmd = ms->commands[i].cmd;
        margc = ms->commands[i].argc;
        margv = ms->commands[i].argv;
        //解析出key以及个数
        keyindex = getKeysFromCommand(mcmd,margv,margc,&numkeys);
        for (j = 0; j < numkeys; j++) {
            //拿到key
            robj *thiskey = margv[keyindex[j]];
            //计算slot
            int thisslot = keyHashSlot((char*)thiskey->ptr,
                                       sdslen(thiskey->ptr));

           		//.....
                //如果就是当前节点正在做迁出或者迁入,则migrating_slot/importing_slot设置为1
                if (n == myself &&
                    server.cluster->migrating_slots_to[slot] != NULL)
                {
                    migrating_slot = 1;
                } else if (server.cluster->importing_slots_from[slot] != NULL) {
                    importing_slot = 1;
                }
            } else {
               //.....
//.....
        }
      //.....
    }
	//如果设置了导入标识为1且标识为asking则步入这段逻辑,
	if (importing_slot &&
        (c->flags & REDIS_ASKING || cmd->flags & REDIS_CMD_ASKING))
    {	//当前指令有多个key且存在未命中的则返回空,反之返回自己
        if (multiple_keys && missing_keys) {
            if (error_code) *error_code = REDIS_CLUSTER_REDIR_UNSTABLE;
            return NULL;
        } else {
            return myself;
        }
    }

    //.....
    //返回节点信息以本示例来说就是返回当前节点信息
    return n;
}

完成节点迁移

上述操作仅仅针对新节点的迁移,对于旧的节点我们就需要通过节点2键入CLUSTER GETKEYSINSLOT slot count要迁移的旧的keyslot,然后通过MIGRATE host port key dbid timeout [COPY | REPLACE]将数据迁移到节点1上。
这里我们补充一下MIGRATEcopy和replace的区别,前者是遇到重复直接报错,后者是迁移时直接覆盖。
最终这条指令回基于要迁移的key而生成一条RESTORE-ASKING key ttl serialized-value [REPLACE] [ABSTTL] [IDLETIME seconds] [FREQ frequency]指令发送给导入的节点,以本文例子来说就是节点1:

在这里插入图片描述

这里我们给出MIGRATE 指令对应的处理函数migrateCommand,逻辑和我上文说的差不多,基于指令解析出replace或者copy等信息,然后用argv[3]即我们的key得出这个键值对的信息生成RESTORE指令将键值对转存给节点1:

/* 命令 MIGRATE host port key dbid timeout [COPY | REPLACE] */
void migrateCommand(redisClient *c) {
   
   	//......
    //解析拷贝和替代选项,前者重复会报错
    for (j = 6; j < c->argc; j++) {
        if (!strcasecmp(c->argv[j]->ptr,"copy")) {
            copy = 1;
        } else if (!strcasecmp(c->argv[j]->ptr,"replace")) {
            replace = 1;
        } else {
            addReply(c,shared.syntaxerr);
            return;
        }
    }

  //......
    //查看要迁移的key是否存在吗,如果不存则直接报错返回
    if ((o = lookupKeyRead(c->db,c->argv[3])) == NULL) {
        addReplySds(c,sdsnew("+NOKEY\r\n"));
        return;
    }

    /* Connect */
    //建立socket连接
    cs = migrateGetSocket(c,c->argv[1],c->argv[2],timeout);
    //......

    //cmd初始化一个buf缓冲区
    rioInitWithBuffer(&cmd,sdsempty());

    /* Send the SELECT command if the current DB is not already selected. */
    //如果尚未选择当前DB,则发送SELECT命令。
    int select = cs->last_dbid != dbid; /* Should we emit SELECT? */
    if (select) {
        redisAssertWithInfo(c,NULL,rioWriteBulkCount(&cmd,'*',2));
        redisAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,"SELECT",6));
        redisAssertWithInfo(c,NULL,rioWriteBulkLongLong(&cmd,dbid));
    }

    /* Create RESTORE payload and generate the protocol to call the command. */
    //获取key的过期时效
    expireat = getExpire(c->db,c->argv[3]);
    if (expireat != -1) {
        ttl = expireat-mstime();
        if (ttl < 1) ttl = 1;
    }
 
    //集群用RESTORE-ASKING发送key给目标
    if (server.cluster_enabled)
        redisAssertWithInfo(c,NULL,
            rioWriteBulkString(&cmd,"RESTORE-ASKING",14));
    else
        redisAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,"RESTORE",7));
   //填充key和value ttl等
    redisAssertWithInfo(c,NULL,sdsEncodedObject(c->argv[3]));
    redisAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,c->argv[3]->ptr,
            sdslen(c->argv[3]->ptr)));
    redisAssertWithInfo(c,NULL,rioWriteBulkLongLong(&cmd,ttl));

   //......
    //迁移指令字符串写入缓冲区
    redisAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,payload.io.buffer.ptr,
                                sdslen(payload.io.buffer.ptr)));
   //......
    //如果是replace发出 REPLACE
    if (replace)
        redisAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,"REPLACE",7));

  

	//......
}

最后调整

最后我们只需在节点1和2都执行CLUSTER SETSLOT <SLOT> NODE <NODE ID> 完成slot指派,这指令最终就会走到clusterCommand中,节点1和节点2格子的处理逻辑为:

  1. 节点2看看迁移的key是否不存则且migrating_slots_to数据不为空,若符合要求说明迁移完成但状态未修改,直接将migrating_slots_to置空完成指派最后调整。
  2. 节点1查看节点id是否是自己且importing_slots_from是否有数据,若有则说明节点导入完成,直接将importing_slots_from置空。

void clusterCommand(redisClient *c) {
    //......
     else if (!strcasecmp(c->argv[1]->ptr,"setslot") && c->argc >= 4) {//处理setslot指令
          //......
		 else if (!strcasecmp(c->argv[3]->ptr,"node") && c->argc == 5) {
            /* CLUSTER SETSLOT <SLOT> NODE <NODE ID> 标记最终迁移的节点 */
            clusterNode *n = clusterLookupNode(c->argv[4]->ptr);

          	//......
            //如果发现对应的key为0,且migrating_slots_to不为空,则说明迁出完成但状态还未修改,节点2会将migrating_slots_to设置为空
            if (countKeysInSlot(slot) == 0 &&
                server.cluster->migrating_slots_to[slot])
                server.cluster->migrating_slots_to[slot] = NULL;

          	//如果是节点1则会看指令的nodeid是否是自己且importing_slots_from是否有数据,若有则说明导入成功直接将importing_slots_from设置为空
            if (n == myself &&
                server.cluster->importing_slots_from[slot])
            {
              //......
                server.cluster->importing_slots_from[slot] = NULL;
            }
           
        }
		//......
}

小结

自此我们将redis集群中的所有核心设计都分析完成,希望对你有帮助。

我是 sharkchiliCSDN Java 领域博客专家开源项目—JavaGuide contributor,我想写一些有意思的东西,希望对你有帮助,如果你想实时收到我写的硬核的文章也欢迎你关注我的公众号: 写代码的SharkChili
因为近期收到很多读者的私信,所以也专门创建了一个交流群,感兴趣的读者可以通过上方的公众号获取笔者的联系方式完成好友添加,点击备注 “加群” 即可和笔者和笔者的朋友们进行深入交流。

在这里插入图片描述

参考

Redis的slot迁移:https://blog.csdn.net/Aquester/article/details/107935887

CLUSTER GETKEYSINSLOT:https://redis.io/docs/latest/commands/cluster-getkeysinslot/

RESTORE-ASKING:https://redis.io/docs/latest/commands/restore-asking/

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1942389.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

JavaScript青少年简明教程:赋值语句

JavaScript青少年简明教程&#xff1a;赋值语句 赋值语句&#xff08;assignment statement&#xff09; JavaScript的赋值语句用于给变量、对象属性或数组元素赋值。赋值语句的基本语法是使用符号 () 将右侧的值&#xff08;称为“源操作数”&#xff09;赋给左侧的变量、属…

Docker Minio rclone数据迁移

docker minio进行数据迁移 使用rclone进行数据迁移是一种非常灵活且强大的方式&#xff0c;特别是在处理大规模数据集或跨云平台迁移时。rclone是一款开源的命令行工具&#xff0c;用于同步文件和目录到多种云存储服务&#xff0c;包括MinIO。下面是使用rclone进行数据迁移至Mi…

【RT摩拳擦掌】RT600 4路音频同步输入1路TDM输出方案

【RT摩拳擦掌】RT600 4路音频同步输入1路TDM输出方案 一&#xff0c; 文章简介二&#xff0c;硬件平台构建2.1 音频源板2.2 音频收发板2.3 双板硬件连接 三&#xff0c;软件方案与软件实现3.1 方案实现3.2 软件代码实现3.2.1 4路I2S接收3.2.2 I2S DMA pingpong配置3.2.3 音频数…

卧室激光投影仪推荐一下哪款效果最好?当贝X5S亮度卧室开灯照样清晰

现在家庭卧室装投影仪也不是什么稀奇的事情了&#xff0c;外面客厅看电视机&#xff0c;里面卧室投影仪直接投白墙各有各的优势。躺在卧室的床上&#xff0c;看超大屏投影真的很惬意。卧室投影的品类比较多&#xff0c;有些价格便宜的投影宣传说卧室看很适合&#xff0c;其实不…

设计模式12-构建器

设计模式12-构建器 由来和动机原理思想构建器模式的C代码实现构建器模式中的各个组件详解1. 产品类&#xff08;Product&#xff09;2. 构建类&#xff08;Builder&#xff09;3. 具体构建类&#xff08;ConcreteBuilder&#xff09;4. 指挥者类&#xff08;Director&#xff0…

实战:OpenFeign使用以及易踩坑说明

OpenFeign是SpringCloud中的重要组件&#xff0c;它是一种声明式的HTTP客户端。使用OpenFeign调用远程服务就像调用本地方法一样&#xff0c;但是如果使用不当&#xff0c;很容易踩到坑。 Feign 和OpenFeign Feign Feign是Spring Cloud组件中的一个轻量级RESTful的HTTP服务客…

rabbitmq生产与消费

一、rabbitmq发送消息 一、简单模式 概述 一个生产者一个消费者模型 代码 //没有交换机&#xff0c;两个参数为routingKey和消息内容 rabbitTemplate.convertAndSend("test1_Queue","haha");二、工作队列模式 概述 一个生产者&#xff0c;多个消费者&a…

C4D2024软件下载+自学C4D 从入门到精通【学习视频教程全集】+【素材笔记】

软件介绍与下载&#xff1a; 链接&#xff1a; 链接&#xff1a;https://pan.baidu.com/s/1n8cripcv6ZTx4TBNj5N04g?pwdhfg5 提取码&#xff1a;hfg5 基础命令的讲解&#xff1a; 掌握软件界面和基础操作界面。学习常用的基础命令&#xff0c;如建模、材质、灯光、摄像机…

设计模式-领域逻辑模式-结构映射模式

对象和关系之间的映射&#xff0c;关键问题在于二者处理连接的方式不同。 表现出两个问题&#xff1a; 表现方法不同。对象是通过在运行时&#xff08;内存管理环境或内存地址&#xff09;中保存引用的方式来处理连接的&#xff0c;关系数据库则通过创建到另外一个表的键值来处…

昇思25天学习打卡营第19天|munger85

Diffusion扩散模型 它并没有那么复杂&#xff0c;它们都将噪声从一些简单分布转换为数据样本&#xff0c;Diffusion也是从纯噪声开始通过一个神经网络学习逐步去噪&#xff0c;最终得到一个实际图像 def rearrange(head, inputs): b, hc, x, y inputs.shape c hc // head r…

大数据平台之HBase

HBase是一个高可靠性、高性能、面向列、可伸缩的分布式存储系统&#xff0c;是Apache Hadoop生态系统的重要组成部分。它特别适合大规模结构化和半结构化数据的存储和检索&#xff0c;能够处理实时读写和批处理工作负载。以下是对HBase的详细介绍。 1. 核心概念 1.1 表&#x…

TIA博途V19无法勾选来自远程对象的PUT/GET访问的解决办法

TIA博途V19无法勾选来自远程对象的PUT/GET访问的解决办法 TIA博途升级到V19之后,1500CPU也升级到了V3.1的固件,1200CPU升级到了V4.6.1的固件, 固件升级之后,又出现了很多问题,如下图所示,在组态的时候会多出一些东西, 添加CPU之后,在属性界面可以看到“允许来自远程对象…

第二讲:NJ网络配置

Ethernet/IP网络拓扑结构 一. NJ EtherNet/IP 1、网络端口位置 NJ的CPU上面有两个RJ45的网络接口,其中一个是EtherNet/IP网络端口(另一个是EtherCAT的网络端口) 2、网络作用 如图所示,EtherNet/IP网络既可以做控制器与控制器之间的通信,也可以实现与上位机系统的对接通…

python爬虫基础——Webbot库介绍

本文档面向对自动化网页交互、数据抓取和网络自动化任务感兴趣的Python开发者。无论你是初学者还是有经验的开发者&#xff0c;Webbot库都能为你的自动化项目提供强大的支持。 Webbot库概述 Webbot是一个专为Python设计的库&#xff0c;用于简化网页自动化任务。它基于Seleniu…

高速ADC模拟输入接口设计

目录 基本输入接口考虑 输入阻抗 输入驱动 带宽和通带平坦度 噪声 失真 变压器耦合前端 有源耦合前端网络 基本输入接口考虑 采用高输入频率、高速模数转换器(ADC)的系统设计是一 项具挑战性的任务。ADC输入接口设计有6个主要条件&#xff1a; 输入阻抗、输入驱动、带宽…

【RaspberryPi】树莓派系统UI优化

接上文&#xff0c;如何去定制一个树莓派的桌面系统&#xff0c;还是以CM4为例。 解除CM4上电USB无法使用问题 将烧录好的tf卡通过读卡器插入到电脑上&#xff0c;进入boot磁盘&#xff0c;里面有一个Config文件&#xff0c;双击用记事本打开&#xff0c;在【pi4】一栏里加入一…

农业农村大数据底座:实现智慧农业的关键功能

随着信息技术的快速发展&#xff0c;农业领域也在逐步实现数字化转型。农业农村大数据底座作为支持智慧农业发展的重要基础设施&#xff0c;承载了多种关键功能&#xff0c;为农业生产、管理和决策提供了前所未有的支持和可能性。 ### 1. 数据采集与监测 农业农村大数据底座首…

【k8s故障处理篇】calico-kube-controllers状态为“ImagePullBackOff”解决办法

【k8s故障处理篇】calico-kube-controllers状态为“ImagePullBackOff”解决办法 一、环境介绍1.1 本次环境规划1.2 kubernetes简介1.3 kubernetes特点二、本次实践介绍2.1 本次实践介绍2.2 报错场景三、查看报错日志3.1 查看pod描述信息3.2 查看pod日志四、报错分析五、故障处理…

【Docker】Docker Desktop - WSL update failed

问题描述 Windows上安装完成docker desktop之后&#xff0c;第一次启动失败&#xff0c;提示&#xff1a;WSL update failed 解决方案 打开Windows PowerShell 手动执行&#xff1a; wsl --set-default-version 2 wsl --update

使用C#手搓Word插件

WordTools主要功能介绍 编码语言&#xff1a;C#【VSTO】 1、选择 1.1、表格 作用&#xff1a;全选文档中的表格&#xff1b; 1.2、表头 作用&#xff1a;全选文档所有表格的表头【第一行】&#xff1b; 1.3、表正文 全选文档中所有表格的除表头部分【除第一行部分】 1.…