Redis中的Reactor模型和执行命令源码探索

news2024/12/23 17:23:27

文章目录

  • 摘要
      • 1、 了解Linux的epoll
      • 2、了解Reactor模型
  • 一、Redis初始化
      • 1.1、配置初始化
      • 1.2、服务初始化
      • 1.3、网络监听初始化
      • 1.4、Reactor线程池初始化
      • 1.5、Reactor事件主循环
  • 二、Reactor
      • 2.1、Reactor事件处理器
      • 2.2、读事件
          • 2.2.1、第一次读事件(accept)
          • 2.2.2、第二次读事件(read)
      • 2.3、读任务分配
      • 2.4、读任务处理
      • 2.5、命令执行
          • 2.5.1、以get命令为例
          • 2.5.2、执行命令
      • 2.6、写任务分配
      • 2.7、写回客户端
  • 三、总结
      • 3.1、Redis的Reactor模型
      • 3.2、命令执行过程

摘要

有时候在面试的时候会被问到Redis为什么那么快?有一点就是客户端请求和应答是基于I/O多路复用(比如linux的epoll)的Reactor模型,今天我们就基于这个点顺着Redis的源码进行探索。

PS:本文基于Redis7.2.0源码

建议阅读本文之前熟悉Linux的epoll和Reactor模型相关知识

1、 了解Linux的epoll

在UNIX网络编程这本书中对socket有清晰的描述,server端的可用系统函数有socket、bind、listen、accept、read、write、close,其中accept、read、write依次是socket的接受完成三次握手的连接,读数据,写数据操作。

最基本的socket模型是阻塞模型,随着技术的演进,又有了非阻塞,I/O多路复用,信号驱动、异步五种I/O模型。其中I/O多路复用模型是用的最广的,而其在linux下又分为select,poll,epoll三种,性能依次提升,Redis在linux下的网络就是基于epoll的。
在这里插入图片描述

2、了解Reactor模型

Reactor模型在网络编程中非常有名,比如Java的Netty,优点是,相比多线(进)程在相同资源条件下可以同时处理更多的连接,核心就是对socket按事件类型(接受连接、读、写)进行拆分处理,令每个socket都有被执行的机会。Reactor模型分为单Reactor单线程、单Reactor多线程、多Reactor(一主多从)多线程三种类型,如下图:
reactor三种类型

我们知道Redis在V6.0之前都是单Reactor单线程类型,那V6.0改造之后呢???,跟着下面的源码探索吧

一、Redis初始化

我们知道 c 语言入口函数一般都是 main 函数,所以我们先从入口函数下手,Redis server端的入口函数在server.c文件中。

int main(int argc, char **argv) {
  //配置初始化,配置文件,redis命令注册之类的
  initServerConfig();
   //初始化服务,
	initServer();
	//初始化网络listen
	initListeners();
	//Some steps in server initialization need to be done last (after modules are loaded)
	//特别是线程池的初始化
	InitServerLast();
	//开始主线程循环
	aeMain(server.el);
  aeDeleteEventLoop(server.el);
}

1.1、配置初始化

该函数主要是做配置初始化默认值和Redis命令注册

void initServerConfig(void) {
	...
	initConfigValues();
	...
	/* Command table -- we initialize it here as it is part of the
     * initial configuration, since command names may be changed via
     * redis.conf using the rename-command directive. */
  server.commands = dictCreate(&commandTableDictType);
  server.orig_commands = dictCreate(&commandTableDictType);
  populateCommandTable();
	...
}
  1. 命令注册,其中redisCommandTable变量 位于commands.c文件中,枚举了Redis命令,比如set、get。遍历redisCommandTable注册到server.commands
void populateCommandTable(void) {
    int j;
    struct redisCommand *c;

    for (j = 0;; j++) {
        c = redisCommandTable + j;
        if (c->declared_name == NULL)//没有了
            break;

        int retval1, retval2;

        c->fullname = sdsnew(c->declared_name);
        if (populateCommandStructure(c) == C_ERR)
            continue;

        retval1 = dictAdd(server.commands, sdsdup(c->fullname), c);//命令注册
        /* Populate an additional dictionary that will be unaffected by rename-command statements in redis.conf. */
        retval2 = dictAdd(server.orig_commands, sdsdup(c->fullname), c);
        serverAssert(retval1 == DICT_OK && retval2 == DICT_OK);
    }
}

1.2、服务初始化

initServer时初始化服务起点。

void initServer(void) {
   //初始化server这个全局变量
	/* Initialization after setting defaults from the config system. */
   server.aof_state = server.aof_enabled ? AOF_ON : AOF_OFF;
   //基于配置文件配置的最大连接数创建事件循环器
   server.el = aeCreateEventLoop(server.maxclients+CONFIG_FDSET_INCR);
   server.clients_pending_write = listCreate();//全局写任务链表,用户分配给Reactor线程池
   server.clients_pending_read = listCreate();//全局读任务链表

   .....
    //创建定时任务管理器(比如key过期,客户端超时等),并设置回调函数为serverCron
    if (aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL) == AE_ERR) {
        serverPanic("Can't create event loop timers.");
        exit(1);
    }
    /* Register a readable event for the pipe used to awake the event loop
     * from module threads. */
    if (aeCreateFileEvent(server.el, server.module_pipe[0], AE_READABLE,
        modulePipeReadable,NULL) == AE_ERR) {
            serverPanic(
                "Error registering the readable event for the module pipe.");
    }

    /* Register before and after sleep handlers (note this needs to be done
     * before loading persistence since it is used by processEventsWhileBlocked. */
    aeSetBeforeSleepProc(server.el,beforeSleep);
    aeSetAfterSleepProc(server.el,afterSleep);
}

1.3、网络监听初始化

这里实现了socket的系统函数的scoket,bind,listen三步调用。

根据配置有三种选择TCP/TLS/Unix */,三者都实现了ConnectionType结构体的定义,位于connection.h文件中,在connection.c文件的connTypeInitialize函数会注册这三种类型(CT_Socket,CT_TLS,CT_Unix)

当在配置文件配置了port(比如6379)时,就选用CT_Socket。

void initListeners() {
 /* Setup listeners from server config for TCP/TLS/Unix */
    int conn_index;
    connListener *listener;
    if (server.port != 0) {//listen for TCP,
        conn_index = connectionIndexByType(CONN_TYPE_SOCKET);
        if (conn_index < 0)
            serverPanic("Failed finding connection listener of %s", CONN_TYPE_SOCKET);
        listener = &server.listeners[conn_index];
        listener->bindaddr = server.bindaddr;//我们在配置文件配置bind关键字时,可以配置多个,因此是个数组
        listener->bindaddr_count = server.bindaddr_count;
        listener->port = server.port;
        listener->ct = connectionByType(CONN_TYPE_SOCKET);//获取CT_Socket
    }
    ......
     /* create all the configured listener, and add handler to start to accept */
    int listen_fds = 0;
    for (int j = 0; j < CONN_TYPE_MAX; j++) {
        listener = &server.listeners[j];
        if (listener->ct == NULL)
            continue;

        if (connListen(listener) == C_ERR) {//该函数会调用CT_Socket的listen函数,最后完成scoket,bind,listen的系统调用,并设置为非阻塞I/O
        //在CT_Socket下调用链路为connListen=>(socket.c文件)connSocketListen=>listenToPort=>anetTcpServer(完成
        //scoket,bind,listen的系统调用)  ,anetNonBlock(设置为非阻塞I/O)
            serverLog(LL_WARNING, "Failed listening on port %u (%s), aborting.", listener->port, listener->ct->get_type(NULL));
            exit(1);
        }

        if (createSocketAcceptHandler(listener, connAcceptHandler(listener->ct)) != C_OK)
            serverPanic("Unrecoverable error creating %s listener accept handler.", listener->ct->get_type(NULL));
       listen_fds += listener->count;
    }
    ....
}

本函数将socket设置为可读状态,并绑定CT_Socket的accept_handler函数,即socket.c的connSocketAcceptHandler函数,用于处理客户端的连接建立

int createSocketAcceptHandler(connListener *sfd, aeFileProc *accept_handler) {
    int j;

    for (j = 0; j < sfd->count; j++) {
        if (aeCreateFileEvent(server.el, sfd->fd[j], AE_READABLE, accept_handler,sfd) == AE_ERR) {
            /* Rollback */
            for (j = j-1; j >= 0; j--) aeDeleteFileEvent(server.el, sfd->fd[j], AE_READABLE);
            return C_ERR;
        }
    }
    return C_OK;
}

1.4、Reactor线程池初始化

这里主要看一下Reactor线程池的初始化,要想启动线程池,需要最好参数配置:

io-threads n #worker线程个数
io-threads-do-reads yes #worker线程是否做read任务

void InitServerLast() {
    bioInit();
    initThreadedIO();
    set_jemalloc_bg_thread(server.jemalloc_bg_thread);
    server.initial_memory_usage = zmalloc_used_memory();
}
//线程池初始化
void initThreadedIO(void) {
	...
	 for (int i = 0; i < server.io_threads_num; i++) {
        /* Things we do for all the threads including the main thread. */
        io_threads_list[i] = listCreate();//
        if (i == 0) continue;// i=0是位置留给主线程,所以跳过

        /* Things we do only for the additional threads. */
        pthread_t tid;
        pthread_mutex_init(&io_threads_mutex[i],NULL);
        setIOPendingCount(i, 0);
        pthread_mutex_lock(&io_threads_mutex[i]); /* Thread will be stopped. */
        if (pthread_create(&tid,NULL,IOThreadMain,(void*)(long)i) != 0) {
            serverLog(LL_WARNING,"Fatal: Can't initialize IO thread.");
            exit(1);
        }
        io_threads[i] = tid;
    }
}

线程执行函数IOThreadMain,其主要就是调用 readQueryFromClient处理读任务,调用writeToClient处理写任务

void *IOThreadMain(void *myid) {
	while(1) {
        for (int j = 0; j < 1000000; j++) {//循环1000000次,去等待I/O事件
            if (getIOPendingCount(id) != 0) break;
        }
        /* Give the main thread a chance to stop this thread. */
        if (getIOPendingCount(id) == 0) {//如果主线程已获取了当前线程锁,就会进入等待状态,直至被主线程解锁唤醒,否则继续循环1000000次去
            pthread_mutex_lock(&io_threads_mutex[id]);
            pthread_mutex_unlock(&io_threads_mutex[id]);
            continue;
        }
        serverAssert(getIOPendingCount(id) != 0);
        listRewind(io_threads_list[id],&li);
        while((ln = listNext(&li))) {
            client *c = listNodeValue(ln);
            if (io_threads_op == IO_THREADS_OP_WRITE) {//写
                writeToClient(c,0);
            } else if (io_threads_op == IO_THREADS_OP_READ) {//读
                readQueryFromClient(c->conn);
            } else {
                serverPanic("io_threads_op value is unknown");
            }
        }
        listEmpty(io_threads_list[id]);
        setIOPendingCount(id, 0);
  }
}

1.5、Reactor事件主循环

本函数用于进入主线程事件循环逻辑

void aeMain(aeEventLoop *eventLoop) {
    eventLoop->stop = 0;
    while (!eventLoop->stop) {
        aeProcessEvents(eventLoop, AE_ALL_EVENTS|
                                   AE_CALL_BEFORE_SLEEP|
                                   AE_CALL_AFTER_SLEEP);
    }
}

本章中很多ae开头的函数,比如aeMain、aeCreateFileEvent、aeProcessEvents等,这些函数位于ae.c文件,是Redis事件管理器的具体实现,具体见下一章。

二、Reactor

在上一章中知道了通过aeMain开启了事件循环,那么先看一下aeProcessEvents函数。
在聊aeProcessEvents函数之前我们先看下Redis如何根据系统选择操作系统提供的I/O库的。
在ae.c文件开头可以看到如下代码:

/* Include the best multiplexing layer supported by this system.
 * The following should be ordered by performances, descending. */
#ifdef HAVE_EVPORT
#include "ae_evport.c"
#else
    #ifdef HAVE_EPOLL
    #include "ae_epoll.c" //linux下选择epoll
    #else
        #ifdef HAVE_KQUEUE
        #include "ae_kqueue.c"
        #else
        #include "ae_select.c"
        #endif
    #endif
#endif

2.1、Reactor事件处理器

Reactor事件处理位于ae.c文件,是一个对不同系统网络库(linux的select,epoll 。mac的kqueue)的抽象管理层,主要函数如下:

aeCreateEventLoop #创建aeEventLoop对象
aeMain #启动时事件循环
aeProcessEvents #事件循环处理逻辑
aeCreateFileEvent #创建网络IO事件
aeCreateTimeEvent #创建定时任务事件
当然还有很多,就不一一列举了,在ae.c文件中

就那比较核心的aeProcessEvents 函数看一下吧

int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{
	...
	//执行initServer函数中注册的beforesleep回调函数
  if (eventLoop->beforesleep != NULL && flags & AE_CALL_BEFORE_SLEEP)
            eventLoop->beforesleep(eventLoop);
    /* Call the multiplexing API, will return only on timeout or when
         * some event fires. */
  numevents = aeApiPoll(eventLoop, tvp);//获取socket事件,如果是epoll则是执行epoll_wait获取有事件的socket
  /* Don't process file events if not requested. */
 	if (!(flags & AE_FILE_EVENTS)) {
 			numevents = 0;
 	}
 	//执行initServer函数中注册的aftersleep 回调函数
 	if (eventLoop->aftersleep != NULL && flags & AE_CALL_AFTER_SLEEP)
      eventLoop->aftersleep(eventLoop);    
        
	...
	for (j = 0; j < numevents; j++) {
	  //读事件
	  //在上一章的createSocketAcceptHandler已经了解到socket listen后第一次绑定了socket.c的connSocketAcceptHandler
	  //那么如果是accept事件就会调用connSocketAcceptHandler,继而调用networking.c的createClient
		if (!invert && fe->mask & mask & AE_READABLE) {
         fe->rfileProc(eventLoop,fd,fe->clientData,mask);
         fired++;
         fe = &eventLoop->events[fd]; /* Refresh in case of resize. */
     }
		 //写事件
     if (fe->mask & mask & AE_WRITABLE) {
           if (!fired || fe->wfileProc != fe->rfileProc) {
                   fe->wfileProc(eventLoop,fd,fe->clientData,mask);
                   fired++;
           }
     }
     /* If we have to invert the call, fire the readable event now  after the writable one. */
     if (invert) {
         fe = &eventLoop->events[fd]; /* Refresh in case of resize. */
          if ((fe->mask & mask & AE_READABLE) && (!fired || fe->wfileProc != fe->rfileProc))
          {
                    fe->rfileProc(eventLoop,fd,fe->clientData,mask);
                    fired++;
           }
      }
	}
}

2.2、读事件

在了解读事件前先看下连接管理对象和客户端管理对象

//连接管理对象
struct connection {
    ConnectionType *type;
    ConnectionState state;
    short int flags;
    short int refs;
    int last_errno;
    void *private_data;
    ConnectionCallbackFunc conn_handler;
    ConnectionCallbackFunc write_handler;
    ConnectionCallbackFunc read_handler;
    int fd;
};
//客户端管理对象
typedef struct client {
	...
	connection *conn; //连接
	sds querybuf;           /* Buffer we use to accumulate client querieske 每个客户端的读缓冲区*/
	/*每个客户端写缓冲区相关变量 */
	list *reply;            /* List of reply objects to send to the client. 要写到客户端的数据,数据量比较大用这个,否则用buf*/
	/* Response buffer */
  size_t buf_peak; /* Peak used size of buffer in last 5 sec interval. */
  mstime_t buf_peak_last_reset_time; /* keeps the last time the buffer peak value was reset */
  int bufpos;
  size_t buf_usable_size; /* Usable size of buffer. */
  char *buf;
	...
}
2.2.1、第一次读事件(accept)
  1. socket的accept和read都是读事件。所以在CT_Socket下,客户端第一次建立连接时先调用socket.c的connSocketAcceptHandler
    进行accept系统函数调用,所以这第一次读事件就是连接应答,即接收客户到的连接并进行相应的处理
static void connSocketAcceptHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
    int cport, cfd, max = MAX_ACCEPTS_PER_CALL;
    char cip[NET_IP_STR_LEN];
    UNUSED(el);
    UNUSED(mask);
    UNUSED(privdata);

    while(max--) {
        cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);//调用系统函数accept,并设置socket为非阻塞的
        if (cfd == ANET_ERR) {
            if (errno != EWOULDBLOCK)
                serverLog(LL_WARNING,"Accepting client connection: %s", server.neterr);
            return;
        }
        serverLog(LL_VERBOSE,"Accepted %s:%d", cip, cport);
        acceptCommonHandler(connCreateAcceptedSocket(cfd, NULL),0,cip);//connCreateAcceptedSocket函数创建conn对象并返回conn对象
    }
}

//在networking.c文件
void acceptCommonHandler(connection *conn, int flags, char *ip) {
		...
		/* Create connection and client */
    if ((c = createClient(conn)) == NULL) {//基于连接conn创建客户端管理对象
        char addr[NET_ADDR_STR_LEN] = {0};
        char laddr[NET_ADDR_STR_LEN] = {0};
        connFormatAddr(conn, addr, sizeof(addr), 1);
        connFormatAddr(conn, laddr, sizeof(addr), 0);
        serverLog(LL_WARNING,"Error registering fd event for the new client connection: %s (addr=%s laddr=%s)",
                  connGetLastError(conn), addr, laddr);
        connClose(conn); /* May be already closed, just ignore errors */
        return;
    }
		...
}
  1. 创建client对象,并通过调用CT_Socket的set_read_handler函数(connSocketSetReadHandler)将conn的read_handler设置为readQueryFromClient函数,通过aeCreateFileEvent将fe的读回调函数设为CT_Socket的ae_handler函数,等待下次读事件调用。至此第一次读事件(accept)处理就结束了
client *createClient(connection *conn) {
	client *c = zmalloc(sizeof(client));//创建client对象
	...
	if (conn) {
        connEnableTcpNoDelay(conn);
        if (server.tcpkeepalive)
            connKeepAlive(conn,server.tcpkeepalive);
        connSetReadHandler(conn, readQueryFromClient);//重点。调用CT_Socket的set_read_handler函数,就socket.c文件的connSocketSetReadHandler
        connSetPrivateData(conn, c);
    }
	...
	c->conn = conn;//赋值
	if (conn) linkClient(c);//将该client对象加入client全局管理链表,即server.clients
	return c;
}

static int connSocketSetReadHandler(connection *conn, ConnectionCallbackFunc func) {
    if (func == conn->read_handler) return C_OK;
    conn->read_handler = func;//设置conn的read_handler回调函数为readQueryFromClient
    if (!conn->read_handler)
        aeDeleteFileEvent(server.el,conn->fd,AE_READABLE);
    else
        //创建AE_READABLE事件,并绑定读回调函数为CT_Socket的ae_handler函数,等待aeProcessEvents循环触发进行回调
        if (aeCreateFileEvent(server.el,conn->fd, AE_READABLE,conn->type->ae_handler,conn) == AE_ERR) 
        		return C_ERR;
    return C_OK;
}
2.2.2、第二次读事件(read)

对于一个客户端,在上一小节,看到了最后创建了一个读事件,并绑定读回调函数为CT_Socket的ae_handler函数,那么下一次主线程循环调用aeProcessEvents,就会调用ae_handler函数。下面看看ae_handler函数,即scoket.c文件的connSocketEventHandler干了什么。

ps:Redis客户端一般是长连接,经过第一次读事件(accept)后,第二~N次读事件都是read,直至某一方close。

static void connSocketEventHandler(struct aeEventLoop *el, int fd, void *clientData, int mask)
{
		......
		int invert = conn->flags & CONN_FLAG_WRITE_BARRIER;
    int call_write = (mask & AE_WRITABLE) && conn->write_handler;
    int call_read = (mask & AE_READABLE) && conn->read_handler;
		/* Handle normal I/O flows */
    if (!invert && call_read) {
        if (!callHandler(conn, conn->read_handler)) return;//可以看到会调conn->read_handler函数,即上一节connSocketSetReadHandler中绑定的readQueryFromClient
    }
    /* Fire the writable event. */
    if (call_write) {
        if (!callHandler(conn, conn->write_handler)) return;
    }
    /* If we have to invert the call, fire the readable event now
     * after the writable one. */
    if (invert && call_read) {
        if (!callHandler(conn, conn->read_handler)) return;
    }
}
  1. 读取数据

readQueryFromClient 函数就是读取客户端数据并解析得到要执行的命令,这个函数是很核心的,我们逐步分析下

void readQueryFromClient(connection *conn) {
    client *c = connGetPrivateData(conn);
    int nread, big_arg = 0;
    size_t qblen, readlen;

    //如果开启了线程池,主线程不处理直接返回,等待在事件轮询时分配给worker线程池
    if (postponeClientRead(c)) return;

    /* Update total number of reads on server */
    atomicIncr(server.stat_total_reads_processed, 1);//原子性的+1
   ...//省略代码
    nread = connRead(c->conn, c->querybuf+qblen, readlen);//调用系统函数read读取socket数据,并放到当前client对象querybuf缓冲区,这时一个sds(简单动态字符串)类型
    ...//省略代码
    if (processInputBuffer(c) == C_ERR)//解析数据得到命令并执行,该函数细节后续分析,这里知道作用就好了
         c = NULL;
done:
    beforeNextClient(c);
}
//
int postponeClientRead(client *c) {
    if (server.io_threads_active && //线程池激活状态
        server.io_threads_do_reads &&//配置允许线程池处理read任务
        !ProcessingEventsWhileBlocked &&
        !(c->flags & (CLIENT_MASTER|CLIENT_SLAVE|CLIENT_BLOCKED)) &&
        io_threads_op == IO_THREADS_OP_IDLE)//线程池空闲状态
    {
        listAddNodeHead(server.clients_pending_read,c);//将当前client对象加入到全局等待读取的链表中,等待主线程分配
        c->pending_read_list_node = listFirst(server.clients_pending_read);
        return 1;
    } else {
        return 0;
    }
}

2.3、读任务分配

上面小节的任务一直是主线程在做,可以看到在开启线程池的情况下,读任务只是插入到全局等待读链表server.clients_pending_read的头部,下面咱们就看看这些读任务是如何分配的吧。
细心的读者可能还记得在第一章第2节服务初始化源代码末尾注册的两个sleep函数

aeSetBeforeSleepProc(server.el,beforeSleep);
aeSetAfterSleepProc(server.el,afterSleep);

在第二章第1节Reactor网络事件处理中,源代码中就会调用两个sleep函数,如下:

int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{
		...
		if (eventLoop->beforesleep != NULL && flags & AE_CALL_BEFORE_SLEEP)
            eventLoop->beforesleep(eventLoop);
		...//省略代码
		if (eventLoop->aftersleep != NULL && flags & AE_CALL_AFTER_SLEEP)
      eventLoop->aftersleep(eventLoop);    
		...
}

没错,分配工作就在beforeSleep函数中处理的:

void beforeSleep(struct aeEventLoop *eventLoop) {
		...
		 if (ProcessingEventsWhileBlocked) {
        uint64_t processed = 0;
        processed += handleClientsWithPendingReadsUsingThreads();//处理server.clients_pending_read中的读任务
        processed += connTypeProcessPendingData();
        if (server.aof_state == AOF_ON || server.aof_state == AOF_WAIT_REWRITE)
            flushAppendOnlyFile(0);
        processed += handleClientsWithPendingWrites();//处理server.clients_pending_write中的写任务
        processed += freeClientsInAsyncFreeQueue();
        server.events_processed_while_blocked += processed;
        return;
    }
		...
}

在这里你可以看到多线程下Redis只是把从socket读取数据和解析命令交给线程池处理,真正的执行命令依然由主线程进行处理

int handleClientsWithPendingReadsUsingThreads(void) {
    if (!server.io_threads_active || !server.io_threads_do_reads) return 0;
    int processed = listLength(server.clients_pending_read);
    if (processed == 0) return 0;

    // 遍历待读取的 client 队列 clients_pending_read,
    // 通过 RR 轮询均匀地分配给 I/O 线程和主线程自己(编号 0)。
    listIter li;
    listNode *ln;
    listRewind(server.clients_pending_read,&li);
    int item_id = 0;
    while((ln = listNext(&li))) {
        client *c = listNodeValue(ln);
        int target_id = item_id % server.io_threads_num;
        listAddNodeTail(io_threads_list[target_id],c);
        item_id++;
    }

    // 设置当前 I/O 状态为读取,给线程池中每个线程的计数器设置分配的任务数量,
    // 让 线程池可以开始工作:只读取和解析命令,不执行。
    io_threads_op = IO_THREADS_OP_READ;
    for (int j = 1; j < server.io_threads_num; j++) {
        int count = listLength(io_threads_list[j]);
        setIOPendingCount(j, count);
    }

    // 主线程自己也会去执行读取客户端数据,再解析,最后执行请求命令的任务,以达到最大限度利用 CPU。
    //此时io_threads_op为IO_THREADS_OP_READ,可以避开第2.2.2节postponeClientRead函数的check
    listRewind(io_threads_list[0],&li);
    while((ln = listNext(&li))) {
        client *c = listNodeValue(ln);
        readQueryFromClient(c->conn);
    }
    listEmpty(io_threads_list[0]);

    // 主线程进入忙轮询,累加线程池中每个线程的原子任务计数器,直到所有计数器的遗留任务数量都是 0,才结束忙轮询。
    while(1) {
        unsigned long pending = 0;
        for (int j = 1; j < server.io_threads_num; j++)
            pending += io_threads_pending[j];
        if (pending == 0) break;
    }
    //线程池任务处理完,设置当前 I/O 状态为空闲
  	io_threads_op = IO_THREADS_OP_IDLE;
    // 遍历待读取的 client 队列,清除 CLIENT_PENDING_READ 和 CLIENT_PENDING_COMMAND 标记, 然后解析并执行所有 client 的命令。
    while(listLength(server.clients_pending_read)) {
        ln = listFirst(server.clients_pending_read);
        client *c = listNodeValue(ln);
        listDelNode(server.clients_pending_read,ln);
        c->pending_read_list_node = NULL;

        serverAssert(!(c->flags & CLIENT_BLOCKED));

        if (beforeNextClient(c) == C_ERR) {//无效client跳过
            /* If the client is no longer valid, we avoid processing the client later. So we just goto the next. */
            continue;
        }

        /* Once io-threads are idle we can update the client in the mem usage */
        updateClientMemUsageAndBucket(c);

        if (processPendingCommandAndInputBuffer(c) == C_ERR) {//执行命令
            /* If the client is no longer valid, we avoid processing the client later. So we just go to the next. */
            continue;
        }
        /* We may have pending replies if a thread readQueryFromClient() produced replies and did not put the client in pending write queue (it can't)*/
        if (!(c->flags & CLIENT_PENDING_WRITE) && clientHasPendingReplies(c))
            putClientInPendingWriteQueue(c);
     }
    /* Update processed count on server */
    server.stat_io_reads_processed += processed;
    return processed;
}
int processPendingCommandAndInputBuffer(client *c) {
    //如果有CLIENT_PENDING_COMMAND 标记,直接执行命令,这种因为已经由worker线程将数据解析好了
    if (c->flags & CLIENT_PENDING_COMMAND) {
        c->flags &= ~CLIENT_PENDING_COMMAND;//清除CLIENT_PENDING_COMMAND 标记
        if (processCommandAndResetClient(c) == C_ERR) {
            return C_ERR;
        }
    }
    //client对象还有剩余的数据
    if (c->querybuf && sdslen(c->querybuf) > 0) {
        return processInputBuffer(c); //解析数据并执行命令,下一小节会有详细分析
    }
    return C_OK;
}

2.4、读任务处理

上一节看到读任务分配了,分配之后就要处理了,那在哪里处理呢?

细心的读者应该还记得第一章第4节Reactor线程池初始化中的线程执行函数IOThreadMain,其核心就是调用
readQueryFromClient处理读任务,调用writeToClient处理写任务。

看到readQueryFromClient函数,发现又回到【本章第2节读事件】了,该函数最后会调用一个processInputBuffer函数,下面来看一看吧。

该函数主要作用就是通过processInlineBuffer和processMultibulkBuffer解析命令,解析之后如果是worker线程在执行就退出,如果是主线程就会调processCommandAndResetClient执行命令

int processInputBuffer(client *c) {
	 while(c->qb_pos < sdslen(c->querybuf)) {
	  ...//省略代码
	 	if (c->reqtype == PROTO_REQ_INLINE) {
            if (processInlineBuffer(c) != C_OK) break;//解析数据
        } else if (c->reqtype == PROTO_REQ_MULTIBULK) {
            if (processMultibulkBuffer(c) != C_OK) break;//解析数据
        } else {
            serverPanic("Unknown request type");
        }

        /* Multibulk processing could see a <= 0 length. */
        if (c->argc == 0) {
            resetClient(c);
        } else {
            /* If we are in the context of an I/O thread, we can't really
             * execute the command here. All we can do is to flag the client
             * as one that needs to process the command. */
            if (io_threads_op != IO_THREADS_OP_IDLE) {//如果是worker线程在执行,就直接结束循环
                serverAssert(io_threads_op == IO_THREADS_OP_READ);
                c->flags |= CLIENT_PENDING_COMMAND;
                break;
            }

            //根据前面解析的数据得到命令并执行命令
            if (processCommandAndResetClient(c) == C_ERR) {
                /* If the client is no longer valid, we avoid exiting this
                 * loop and trimming the client buffer later. So we return
                 * ASAP in that case. */
                return C_ERR;
            }
        }
	 }
	 ...//省略代码
}

2.5、命令执行

如果是主线程调用processInputBuffer函数就会走到processCommandAndResetClient去执行命令,该函数再调用processCommand函数。

注意:该函数一定是在主线程下执行的

细心的读者应该还记得第一章第1节配置初始化中Redis在全局变量server.command中,那么我们先看一下get命令吧

2.5.1、以get命令为例

先看一下redisCommandTable变量。

struct redisCommand {
	...
	redisCommandProc *proc; /* 回调函数 */
	...
}
struct redisCommand redisCommandTable[] = {
	...
	{"get","Get the value of a key","O(1)","1.0.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_STRING,GET_History,GET_tips,getCommand,2,CMD_READONLY|CMD_FAST,ACL_CATEGORY_STRING,{{NULL,CMD_KEY_RO|CMD_KEY_ACCESS,KSPEC_BS_INDEX,.bs.index={1},KSPEC_FK_RANGE,.fk.range={0,1,0}}},.args=GET_Args},
	...
}

参数很多,但我们只看执行命令的回调函数getCommand就可以了,在t_strings.c文件中

void getCommand(client *c) {
    getGenericCommand(c);
}
int getGenericCommand(client *c) {
    robj *o;

    if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.null[c->resp])) == NULL)//获取数据
        return C_OK;

    if (checkType(c,o,OBJ_STRING)) {
        return C_ERR;
    }

    addReplyBulk(c,o);//将获取的数据响应给客户端
    return C_OK;
}

那如何将获取的数据响应给客户端呢?

/* Add a Redis Object as a bulk reply */
void addReplyBulk(client *c, robj *obj) {
    addReplyBulkLen(c,obj);//添加数据长度到client对象的写缓冲区变量,写缓冲区变量见【本章第2节读事件】
    addReply(c,obj);//添加数据到client的reply链表和buf缓冲区
    addReplyProto(c,"\r\n",2);//添加数据结束标志到client对象的写缓冲区变量
}
/* Add the object 'obj' string representation to the client output buffer. */
void addReply(client *c, robj *obj) {
    if (prepareClientToWrite(c) != C_OK) return;
		//将本次的响应内容添加到client的reply链表和buf缓冲区,等待写回客户端
		if (sdsEncodedObject(obj)) {
        _addReplyToBufferOrList(c,obj->ptr,sdslen(obj->ptr));
    } else if (obj->encoding == OBJ_ENCODING_INT) {
        /* For integer encoded strings we just convert it into a string
         * using our optimized function, and attach the resulting string
         * to the output buffer. */
        char buf[32];
        size_t len = ll2string(buf,sizeof(buf),(long)obj->ptr);
        _addReplyToBufferOrList(c,buf,len);
    } else {
        serverPanic("Wrong obj->encoding in addReply()");
    }
}
int prepareClientToWrite(client *c) {
	...
	if (!clientHasPendingReplies(c) && io_threads_op == IO_THREADS_OP_IDLE)
        putClientInPendingWriteQueue(c);//
	...
}

可以看到最后将client对象添加到server.clients_pending_write等待写链表上,和等待写链表一样,等主线程将其分配给worker线程。

void putClientInPendingWriteQueue(client *c) {
    /* Schedule the client to write the output buffers to the socket only
     * if not already done and, for slaves, if the slave can actually receive
     * writes at this stage. */
    if (!(c->flags & CLIENT_PENDING_WRITE) &&
        (c->replstate == REPL_STATE_NONE ||
         (c->replstate == SLAVE_STATE_ONLINE && !c->repl_start_cmd_stream_on_ack)))
    {
        c->flags |= CLIENT_PENDING_WRITE;
        listLinkNodeHead(server.clients_pending_write, &c->clients_pending_write_node);
    }
}
2.5.2、执行命令
int processCommandAndResetClient(client *c) {
    int deadclient = 0;
    client *old_client = server.current_client;
    server.current_client = c;
    if (processCommand(c) == C_OK) {
        commandProcessed(c);
        /* Update the client's memory to include output buffer growth following the processed command. */
        updateClientMemUsageAndBucket(c);
    }
    if (server.current_client == NULL) deadclient = 1;
    server.current_client = old_client;
    return deadclient ? C_ERR : C_OK;
}
int processCommand(client *c) {
	if (!client_reprocessing_command) {
        c->cmd = c->lastcmd = c->realcmd = lookupCommand(c->argv,c->argc);//到server.commands全局变量中查找命令
        ....
  }   
  /* Exec the command */
    if (c->flags & CLIENT_MULTI &&
        c->cmd->proc != execCommand &&
        c->cmd->proc != discardCommand &&
        c->cmd->proc != multiCommand &&
        c->cmd->proc != watchCommand &&
        c->cmd->proc != quitCommand &&
        c->cmd->proc != resetCommand)
    {
        queueMultiCommand(c, cmd_flags);
        addReply(c,shared.queued);
    } else {
        int flags = CMD_CALL_FULL;
        if (client_reprocessing_command) flags |= CMD_CALL_REPROCESSING;
        call(c,flags);//调用查找到的命令
        if (listLength(server.ready_keys))
            handleClientsBlockedOnKeys();
    }

    return C_OK;   
}
void call(client *c, int flags) {
//省略代码
	c->cmd->proc(c);
}

可以看到如果客户端发了get命令,那么call本质就去执行getCommand函数,获取执行结果后添加到client的写缓冲区,最后将client对象添加到server.clients_pending_write待写链表中

2.6、写任务分配

在【本章第3节读任务分配】中,beforeSleep函数在分配完读任务后,接着就分配写任务了,即调用handleClientsWithPendingWritesUsingThreads函数。

int handleClientsWithPendingWritesUsingThreads(void) {
    int processed = listLength(server.clients_pending_write);
    if (processed == 0) return 0; /* Return ASAP if there are no clients. */

    //没开启线程池或等待处理的写任务数量较少,就只有主线程处理
    if (server.io_threads_num == 1 || stopThreadedIOIfNeeded()) {
        return handleClientsWithPendingWrites();
    }

    /* Start threads if needed. */
    if (!server.io_threads_active) startThreadedIO();

    /* Distribute the clients across N different lists. */
    listIter li;
    listNode *ln;
    listRewind(server.clients_pending_write,&li);
    int item_id = 0;
    while((ln = listNext(&li))) {
        client *c = listNodeValue(ln);
        c->flags &= ~CLIENT_PENDING_WRITE;

        /* Remove clients from the list of pending writes since
         * they are going to be closed ASAP. */
        if (c->flags & CLIENT_CLOSE_ASAP) {
            listUnlinkNode(server.clients_pending_write, ln);
            continue;
        }

        /* Since all replicas and replication backlog use global replication
         * buffer, to guarantee data accessing thread safe, we must put all
         * replicas client into io_threads_list[0] i.e. main thread handles
         * sending the output buffer of all replicas. */
        if (getClientType(c) == CLIENT_TYPE_SLAVE) {//如果是slave发来的命令,只分配给主线程
            listAddNodeTail(io_threads_list[0],c);
            continue;
        }

        int target_id = item_id % server.io_threads_num;//RR分配策略
        listAddNodeTail(io_threads_list[target_id],c);
        item_id++;
    }

    //设置每个worker线程的任务数量,以唤醒worker线程工作
    io_threads_op = IO_THREADS_OP_WRITE;
    for (int j = 1; j < server.io_threads_num; j++) {
        int count = listLength(io_threads_list[j]);
        setIOPendingCount(j, count);
    }

   // 主线程自己也会去执行写数据到客户端的任务,以达到最大限度利用 CPU。
    listRewind(io_threads_list[0],&li);
    while((ln = listNext(&li))) {
        client *c = listNodeValue(ln);
        writeToClient(c,0);
    }
    listEmpty(io_threads_list[0]);

    //主线程进入忙轮询,等待所有worker线程执行完任务
    while(1) {
        unsigned long pending = 0;
        for (int j = 1; j < server.io_threads_num; j++)
            pending += getIOPendingCount(j);
        if (pending == 0) break;
    }

    io_threads_op = IO_THREADS_OP_IDLE;//设置I/O状态位空闲

    // 最后再遍历一次 clients_pending_write 队列,检查是否还有 client 的写出缓冲区中有残留数据,
    // 如果有,那就为 client 注册一个命令回复器 sendReplyToClient,等待客户端写就绪再继续把数据回写。
    listRewind(server.clients_pending_write,&li);
    while((ln = listNext(&li))) {
        client *c = listNodeValue(ln);

        /* Update the client in the mem usage after we're done processing it in the io-threads */
        updateClientMemUsageAndBucket(c);

        /* Install the write handler if there are pending writes in some of the clients. */
        if (clientHasPendingReplies(c)) {
            installClientWriteHandler(c);//为 client 注册一个命令回复器 sendReplyToClient
        }
    }
    while(listLength(server.clients_pending_write) > 0) {
        listUnlinkNode(server.clients_pending_write, server.clients_pending_write->head);
    }

    /* Update processed count on server */
    server.stat_io_writes_processed += processed;

    return processed;
}
//这个命令回复器sendReplyToClient在什么时候调用呢?connSetWriteHandlerWithBarrier会调用CT_Socket的set_write_handler函数,设置当前连
//接对象conn的写回调函数为sendReplyToClient,并通过aeCreateFileEvent设置Reactor事务对象fe的写回调函数为CT_Socket的ae_handler函数
//等下次触发socket的写事件,会调用ae_handler函数,ae_handler函数再调conn的写回调函数sendReplyToClient,进而调writeToClient去把数据回写,真饶昂
void installClientWriteHandler(client *c) {
   //省略代码
    if (connSetWriteHandlerWithBarrier(c->conn, sendReplyToClient, ae_barrier) == C_ERR) {
        freeClientAsync(c);
    }
}

2.7、写回客户端

这里就比较简单了,上一节介绍最终通过writeToClient写数据到客户端,该函数调用链如下:

writeToClient
==> _writeToClient
====> _writevToClient
======> connWritev
========>conn->type->writev
==========>CT_Socket的writev函数,即connSocketWritev
============>writev(系统函数)

这里我们多说一下connSocketWritev,看看其对writev错误的处理。

static int connSocketWritev(connection *conn, const struct iovec *iov, int iovcnt) {
    int ret = writev(conn->fd, iov, iovcnt);
    if (ret < 0 && errno != EAGAIN) {
        conn->last_errno = errno;
        /* Don't overwrite the state of a connection that is not alreadyconnected, not to mess with handler callbacks*/
        if (errno != EINTR && conn->state == CONN_STATE_CONNECTED)
            conn->state = CONN_STATE_ERROR;
    }
    return ret;
}

可以看到ret<0 && errno != EAGAIN 情况下才判错,其中errno != EAGAIN 怎么理解呢?
我们在本章【第一次读事件(accept)】小节中说了在accept后会将socket设置为nonblock,在nonblock下如何socket写缓冲区不足就会赋给全局变量errno以EAGAIN,告诉程序我满了,你等我通知你有空间了(有空间会触发写事件)再发。

ps:全局变量errno是线程安全的。
read/write在nonblock下的行为

所以说可能有一次写不完的情况,那咋办呢?细心的读者肯定记得在【本章第6节写任务分配】源码中有 :

最后再遍历一次 clients_pending_write 队列,检查是否还有 client 的写出缓冲区中有残留数据,
如果有,那就为 client 注册一个命令回复器 sendReplyToClient,等待客户端写就绪再继续把数据回写。

就是说等下次socket触发写事件会将剩余的数据写回客户端。

三、总结

其实按部就班读下来,对Redis的Reactor模型和命令执行过程一定有了深刻的认知,这里在总结下这两点。

3.1、Redis的Reactor模型

ps:一般业务处理包含三步:decode、compute、encode
在看源码过程中一定注意到了这两点:

  1. 只有一个线程池,其即处理read,write,也处理一部分业务,即decode(解析client数据),见2.2.2和2.4小节;
  2. compute(执行命令)、encode(生成响应client的数据)由Reactor主线程处理,见2.5小节。
    那么我们就可以回答第一章第二节的问题了,即V6.0之后Redis使用的Reactor模型是那种?
    【主从reactor多线程的变形】

如下图:
redis的reactor
相比经典的主从Reactor多线程差距一目了然:
1)从rector和worker的线程池是同一个,这个倒是影响不大;
2)handler的任务做了重新安排,特别是业务处理的compute、encode由Reactor主线程处理,这个是迥异与经典主从Reactor多线程的。

3.2、命令执行过程

Redis函数调用流程图大概如下图:
Redis函数调用流程图

  1. Redis 服务器启动,注册 connAcceptHandler 连接应答处理器到用户配置的监听端口对应的文件描述符,等待新连接到来,最后通过aeMain开启主线程事件循环(Event Loop);
  2. 客户端和服务端建立网络连接;
  3. acceptTcpHandler 被调用,主线程使用 aeCreateFileEvent 将 readQueryFromClient 命令读取处理器绑定到新连接对应的文件描述符上,并初始化一个 client 绑定这个客户端连接;
  4. 客户端发送请求命令,触发读就绪事件,主线程不会通过 socket 去读取客户端的请求命令,而是先将 client 放入一个 LIFO 队列 server.clients_pending_read;
  5. 在事件循环(Event Loop)中,主线程执行 beforeSleep -->handleClientsWithPendingReadsUsingThreads,利用 Round-Robin 轮询负载均衡策略,把 server.clients_pending_read队列中的连接均匀地分配给 worker线程各自的本地 FIFO 任务队列 io_threads_list[id] 和主线程自己,worker线程通过 socket 读取客户端的请求命令,存入 client->querybuf 并解析第一个命令,但不执行命令,主线程忙轮询,等待所有 I/O 线程完成读取任务;
  6. 主线程和所有worker 线程都完成了读取任务,主线程结束忙轮询,遍历 server.clients_pending_read 队列,执行所有客户端连接的请求命令,先调用 processCommandAndResetClient 执行第一条已经解析好的命令,然后调用 processInputBuffer 解析并执行客户端连接的所有命令,在其中使用 processInlineBuffer 或者 processMultibulkBuffer 根据 Redis 协议解析命令,最后调用 processCommand 执行命令;
  7. 根据请求命令的类型(SET, GET, DEL, EXEC 等),分配相应的命令执行器去执行,最后调用 addReply 函数族的一系列函数将响应数据写入到对应 client 的写出缓冲区:client->buf 或者 client->reply ,client->buf 是首选的写出缓冲区,固定大小 16KB,一般来说可以缓冲足够多的响应数据,但是如果客户端在时间窗口内需要响应的数据非常大,那么则会自动切换到 client->reply 链表上去,使用链表理论上能够保存无限大的数据(受限于机器的物理内存),最后把 client 添加进一个 LIFO 队列 server.clients_pending_write;
  8. 在事件循环(Event Loop)中,主线程执行 beforeSleep --> handleClientsWithPendingWritesUsingThreads,利用 Round-Robin 轮询负载均衡策略,把 server.clients_pending_write 队列中的连接均匀地分配给worker 线程各自的本地 FIFO 任务队列 io_threads_list[id] 和主线程自己,I/O 线程通过调用 writeToClient 把 client 的写出缓冲区里的数据回写到客户端,主线程忙轮询,等待所有 I/O 线程完成写出任务;
  9. 主线程和所有 worker 线程都完成了写出任务, 主线程结束忙轮询,遍历 server.clients_pending_write 队列,如果 client 的写出缓冲区还有数据遗留,则注册 sendReplyToClient 到该连接的写就绪事件,等待客户端可写时在事件循环中再继续回写残余的响应数据。

Redis主线程和worker线程协作时序图如下:

Redis主线程和worker线程协作示意图

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/582126.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

基于MATALB编程的深度信念网络DBN的01分类编码三分类预测,DBN算法详细原理

目录 背影 DBN神经网络的原理 DBN神经网络的定义 受限玻尔兹曼机(RBM) DBN的语音分类识别 基本结构 主要参数 数据 MATALB代码 结果图 展望 背影 DBN是一种深度学习神经网络,拥有提取特征,非监督学习的能力,是一种非常好的分类算法,本文将DBN算法对数据采用01编码分析…

【数据湖仓架构】数据湖和仓库:Azure Synapse 视角

是时候将数据分析迁移到云端了。我们将讨论 Azure Synapse 在数据湖和数据仓库范式规模上的定位。 在本文中&#xff0c;我们将讨论 Microsoft 的 Azure Synapse Analytics 框架。具体来说&#xff0c;我们关注如何在其中看到数据仓库和数据湖范式的区别。为了熟悉这个主题&…

【Linux】Linux环境基础工具的基本使用及配置(yum、vim)

​ ​&#x1f4dd;个人主页&#xff1a;Sherry的成长之路 &#x1f3e0;学习社区&#xff1a;Sherry的成长之路&#xff08;个人社区&#xff09; &#x1f4d6;专栏链接&#xff1a;Linux &#x1f3af;长路漫漫浩浩&#xff0c;万事皆有期待 文章目录 Linux软件包管理器 - y…

60分钟吃掉detectron2

本范例演示使用非常有名的目标检测框架detectron2 &#x1f917;&#x1f917; 在自己的数据集(balloon数据)上训练实例分割模型MaskRCNN的方法。 detectron2框架的设计有以下一些优点&#xff1a; 1&#xff0c;强大&#xff1a;提供了包括目标检测、实例分割、全景分割等非常…

Spring Boot启动流程

1 Springboot 启动流程 创建一个StopWatch实例&#xff0c;用来记录SpringBoot的启动时间。 通过SpringFactoriesLoader加载listeners&#xff1a;比如EventPublishingRunListener。 发布SprintBoot开始启动事件&#xff08;EventPublishingRunListener#starting()&#xff0…

性能测试——基本性能监控系统使用

这里写目录标题 一、基本性能监控系统组成二、环境搭建1、准备数据文件 type.db collectd.conf2、启动InfluxDB3、启动grafana4、启动collectd5、Grafana中配置数据源 一、基本性能监控系统组成 Collectd InfluxdDB Grafana Collectd 是一个守护(daemon)进程&#xff0c;用来…

【数据结构】时间复杂度与空间复杂度

目录 前言一、算法效率1. 算法效率的定义 二、时间复杂度1. 时间复杂度的定义2. 时间复杂度的计算 三、空间复杂度1. 空间复杂度的定义2. 空间复杂度的计算 四、时间复杂度曲线图结尾 前言 在学习C语言的时候&#xff0c;大多数的小伙伴们并不会对算法的效率了解&#xff0c;也…

视频采集到录制 - 音频采集到降噪

继续上篇的视频采集到录制 视频采集相对来说还是算正常&#xff0c;如果资源够用&#xff0c;使用第三方库也是种解决办法 但音频采集网上资料相对也少&#xff0c;走了一遍&#xff0c;也发现存在很多坑 1. 音频采集 一般来说&#xff0c;采用MIC采集&#xff0c;采集出来的格…

内存泄露的循环引用问题

内存泄漏一直是很多大型系统故障的根源&#xff0c;也是一个面试热点。那么在编程语言层面已经提供了内存回收机制&#xff0c;为什么还会产生内存泄漏呢&#xff1f; 这是因为应用的内存管理一直处于一个和应用程序执行并发的状态&#xff0c;如果应用程序申请内存的速度&…

希尔伯特旅馆里,住着AI的某种真相

“无穷”和“无穷1”&#xff0c;哪个更大&#xff1f; 已经吸收了不知道多少数据的AI模型&#xff0c;和比他多学习一条数据的模型&#xff0c;哪个更智能&#xff1f; 想聊聊这个问题&#xff0c;出于一个偶然的机会。很早之前我在测试ChatGPT的时候&#xff0c;突然想问他个…

简单工厂、工厂方法、抽象工厂模式-这仨货的区别

要想明白这三玩意的区别就需要知道这三玩意的优缺点&#xff1b; 之所以有三种工厂模式&#xff0c;就说明它们各有所长&#xff0c;能解决不同场景的问题&#xff1b; 一、简单工厂模式 UML图 代码 public class MobileFactory {public static Mobile getMobile(String brand)…

【Linux】线程概述、创建线程、终止线程

目录 线程概述1、创建线程函数解析代码举例 2、终止线程函数解析代码举例 橙色 线程概述 与进程类似&#xff0c;线程是允许应用程序并发执行多个任务的一种机制。一个进程可以包含多个线程。 进程是 CPU 分配资源的最小单位&#xff0c;线程是操作系统调度执行的最小单位。…

回归预测 | MATLAB实现SSA-CNN-LSTM麻雀算法优化卷积长短期记忆神经网络多输入单输出回归预测

回归预测 | MATLAB实现SSA-CNN-LSTM麻雀算法优化卷积长短期记忆神经网络多输入单输出回归预测 目录 回归预测 | MATLAB实现SSA-CNN-LSTM麻雀算法优化卷积长短期记忆神经网络多输入单输出回归预测预测效果基本介绍模型描述程序设计学习总结参考资料 预测效果 基本介绍 MATLAB实现…

【笔记整理】轻量级神经网络 MobileNetV3

【笔记整理】轻量级神经网络 MobileNetV3 文章目录 【笔记整理】轻量级神经网络 MobileNetV31、深度可分离卷积2、翻转残差块和线性瓶颈结构3、h-swish 函数和 SE 模块4、网络结构搜索 近年来关于 CNN 的研究在飞速发展&#xff0c;CNN 模型在目标检测、图像分割等领域都取得了…

力扣sql中等篇练习(二十九)

力扣sql中等篇练习(二十九) 1 计算每个销售人员的影响力 1.1 题目内容 1.1.1 基本题目信息1 1.1.2 基本题目信息2 1.1.3 示例输入输出 a 示例输入 b 示例输出 1.2 示例sql语句 # Write your MySQL query statement below SELECT s1.salesperson_id,s1.name,IFNULL(t.total…

毕业季到底是去大厂还是去小公司

(点击即可收听) 毕业季到底是去大厂还是去小公司 相信很多人在选择大小公司的时候,会比较痛苦,外面的人想进去,里面的人想出来&#xff0c;至于选择大厂还是小公司 这是因人而异的,不同的阶段都可以有不同的选择 进大厂不一定就是对的,进小公司也不一定就是错的,学习东西,增长经…

股票量化分析工具QTYX使用攻略——涨停个股挖掘热门板块(更新2.6.5)

搭建自己的量化系统 如果要长期在市场中立于不败之地&#xff01;必须要形成一套自己的交易系统。 行情不等人&#xff01;边学习边实战&#xff0c;在实战中学习才是最有效地方式。于是我们分享一个即可以用于学习&#xff0c;也可以用于实战炒股分析的量化系统——QTYX。 QTY…

软考A计划-试题模拟含答案解析-卷九

点击跳转专栏>Unity3D特效百例点击跳转专栏>案例项目实战源码点击跳转专栏>游戏脚本-辅助自动化点击跳转专栏>Android控件全解手册点击跳转专栏>Scratch编程案例 &#x1f449;关于作者 专注于Android/Unity和各种游戏开发技巧&#xff0c;以及各种资源分享&am…

JetBrains的多数据库管理和SQL工具DataGrip 2023版本在Win10系统的下载与安装配置教程

目录 前言一、DataGrip 安装二、使用配置总结 前言 DataGrip是一款多数据库管理和SQL工具&#xff0c;适用于不同类型的数据库。它提供了丰富的功能和工具&#xff0c;可以帮助开发人员更高效地管理数据库、编写SQL查询和执行数据操作。 DataGrip的主要特点&#xff1a; ——…

这里有3个Tips,也许可以帮你躲过ChatGPT大规模封号 | AIGC实践

据说&#xff0c;从昨天开始&#xff0c;ChatGPT又双叒叕开始大规模封号&#xff0c;很多注册用户收到这样一则消息&#xff1a; 大意是说&#xff1a;OpenAI 发现了你的 ChatGPT 账号存在可疑活动&#xff0c;为了保障平台安全&#xff0c;已自动退款并取消你的 ChatGPT Plus …