Redis 接收连接

news2025/4/24 0:48:51

阅读本文前,建议先看:Redis 事件循环(Event Loop)。

Redis 6 支持接收 3 种连接,对应的接收处理器如下:

  • TCPacceptTcpHandler
  • TLSacceptTLSHandler
  • Unix SocketacceptUnixHandler

本文只关注 TCP 连接。

对于监听 Socket,没必要为它们创建 connection 对象和 client 对象。

acceptTcpHandler

在事件循环过程中,epoll 监听到连接(读)事件,就会调用事先注册好的接收处理器 acceptTcpHandler 来处理。

acceptTcpHandler 定义在 networking.c 中。

void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
    // 每一次最多接收MAX_ACCEPTS_PER_CALL(1000)个连接
    int cport, cfd, max = MAX_ACCEPTS_PER_CALL;
    char cip[NET_IP_STR_LEN];	// 保存客户端ip地址
    // 未使用的变量,编译器会报警,这里关闭编译器警告
    UNUSED(el);
    UNUSED(mask);
    UNUSED(privdata);

    // 不断接收客户端连接,直到满1000个
    while(max--) {
        // 最终调用socket层系统调用 accept 接收连接
        cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);
        if (cfd == ANET_ERR) {
            if (errno != EWOULDBLOCK)
                serverLog(LL_WARNING,
                    "Accepting client connection: %s", server.neterr);
            return;
        }
        anetCloexec(cfd); // 设置FD_CLOEXEC标志
        serverLog(LL_VERBOSE,"Accepted %s:%d", cip, cport);
        // 目前只有 Unix Socket 会传递 CLIENT_UNIX_SOCKET 标志
        // tcp 和 tls 都是 0
        acceptCommonHandler(connCreateAcceptedSocket(cfd),0,cip);
    }
}

accept 获取 fd

anetTcpAccept 实际调用 accept 系统调用从内核拿完成 TCP 三次握手的连接,返回连接的句柄(fd)。后续对连接的所有操作都需要直接或间接的依赖这个 fd。

Linux 内核协议栈帮我们完成 TCP 三次握手,然后内核将建立好的 TCP 连接放到监听 Socket 的全连接队列,等待用户态应用程序调用 accept 系统调用获取连接。

创建连接对象

调用 connCreateAcceptedSocket 创建 connection 对象,并用 connection 对象包装连接句柄 fd。后续对底层 fd 的操作都由 connection 对象代理。

connection *connCreateAcceptedSocket(int fd) {
    connection *conn = connCreateSocket(); // 创建connection对象
    conn->fd = fd;	// 关联底层fd
    conn->state = CONN_STATE_ACCEPTING;
    return conn;
}

connection *connCreateSocket() {
    connection *conn = zcalloc(sizeof(connection)); // 分配连接对象
    conn->type = &CT_Socket;	// 链接类型是CT_Socket
    conn->fd = -1;

    return conn;
}
连接对象
struct connection {
    ConnectionType *type; 	// 连接类型
    ConnectionState state;	// 连接状态
    short int flags;
    short int refs;			// 引用计数
    int last_errno;
    void *private_data;		// 指向client结构指针
    ConnectionCallbackFunc conn_handler; 	// 接收处理器
    ConnectionCallbackFunc write_handler;	// 写处理器
    ConnectionCallbackFunc read_handler;	// 写处理器
    int fd;		// 封装了底层的fd
};
连接类型

连接类型被抽象成 ConnectionType 结构体,该结构体中定义了一系列函数接口,具体的连接类型(TCP/TLS)实现它们自己的函数接口。

TCP 连接的 ConnectionType 实现是 CT_Socket(Unix Socket 实际上复用的 TCP 连接类型)。Redis 6 引入了 SSL/TLS,因此 TLS 的 ConnectionType 实现是 CT_TLS

CT_Socket 定义在 connection.c 中。

ConnectionType CT_Socket = {
    .ae_handler = connSocketEventHandler,
    .close = connSocketClose,
    .write = connSocketWrite,
    .read = connSocketRead,
    .accept = connSocketAccept,
    .connect = connSocketConnect,
    .set_write_handler = connSocketSetWriteHandler,
    .set_read_handler = connSocketSetReadHandler,
    .get_last_error = connSocketGetLastError,
    .blocking_connect = connSocketBlockingConnect,
    .sync_write = connSocketSyncWrite,
    .sync_read = connSocketSyncRead,
    .sync_readline = connSocketSyncReadLine,
    .get_type = connSocketGetType
};

通用的 accept 流程

将创建的 connection 对象作为参数调用 acceptCommonHandler,接下来进入通用的 accept 流程。

对于 TCP 和 TLS ,flags 都为 0,对于 Unix_Socket,flagsCLIENT_UNIX_SOCKET

#define MAX_ACCEPTS_PER_CALL 1000
static void acceptCommonHandler(connection *conn, int flags, char *ip) {
    client *c;
    char conninfo[100];
    UNUSED(ip);

    // 检查连接状态
    if (connGetState(conn) != CONN_STATE_ACCEPTING) {
        // ..... 记录日志
        connClose(conn); // 关闭连接
        return;
    }

     /* 检查当前总连接数(客户端 + 集群内部连接)是否超过 maxclients 限制。*/
    if (listLength(server.clients) + getClusterConnectionsCount() >= server.maxclients)
    {
        char *err;
        if (server.cluster_enabled)
            err = "-ERR max number of clients + cluster "
                  "connections reached\r\n";
        else
            err = "-ERR max number of clients reached\r\n";

        // 发送错误消息
        if (connWrite(conn,err,strlen(err)) == -1) {
            /* Nothing to do, Just to avoid the warning... */
        }
        server.stat_rejected_conn++;	// 被拒绝连接数加1,用于统计
        connClose(conn); // 关闭连接
        return;
    }

    // 创建client客户端对象,并包装连接对象
    if ((c = createClient(conn)) == NULL) {
        // .... 记录警告日志
        connClose(conn); /* May be already closed, just ignore errors */
        return;
    }

    /* Last chance to keep flags */
    c->flags |= flags;

    /* 启动连接接受操作。
     * 请注意,connAccept() 在此处可以自由地执行以下两件事之一:
     *    立即调用 clientAcceptHandler() 函数;
     *    安排在未来的某个时间调用 clientAcceptHandler() 函数。
     * 正因为如此,在调用 connAccept() 之后,我们不能再执行其他任何操作。
     */
    if (connAccept(conn, clientAcceptHandler) == C_ERR) {
        char conninfo[100];
        if (connGetState(conn) == CONN_STATE_ERROR)
            serverLog(LL_WARNING,
                    "Error accepting a client connection: %s (conn: %s)",
                    connGetLastError(conn), connGetInfo(conn, conninfo, sizeof(conninfo)));
        freeClient(connGetPrivateData(conn)); // 销毁客户端对象
        return;
    }
}
创建客户端对象

createClient 方法用于创建 client,它代表着连接到 Redis 客户端,每个客户端都有各自的输入缓冲区输出缓冲区,输入缓冲区存储客户端通过网络发送过来的数据,输出缓冲区则存储着 Redis 对客户端的响应数据。client 有三种类型,不同类型的对应缓冲区的大小都不同。

  • 普通客户端:除了复制和订阅的客户端之外的所有连接
  • slave 客户端:用于主从复制,主节点会为每个从节点单独建立一条连接用于命令复制。
  • 订阅客户端:用于发布订阅功能。

createClient 位于 networking.c

client *createClient(connection *conn) {
    client *c = zmalloc(sizeof(client));// 分配对象
    // 将 NULL 作为 conn 传入就有可能创建一个无连接的客户端对象
    // 这是很有用的,因为所有命令都需要在客户端的上下文中执行。
    // 当在其他上下文(例如一个 Lua 脚本)中执行命令时,我们就需要一个无连接的客户端对象。
    if (conn) {
        connNonBlock(conn);	// 连接设置为非阻塞
        // 设置TCP_NODELAY选项,禁用 nagle 算法,存放到内核缓冲区中的数据会立即发出。
        // 否则如果一次放到内核缓冲区中的数据数据包太小,则会在多个小的数据包凑成一个足够大的数据包后才会将数据发出。
        connEnableTcpNoDelay(conn);
        if (server.tcpkeepalive)
            connKeepAlive(conn,server.tcpkeepalive);	// 设置SO_KEEPALIVE选项,TCP层面的心跳检测
        // 设置连接的读处理器为readQueryFromClient
        connSetReadHandler(conn, readQueryFromClient);
        connSetPrivateData(conn, c);	// conn->private_data = client
    }

    selectDb(c,0);	// 默认选择0号数据库
    uint64_t client_id;
    atomicGetIncr(server.next_client_id,client_id,1);
    c->id = client_id;	// 分配一个单调递增的客户端ID
    c->resp = 2;
    c->conn = conn;
    c->name = NULL;
    c->bufpos = 0;
    c->qb_pos = 0;
    c->querybuf = sdsempty();
    .....
    if (conn) linkClient(c); // 将client添加到server.clients链表尾
    .....
    return c;
}
linkClient
/* 这个函数将客户端链接到客户端的全局链表中。而 unlinkClient() 函数则执行相反的操作,此外它还会做其他一些事情。*/
void linkClient(client *c) {
    // 添加到全局链表 server.clients 的尾部
    listAddNodeTail(server.clients,c);
    /* 请注意,我们会记住存储客户端的链表节点,
     * 通过这种方式,在 unlinkClient() 函数中移除客户端时,将不需要进行线性扫描,
     * 而仅仅是一个时间复杂度为常量的操作。 */
    c->client_list_node = listLast(server.clients);
    // 反转client id的字节序,将转换后的id作为key,client对象作为value,保存到基数树server.clients_index。
    // 当后续需要通过 id 获取 client 对象时会(例如 CLIENT UNBLOCK 命令)从基数树中查询。
    uint64_t id = htonu64(c->id);
    raxInsert(server.clients_index,(unsigned char*)&id,sizeof(id),c,NULL);
}

连接对象为 NULL 的 client 不会被添加到 server.clientsserver.clients_index

基数树可视化工具:https://www.cs.usfca.edu/~galles/visualization/RadixTree.html

设置连接的读处理器
connSetReadHandler(conn, readQueryFromClient);

刚建立的连接肯定首先需要监听它的读事件。

/* 注册一个读处理器,当连接可读时该处理器将被调用。如果传入的值为NULL,则会移除现有的处理器 */
static inline int connSetReadHandler(connection *conn, ConnectionCallbackFunc func) {
    return conn->type->set_read_handler(conn, func);
}

TCP 连接的连接类型是 CT_Socket,其 set_read_handler 函数接口的实现是 connSocketSetReadHandler

/* 注册一个读处理器,当连接可读时该处理器将被调用。如果传入的值为NULL,则会移除现有的处理器 */
static int connSocketSetReadHandler(connection *conn, ConnectionCallbackFunc func) {
    // 重复设置相同的读处理器
    if (func == conn->read_handler) return C_OK;

    // func = readQueryFromClient
    // 如果 func=NULL,意味着移除现有的读处理器
    conn->read_handler = func;	

    // 如果移除读处理器
    if (!conn->read_handler)
        // 取消对fd读事件的关注
        aeDeleteFileEvent(server.el,conn->fd,AE_READABLE);
    else
        // 添加到epoll中监听读事件
        // 注意:读事件的回调函数是connSocketEventHandler
        // 文件事件aeFileEvent的clientData字段指向connection对象
        // 为什么叫clientData而不是connData,因为redis6之前没有connection对象,只有client对象
        if (aeCreateFileEvent(server.el,conn->fd,
                    AE_READABLE,conn->type->ae_handler,conn) == AE_ERR) return C_ERR;
    return C_OK;
}

注意:
读事件处理器是 connSocketEventHandler
连接的读处理器是 readQueryFromClient
事件循环中,首先触发的是读事件处理器 connSocketEventHandler,该函数再调用连接读处理器 readQueryFromClient

虽然已经加入 epoll 进行监听了,但此时,连接对象的状态还是 CONN_STATE_ACCEPTING,后面会将其改为 CONN_STATE_CONNECTED

connAccept()
connAccept(conn, clientAcceptHandler)
/* 连接模块不处理监听和接受套接字的操作,
 * 因此我们假设在创建传入连接时已经有一个套接字。
 *
 * 因此,提供的文件描述符 (fd) 应该与一个已经通过 accept() 接受的套接字相关联。
 *
 * connAccept() 可能会直接调用 accept_handler(),也可能会返回并在稍后调用它。
 * 这种行为有点不太常规,但目的是在不需要额外握手的情况下,减少等待下一个事件循环的需求。
 *
 * 重要提示:accept_handler 可能会决定关闭连接,调用 connClose()。
 * 为了确保操作安全,在这种情况下,连接只会被标记为 CONN_FLAG_CLOSE_SCHEDULED,
 * 并且 connAccept() 会返回一个错误。
 *
 * connAccept() 的调用者必须始终检查返回值,并且在出现错误 (C_ERR) 时必须调用 connClose()。
 */
static inline int connAccept(connection *conn, ConnectionCallbackFunc accept_handler) {
    return conn->type->accept(conn, accept_handler);
}

CT_Socketaccept 接口的实现是 connSocketAccept,位于 connection.c

static int connSocketAccept(connection *conn, ConnectionCallbackFunc accept_handler) {
    int ret = C_OK;

    if (conn->state != CONN_STATE_ACCEPTING) return C_ERR;
    conn->state = CONN_STATE_CONNECTED;

    connIncrRefs(conn);	// 增加连接对象的引用计数
    // 调用 clientAcceptHandler
    if (!callHandler(conn, accept_handler)) ret = C_ERR;
    connDecrRefs(conn);	// 引用计数减1

    return ret;
}

注意:连接的状态设置为 CONN_STATE_CONNECTED

调用 clientAcceptHandler 函数,位于 networking.c

clientAcceptHandler 主要完成连接验证、安全策略检查及信息统计。

void clientAcceptHandler(connection *conn) {
    client *c = connGetPrivateData(conn);

    // 检查连接状态
    if (connGetState(conn) != CONN_STATE_CONNECTED) {
        // ..... 记录警告日志
        freeClientAsync(c);	// 异步释放客户端资源
        return;
    }

    /* 1. Redis处于保护模式(默认开启)。
     * 2. 未绑定特定网络接口(bindaddr_count == 0,意味着绑定所有接口)。
     * 3. 用户未设置密码(USER_FLAG_NOPASS)。
     * 4. 客户端不是通过 Unix Socket 连接通信(Unix Socket是本地通信,自然无风险)
     * 如果上面4个情况同时满足,需要Redis向用户说明如何解决该问题 */
    if (server.protected_mode &&
        server.bindaddr_count == 0 &&
        DefaultUser->flags & USER_FLAG_NOPASS &&
        !(c->flags & CLIENT_UNIX_SOCKET))
    {
        char cip[NET_IP_STR_LEN+1] = { 0 };
        // 调用connPeerToString获取客户端IP地址
        connPeerToString(conn, cip, sizeof(cip)-1, NULL);

        // 检查是否为本地回环地址(IPv4 是 127.0.0.1 或 IPv6 是 ::1)
        // 若IP非本地,向客户端发送错误信息并终止连接。
        if (strcmp(cip,"127.0.0.1") && strcmp(cip,"::1")) {
            char *err =
                "-DENIED Redis is running in protected mode because protected "
                .....; //一大串错误信息

            // 向非本地客户端发送错误信息
            if (connWrite(c->conn,err,strlen(err)) == -1) {
                /* Nothing to do, Just to avoid the warning... */
            }
            server.stat_rejected_conn++;	// 记录被拒绝的连接数,用于监控
            freeClientAsync(c);	// 异步释放客户端对象
            return;
        }
    }
    // 成功接收连接
    server.stat_numconnections++;	// 增加当前连接数,用于监控
    moduleFireServerEvent(REDISMODULE_EVENT_CLIENT_CHANGE,
                          REDISMODULE_SUBEVENT_CLIENT_CHANGE_CONNECTED,
                          c);	// 通知已注册模块(如监控插件)有新的客户端连接
}

上面的一大串错误消息大致意思是:

拒绝访问:Redis 正在保护模式下运行,因为已启用保护模式,未指定绑定地址,并且未向客户端请求认证密码。在此模式下,仅接受来自环回接口的连接。

如果您想从外部计算机连接到 Redis,您可以采用以下解决方案之一:

  • 只需从环回接口(通过从运行 Redis 服务器的同一主机连接到 Redis)发送命令 “CONFIG SET protected-mode no” 来禁用保护模式。但是,如果您这样做,请确保 Redis 不会从互联网上被公开访问。使用 “CONFIG REWRITE” 使此更改永久生效。
  • 或者,您可以通过编辑 Redis 配置文件,将保护模式选项设置为 “no”,然后重新启动服务器来禁用保护模式。
  • 如果您只是为了测试而手动启动服务器,请使用 “–protected-mode no” 选项重新启动它。
  • 设置一个绑定地址或一个认证密码。

注意:为了使服务器开始接受来自外部的连接,您只需执行上述操作之一即可。

后话

由上面我们知道,所有的连接统一设置了相同读处理器 connSocketEventHandler 并加入了 epoll 中进行监听。当对端向 Redis 发送请求后,会触发 connSocketEventHandler 的调用。

connSocketEventHandler 开始,之后做了哪些事,在后面的文章中会讲解。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2341124.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【计算机视觉】CV实战项目- Face-and-Emotion-Recognition 人脸情绪识别

Face-and-Emotion-Recognition 项目详细介绍 项目概述项目功能项目目录结构项目运行方式1. 环境准备2. 数据准备3. 模型训练4. 模型运行 常见问题及解决方法1. **安装依赖问题**2. **数据集问题**3. **模型训练问题**4. **模型运行问题** 项目实战建议项目参考文献 项目概述 F…

基于国产 FPGA+ 龙芯2K1000处理器+翼辉国产操作系统继电保护装置测试装备解决方案

0 引言 近年来,我国自主可控芯片在国家政策和政 府的支持下发展迅速,并在电力、军工、机械、 通信、电子、医疗等领域掀起了国产化替代之 风,但在芯片自主可控和国产化替代方面还有明 显的不足之处。 2022年我国集成电路进口量多 达 5 3…

如何批量为多个 Word 文档添加水印保护

在日常办公中,Word文档添加水印是一项重要的操作,特别是在需要保护文件内容的安全性和版权时。虽然Office自带了添加水印的功能,但当需要一次性给多个Word文档添加水印时,手动操作显得非常繁琐且低效。为了提高效率,可…

长期行为序列建模技术演进:从SIM到TWIN-v2

背景 在推荐系统与广告投放领域,长期行为序列建模旨在从用户数月甚至数年的历史行为中捕捉稳定兴趣模式,是解决冷启动、提升推荐精度的关键。随着工业界需求激增,SIM、ETA、SDIM、TWIN及TWIN-v2等模型相继诞生,推动技术不断革新。…

Linux下 REEF3D及DIVEMesh 源码编译安装及使用

目录 软件介绍 基本依赖 一、源码下载 1、REEF3D 2、DIVEMesh 二、解压缩 三、编译安装 1、REEF3D 2、DIVEMesh 四、算例测试 软件介绍 REEF3D是一款开源流体动力学框架,提供计算流体力学及波浪模型。软件采用高效并行化设计,可以在大规模处理器…

嵌入式软件测试的革新:如何用深度集成工具破解效率与安全的双重困局?

在汽车电子、工业控制、航空航天等嵌入式开发领域,团队常面临一个看似无解的悖论:如何在保证代码安全性的前提下,大幅提升测试效率? 传统测试工具往往需要搭建独立环境、插入大量桩代码,甚至需要开发者手动编写测试用例…

Ubuntu24.04安装ROS2问题

1,根据官方指导安装,安装到步骤: sudo curl -sSL https://raw.githubusercontent.com/ros/rosdistro/master/ros.key -o /usr/share/keyrings/ros-archive-keyring.gpg 时遇到问题。导致sudo apt update一直报错: 找了几天的资料…

【图问答】DeepSeek-VL 论文阅读笔记

《DeepSeek-VL: Towards Real-World Vision-Language Understanding》 1. 摘要/引言 基于图片问答(Visual Question Answering,VQA)的任务 2. 模型结构 和 三段式训练 1)使用 SigLIP 和 SAM 作为混合的vision encoder&#xf…

【专题刷题】滑动窗口(二):水果成篮,所有字母异位词,乘积小于 K 的子数组

📝前言说明: 本专栏主要记录本人的基础算法学习以及LeetCode刷题记录,按专题划分每题主要记录:(1)本人解法 本人屎山代码;(2)优质解法 优质代码;&#xff…

深入理解React中的Props与State:核心区别与最佳实践

在React开发中,props和state是构建交互式UI的两大基石。许多React初学者常常混淆这两者的概念,导致组件设计出现反模式。本文将全面剖析props与state的本质区别,通过实际场景说明它们的适用边界,并分享高效管理组件数据的实践经验…

STM32单片机入门学习——第46节: [14-1] WDG看门狗

写这个文章是用来学习的,记录一下我的学习过程。希望我能一直坚持下去,我只是一个小白,只是想好好学习,我知道这会很难,但我还是想去做! 本文写于:2025.04.23 STM32开发板学习——第46节: [14-1] WDG看门狗 前言开发板说明引用解答和科普一、…

n8n 中文系列教程_05.如何在本机部署/安装 n8n(详细图文教程)

n8n 是一款强大的开源工作流自动化工具,可帮助你连接各类应用与服务,实现自动化任务。如果你想快速体验 n8n 的功能,本机部署是最简单的方式。本教程将手把手指导你在 Windows 或 MacOS 上通过 Docker 轻松安装和运行 n8n,无需服务…

2025第十六届蓝桥杯python B组满分题解(详细)

目录 前言 A: 攻击次数 解题思路: 代码: B: 最长字符串 解题思路: 代码: C: LQ图形 解题思路: 代码: D: 最多次数 解题思路: 代码: E: A * B Problem 解题思路&…

Kafka 面试,java实战贴

面试问题列表 Kafka的ISR机制是什么?如何保证数据一致性? 如何实现Kafka的Exactly-Once语义? Kafka的Rebalance机制可能引发什么问题?如何优化? Kafka的Topic分区数如何合理设置? 如何设计Kafka的高可用跨…

linux多线(进)程编程——(9)信号量(一)

前言 在找到了共享内存存在的问题后,进程君父子着手开始解决这些问题。他们发明了一个新的神通——信号量。 信号量 信号量是一个计数器,用于管理对共享资源的访问权限。主要特点包括: (1)是一个非负整数 &#xff…

PFLM: Privacy-preserving federated learning with membership proof证明阅读

系列文章目录 提示:这里可以添加系列文章的所有文章的目录,目录需要自己手动添加 例如:第一章 Python 机器学习入门之pandas的使用 提示:写完文章后,目录可以自动生成,如何生成可参考右边的帮助文档 文章目…

图片转base64 - 加菲工具 - 在线转换

图片转base64 - 加菲工具 先进入“加菲工具” 网 打开 https://www.orcc.top, 选择 “图片转base64”功能 选择需要转换的图片 复制 点击“复制”按钮,即可复制转换好的base64编码数据,可以直接用于img标签。

opencv 对图片的操作

对图片的操作 1.图片镜像旋转(cv2.flip())2 图像的矫正 1.图片镜像旋转(cv2.flip()) 图像的旋转是围绕一个特定点进行的,而图像的镜像旋转则是围绕坐标轴进行的。图像的镜像旋转分为水平翻转、垂直翻转、水平垂直翻转…

LabVIEW数据采集与传感系统

开发了一个基于LabVIEW的智能数据采集系统,该系统主要通过单片机与LabVIEW软件协同工作,实现对多通道低频传感器信号的有效采集、处理与显示。系统的设计旨在提高数据采集的准确性和效率,适用于各种需要高精度和低成本解决方案的工业场合。 项…

【Easylive】​​Gateway模块 bootstrap.yml 解析

【Easylive】项目常见问题解答(自用&持续更新中…) 汇总版 Gateway模块 bootstrap.yml 常规解析 该配置文件定义了 Spring Cloud Gateway 的核心配置,包括 环境配置、服务注册、动态路由规则 等。以下是逐项解析: 1. 基础配…