浅析Redis③:命令处理之数据返回Client(下)

news2024/11/20 18:38:03

写在前面

Redis作为我们日常工作中最常使用的缓存数据库,其重要性不言而喻,作为普通开发者,我们在日常开发中使用Redis,主要聚焦于Redis的基层数据结构的命令使用,很少会有人对Redis的内部实现机制进行了解,对于我而言,也是如此,但一直以来,我对于Redis的内部实现都很好奇,它为什么会如此高效,本系列文章是旨在对Redis源代码分析拆解,通过阅读Redis源代码,了解Redis基础数据结构的实现机制。

关于Redis的源码分析,已经有非常多的大佬写过相关的内容,最为著名的是《Redis设计与实现》,对于Redis源码的分析已经非常出色,本系列文章对于源码拆解时,并不会那么详细,相信大部分读者应该不是从事Redis的二次开发工作,对于源码细节过于深入,会陷入细节的泥潭,这是我在阅读源码时尽量避免的,我尽量做到对大体的脉络进行梳理,讲清楚主干逻辑,细节部分,如果读者有兴趣,可以自行参阅源码或相关资料。

本系列源代码,基于Redis 3.2.6

前言

在上两篇中

浅析Redis①:命令处理核心源码分析(上)

浅析Redis②:命令处理之epoll实现(中)

我们大致了解了Redis客户端命令请求的处理流程,在整个流程中,我们了解了Redis是如何处理来自客户端的命令请求,epoll的执行逻辑,我们还有最后一个问题没有解释,Redis是如何将数据写回Client端的?

本篇我们就围绕第一个问题,寻找答案,继续看Redis客户端命令请求的处理流程。

Redis数据返回Client端流程

Redis在命令处理时,在命令执行的末尾,都会调用一个addReply(),这里我们以最简单的STRING get为例:

t_string.c getGenericCommand()

int getGenericCommand(client *c) {
    robj *o;
	
    // 从字典中查询数据
    if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.nullbulk)) == NULL)
        return C_OK;
	
    // 将数据返回Client
    if (o->type != OBJ_STRING) {
        addReply(c,shared.wrongtypeerr);
        return C_ERR;
    } else {
        addReplyBulk(c,o);
        return C_OK;
    }
}

void addReplyBulk(client *c, robj *obj) {
    addReplyBulkLen(c,obj);
    addReply(c,obj);
    addReply(c,shared.crlf);
}

继续看addReply()的实现:

networking.c addReply()

void addReply(client *c, robj *obj) {
    if (prepareClientToWrite(c) != C_OK) return;

    // 核心,将数据写入内存缓存区,等待后续流程处理,写回Client Socket
    if (sdsEncodedObject(obj)) {
        if (_addReplyToBuffer(c,obj->ptr,sdslen(obj->ptr)) != C_OK)
            _addReplyObjectToList(c,obj);
    } else if (obj->encoding == OBJ_ENCODING_INT) {
        if (listLength(c->reply) == 0 && (sizeof(c->buf) - c->bufpos) >= 32) {
            char buf[32];
            int len;
            len = ll2string(buf,sizeof(buf),(long)obj->ptr);
            if (_addReplyToBuffer(c,buf,len) == C_OK)
                return;
        }
        obj = getDecodedObject(obj);
        if (_addReplyToBuffer(c,obj->ptr,sdslen(obj->ptr)) != C_OK)
            _addReplyObjectToList(c,obj);
        decrRefCount(obj);
    } else {
        serverPanic("Wrong obj->encoding in addReply()");
    }
}

上述流程,是string get命令执行后,数据处理的流程,可以发现,Redis并没有将数据直接返回Client端,而是将数据写入了一个叫做缓冲区的内存区域,那么缓冲区是什么?

Redis的内存缓冲区

在 Redis 中,缓冲区(buffer)是用于存储数据的内存区域。Redis 使用缓冲区来管理数据的读取、写入和传输过程。

Redis 的缓冲区主要有两个方面的应用:

  • 输入缓冲区(Input Buffer):当 Redis 接收到客户端发送的命令请求时,会先将请求数据存储在输入缓冲区中,然后再进行解析和处理。输入缓冲区用于临时存储从网络或其他输入源接收到的原始数据。
  • 输出缓冲区(Output Buffer):当 Redis 响应客户端的命令请求时,会先将响应数据存储在输出缓冲区中,然后再发送给客户端。输出缓冲区用于临时存储待发送的数据。

缓冲区在 Redis 中的作用是提高数据的处理效率和性能。通过使用缓冲区,Redis 可以批量读取和写入数据,减少了频繁的系统调用和网络传输开销。此外,缓冲区还可以用于临时存储数据,以便进行数据的加工和处理。

需要注意的是,Redis 缓冲区大小是有限的,它受到配置参数 client-output-buffer-limit 和 client-query-buffer-limit 的影响。

如果缓冲区已满,而输入或输出数据仍在不断到达,则可能导致连接被拒绝或数据丢失。

因此,在高并发或大数据量的场景中,需要根据实际情况调整缓冲区大小以保证系统的稳定性和性能。

OK,命令处理部分流程结束,我们把逻辑拉回到main函数中,聚焦aeMain()

ae.c aeMain()

void aeMain(aeEventLoop *eventLoop) {
    eventLoop->stop = 0;
    while (!eventLoop->stop) {
        if (eventLoop->beforesleep != NULL)
            eventLoop->beforesleep(eventLoop);
        aeProcessEvents(eventLoop, AE_ALL_EVENTS);
    }
}

在前两篇中,我们介绍过aeMain(),这里使用一个死循环,aeProcessEvents()轮询epoll是否存在就绪的事件,在aeProcessEvents()之前,我们需要关注beforesleep()

if (eventLoop->beforesleep != NULL)
    eventLoop->beforesleep(eventLoop);

在轮询之前,都会执行beforesleep(),这个函数就是我们要关注的核心,继续看beforesleep()实现:

server.c beforeSleep()

void beforeSleep(struct aeEventLoop *eventLoop) {
....
....    
此处省略部分非核心代码
....  
    
    /* Write the AOF buffer on disk */
    flushAppendOnlyFile(0);

    // 将数据写回Client
    handleClientsWithPendingWrites();
}

networking.c handleClientsWithPendingWrites()

int handleClientsWithPendingWrites(void) {
    listIter li;
    listNode *ln;
    int processed = listLength(server.clients_pending_write);

    listRewind(server.clients_pending_write,&li);
    while((ln = listNext(&li))) {
        ......
        省略部分非核心代码    

        // 核心,将数据通过socket返回Client
        if (writeToClient(c->fd,c,0) == C_ERR) continue;

        // 还有部分数据没有写完,加入epoll,等待异步执行
        if (clientHasPendingReplies(c) &&
            aeCreateFileEvent(server.el, c->fd, AE_WRITABLE,
                sendReplyToClient, c) == AE_ERR)
        {
            // 释放内存
            freeClientAsync(c);
        }
    }
    return processed;
}

上述代码是执行数据返回Client的核心逻辑,可以参见代码注释,令人疑惑的部分是,为什么这段代码中,writeToClient()可能会执行两次?

原因如下:

第一次调用 writeToClient() 是为了尝试向客户端套接字写入数据。这里的目的是将服务器待发送的数据写入到套接字缓冲区中,以便后续通过网络发送给客户端。如果写入成功,则会继续判断该客户端是否还有待发送的数据。

第二次调用 writeToClient() 是在判断客户端是否还有待发送的数据后执行的。如果客户端仍然有待发送的数据,那么说明套接字的发送缓冲区已满,无法一次性将所有数据发送出去。此时,为了确保后续的数据能够被及时发送,需要将该客户端的套接字注册到可写事件上,以便在套接字可写时继续发送剩余的数据。

需要注意的是,第二次调用 writeToClient() 并不会立即执行数据的发送,而是在套接字变为可写时由事件循环机制触发相应的写入操作。

这样可以避免在套接字无法写入数据时出现阻塞的情况,提高服务器的并发性能。

OK,我们继续看writeToClient() 的实现逻辑。

networking.c writeToClient()

int writeToClient(int fd, client *c, int handler_installed) {
    ssize_t nwritten = 0, totwritten = 0;
    size_t objlen;
    size_t objmem;
    robj *o;
	
    // 循环读取内存缓冲区的数据,写回socket,返回Client端
    while(clientHasPendingReplies(c)) {
        if (c->bufpos > 0) {
            // 核心,执行socket写回
            nwritten = write(fd,c->buf+c->sentlen,c->bufpos-c->sentlen);
            if (nwritten <= 0) break;
            c->sentlen += nwritten;
            totwritten += nwritten;

            /* If the buffer was sent, set bufpos to zero to continue with
             * the remainder of the reply. */
            if ((int)c->sentlen == c->bufpos) {
                c->bufpos = 0;
                c->sentlen = 0;
            }
        } else {
            o = listNodeValue(listFirst(c->reply));
            objlen = sdslen(o->ptr);
            objmem = getStringObjectSdsUsedMemory(o);

            if (objlen == 0) {
                listDelNode(c->reply,listFirst(c->reply));
                c->reply_bytes -= objmem;
                continue;
            }

            nwritten = write(fd, ((char*)o->ptr)+c->sentlen,objlen-c->sentlen);
            if (nwritten <= 0) break;
            c->sentlen += nwritten;
            totwritten += nwritten;

            /* If we fully sent the object on head go to the next one */
            if (c->sentlen == objlen) {
                listDelNode(c->reply,listFirst(c->reply));
                c->sentlen = 0;
                c->reply_bytes -= objmem;
            }
        }
            
        .........
    	省略部分非核心代码
    	.........    
    }
    
    .........
    省略部分非核心代码
    .........    
    
    return C_OK;
}

writeToClient()就是核心写入的部分了,这里获取redisClient对象的bufpos,可以理解为缓冲区中的标记位置,如果存在待写入的数据,循环调用系统方法write写入socketFD中。

write() 函数用于向文件描述符(包括套接字)写入数据。在这段代码中,write() 函数被用于将数据写入到客户端的套接字中,即向客户端发送数据。

就此,Redis将数据返回Client的流程,我们就了解完毕。

老规矩,我们还是用一张流程图来简略描述整个过程:
Redis命令执行结果返回Client流程

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1427827.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

YOLOv8-Segment C++

YOLOv8-Segment C https://github.com/triple-Mu/YOLOv8-TensorRT 这张图像是运行yolov8-seg程序得到的结果图&#xff0c;首先是检测到了person、bus及skateboard(这个是检测错误&#xff0c;将鞋及其影子检测成了滑板&#xff0c;偶尔存在错误也属正常)&#xff0c;然后用方…

4D毫米波雷达——ADCNet 原始雷达数据 目标检测与可行驶区域分割

前言 本文介绍使用4D毫米波雷达&#xff0c;基于原始雷达数据&#xff0c;实现目标检测与可行驶区域分割&#xff0c;它是来自2023-12的论文。 会讲解论文整体思路、输入分析、模型框架、设计理念、损失函数等&#xff0c;还有结合代码进行分析。 论文地址&#xff1a;ADCNe…

Flink实时数仓同步:快照表实战详解

一、背景 在大数据领域&#xff0c;初始阶段业务数据通常被存储于关系型数据库&#xff0c;如MySQL。然而&#xff0c;为满足日常分析和报表等需求&#xff0c;大数据平台采用多种同步方式&#xff0c;以适应这些业务数据的不同存储需求。这些同步存储方式包括离线仓库和实时仓…

nvm 工具使用介绍

目录 1.背景2.nvm介绍3.下载和安装4.配置环境变量5.配置淘宝镜像5.1 方式一&#xff1a;直接执行命令5.2 方式二&#xff1a;修改配置文件 6.常用命令7.总结 下载地址&#xff1a; https://github.com/coreybutler/nvm-windows/releases 1.背景 在工作中&#xff0c;我们可能…

总分类账户和明细分类账户

目录 一. 设置二. 联系与区别三. 平行记账规则 \quad 一. 设置 \quad 根据总分类科目设置总分类账户 根据明细分类科目设置明细分类账户 \quad 二. 联系与区别 \quad \quad 三. 平行记账规则 \quad

【C++11(一)】列表初始化and右值引用

一、 统一的列表初始化 1.1 &#xff5b;&#xff5d;初始化 在C98中&#xff0c;标准允许 使用花括号{}对数组或者结构体元素 进行统一的列表初始值设定 C11扩大了用大括号 括起的列表(初始化列表)的使用范围 使其可用于所有的内置类型和 用户自定义的类型 使用初始化列表时…

bash脚本学习笔记

一、扫盲 脚本文件是一种文本文件&#xff0c;其中包含了一系列的命令和指令&#xff0c;可以被操作系统解释器直接解释执行。脚本文件通常被用来完成特定的任务或执行重复性的操作。 脚本文件通常以某种编程语言的语法编写&#xff0c;例如 Bash、Python、Perl、Ruby 等等。…

IDEA JDBC配置

一、在pom中添加依赖 <dependencies><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>8.0.33</version></dependency></dependencies> 然后同步一下 二、编写代码…

Linux 系统服务

一、CentOS 6 与 CentOS 7开机流程 1.CentOS 6 1.1.打开电源首先通过内核引导开机。 1.2.开机自检&#xff0c;加载 BIOS 硬件信息。 1.3.MBR 记录一般是在磁盘0磁道0扇区&#xff0c;共512个字节。前446个字节是 BootLoder。计算机不知道我们的系统在哪里&#xff0c;所以需…

【Java数据结构】模拟实现ArrayList

import java.util.Arrays;/*** ArrayList的模拟实现*/ class SeqList{private int[] elem;private int usedSize0;//记录当前顺序表有多少个有效数字public static final int DEFAULT_CAPACITY10;//默认大小是10public SeqList(){this.elemnew int[DEFAULT_CAPACITY];}//新增元…

基于FFT + CNN -Transformer时域、频域特征融合的电能质量扰动识别模型

目录 往期精彩内容&#xff1a; ​模型整体结构 1 快速傅里叶变换FFT原理介绍 第一步&#xff0c;导入部分数据&#xff0c;扰动信号可视化 第二步&#xff0c;扰动信号经过FFT可视化 2 电能质量扰动数据的预处理 2.1 导入数据 2.2 制作数据集 3 基于FFTCNN-Transform…

服务攻防-端口协议桌面应用QQWPS等RCEhydra口令猜解未授权检测

知识点&#xff1a; 1、端口协议-弱口令&未授权&攻击方式等 2、桌面应用-社交类&文档类&工具类等 章节点&#xff1a; 1、目标判断-端口扫描&组合判断&信息来源 2、安全问题-配置不当&CVE漏洞&弱口令爆破 3、复现对象-数据库&中间件&…

AS-V1000产品介绍:支持GA/T1400视图库标准(可通过GA/T1400接入海康、华为、大华等图传前端设备,实现图传功能)

目 录 一、概述 二、AS-V1000视频监控管理平台的特点 二、视频监控平台通过GA/T1400接入前端设备 &#xff08;一&#xff09;接入华为GA/T1400前端设备 &#xff08;二&#xff09;接入大华GA/T1400前端设备 &#xff08;三&#xff09;接入海康威视GA/…

QML ListView 列表视图

作者: 一去、二三里 个人微信号: iwaleon 微信公众号: 高效程序员 虽然 Repeater 在重复创建多个相似项的时候很方便,但是通常只适用于有限的简单元素,并且它还无法滚动浏览。而基于 Flickable 的视图组件(如 GridView、ListView、TableView、TreeView 等)则弥补了这些缺…

vue-3d-model

vue-3d-model - npm GitHub - hujiulong/vue-3d-model: &#x1f4f7; vue.js 3D model viewer component 通过该插件降低Threejs的使用难度 vue项目加载三维模型&#xff0c;我把模型放在了服务器的tomcat里面&#xff0c;需要对tomcat的fbx项目文件夹设置跨域&#xff0c;如…

Docker容器引擎(5)

目录 一.docker-compose docker-compose的三大概念&#xff1a; yaml文件格式&#xff1a; json文件格式&#xff1a; docker-compose 配置模板文件常用的字段&#xff1a; 二.Docker Compose 环境安装&#xff1a; 查看版本&#xff1a; 准备好nginx 的dockerfile的文…

微信小程序(三十一)本地同步存储API

注释很详细&#xff0c;直接上代码 上一篇 新增内容&#xff1a; 1.存储数据 2.读取数据 3.删除数据 4.清空数据 源码&#xff1a; index.wxml <!-- 列表渲染基础写法&#xff0c;不明白的看上一篇 --> <view class"students"><view class"item…

银行数据仓库体系实践(16)--数据应用之财务分析

总账系统 在所有公司中&#xff0c;财务分析的基础都是核算&#xff0c;那在银行的系统体系中&#xff0c;核算功能在业务发生时由业务系统如核心、贷款、理财中实现登记&#xff0c;各业务系统会在每天切日后统计当天各机构的核算科目的发生额与余额&#xff0c;并统一送到总账…

k8s二进制及负载均衡集群部署详解

目录 常见部署方式 二进制部署流程 环境准备 操作系统初始化配置 关闭防火墙 配置SELinux 关闭SWAP 根据规划设置主机名 在master添加hosts&#xff0c;便于主机名解析 调整内核参数 配置时间同步 部署docker引擎 在所有node节点部署docker引擎 部署etcd集群 签发…

shell脚本自动备份数据库表

今日目标&#xff1a;shell脚本自动备份数据库中的表并记录执行日志和mysql输出日志 编写思路&#xff1a; &#xff08;1&#xff09;shell脚本运行mysql命令 &#xff08;2&#xff09;脚本输出记录到日志中 &#xff08;3&#xff09;定时任务自动执行shell脚本 1、she…