1、首先IO模型的内容。
感觉可以简单理解为:我们写代码时,在基础的 IO 操作上做了一些其他的策略,根据策略的不同,一般有阻塞IO和非阻塞IO
1、阻塞IO
就是在操作的时候,比如网络通信中,某一线程使用下面这三个函数接收数据的时候,都有flags参数,就可以设定成非阻塞 MSG_DONTWAIT,这样就不会将本线程的运行卡在这个函数这里,可以进行其他的操作了。
ssize_t recv(int sockfd, void *buf, size_t len, int flags); ssize_t recvfrom(int sockfd, void *buf, size_t len, int flags, struct sockaddr *src_addr, socklen_t *addrlen); ssize_t recvmsg(int sockfd, struct msghdr *msg, int flags);
这也有问题,因为本线程既然调用这个函数了,就是用它获取信息的,现在设定成非阻塞后。没有信息直接就做别的事情,怎么去继续读取信息呢?
这就只能来回的循环这一步,来进行查询信息是否已经从内核态读取到了用户态,这样次数多了,也是浪费CPU资源。所以用select和epoll等来解决这个问题。
(这个IO的话,我理解上它是我们调用函数比如recv,read这样的系统调用函数时,会对内核进行访问读取数据,当有数据准备好时,从内核空间拷贝到用户空间或者说会直接读取到我们在程序中自己设定的存储空间中。
简单说就是这样的过程 : 从 磁盘---》内核空间 -----》用户空间(程序存储区)
比如 read( int fd, void *buf , size_t count); //buf 就是我们设定的程序存储空间。fd的话,就是磁盘中的文件对应的文件描述符 / 或者网络通信链接的connfd等
2、怎么实现线程安全
1、首先,线程安全这个问题(基本上就是指对共享资源的访问是安全的),只要提到就一定是发生在多线程的情况下,并且这个问题是针对多个线程访问同一共享资源的情况(线程共享堆区,静态变量,全局变量,还有文件系统的东西等), 当多个线程对同一资源进行“写”操作的时候,或者说对同一资源进行改动的时候,会不会出现问题。(而我们使用多线程就是为了能够执行更多的任务,做更多的功能,提高效率,且线程之间一定需要有隔离,不能乱掺和其他线程的工作,)
比如我们其中一个线程对配置文件进行读取操作,读取后使用配置信息进行工作。
但是,在我们读取完之前,另外一个线程对配置文件进行了更新迭代!!!,这种情况下,其他正在读取配置文件的线程,可能就读取不到需要的信息,也就不能进行接下来的工作,,这多线程不就崩了(其他能解决的方法很多,比如循环读取验证配置信息,但是这样不是更加浪费时间资源在这个配置文件的读取校验上)。
互斥锁解决线程安全的问题:
使用互斥锁就是挺好使的一个机制(使用互斥锁对共享资源进行上锁,或者说,更细节一些的,我们在每一个可能会导致多线程访问共享资源的点上都使用互斥锁去给共享资源上个锁,这样就安全了,一定记得解锁)
接Socket网络编程-池化的思路-CSDN博客
互斥锁的使用信息上篇内容写过,需要注意的一个点就是,当上锁后,这个锁到解锁的代码区域里,如果访问到贡献资源,就是自己单独访问,其他线程都需要等待本线程访问完释放这个共享资源才行。
其他的线程如果访问不到,运行到pthread_cond_wait时,这个函数本身就会自动给这个线程解锁,被条件变量阻塞在这里,等到被唤醒后,再次去进行资源访问,(如果还不行可能就还需要阻塞在这里,等下一次唤醒),不过一般这时候就已经可以访问贡献资源了。
2、对栈区和寄存器的访问是独有的。
2、Reactor模型
C++实现算法的一些巧妙点-CSDN博客
1、Socket函数的具体应用过程
这里以读事件做示例,核心功能就是使用服务器实现读操作,使用 epoll_wait 监听socket等就绪事件(一般就是文件描述符,sockfd关联的客户端链接)
(1)、其中比较特别的就是,我们使用线程时,进行任务处理的就是pthread_create 的回调函数;这里我们就需要注册这个函数。
还需要注册监听的事件(将相应的事件加到epollfd中,epoll_ctl())。
(注册,简单说提前设定好函数功能,和监听的事件,当代码运行到相应的位置时,会去调用或者处理这些事件)下面文章也有介绍Linux 信号和信号量小记-CSDN博客
(2)另外呢就是accept这个函数,之前也写过;这个函数只从listen 管理的两个队列(全链接和半链接队列)中,从全连接队列取出一个已经链接的客户端(fd),生成一个新的文件描述符,具体如下。
int accept(int sockfd, struct sockaddr *addr, socklen_t *addrlen);
accept()系统调用:
用于基于连接的套接字类型(SOCK_STREAM, SOCK_SEQPACKET)。它从监听套接字sockfd的阻塞连接队列中提取第一个连接请求,创建一个新的连接套接字,并返回一个引用该套接字的新文件描述符。新创建的套接字未处于监听状态。原始套接字sockfd不受此调用的影响。(从sockfd关联的的队列中,提取第一个链接请求 再创建新的套接字,这个新的也需要我们设置添加进监听集合中,这个新的套接字就代表原来的链接请求。)
select ()函数:
select()允许程序监视多个文件描述符,直到一个或多个文件描述符“准备好”进行某类I/O操作。如果可以执行相应的I/O操作(例如,read(2)或足够小的write(2))而不阻塞,则认为文件描述符已准备就绪(到底什么时候算就绪,后面还得研究清楚)
这个应该是和内核空间(链接的文件描述符的读写缓冲区的设定有关,达到条件可读,可写)接Socket网络编程-池化的思路-CSDN博客这篇里面有写,每个sockfd创建的时候都有读写缓冲区。(类似我们写一个char * buf[ 1024] ,可以设定字节的大小,比如内容超过50字节可读,内容小于100字节,可写入等)----> 这个具体问题,后面找找源码看看。
2、select示例程序
这是一个服务器简单代码,只是将一个客户端发过来的消息,转发给其他的客户端(类似我们的消息群发)
int main(int argc, char *argv[]){ // ./server ARGS_CHECK(argc,3); int sockFd = socket(AF_INET,SOCK_STREAM,0); ERROR_CHECK(sockFd,-1,"socket"); int optval = 1; int ret = setsockopt(sockFd,SOL_SOCKET,SO_REUSEADDR,&optval,sizeof(int)); ERROR_CHECK(ret,-1,"setsockopt"); struct sockaddr_in addr; addr.sin_family = AF_INET; addr.sin_port = htons(atoi(argv[2])); addr.sin_addr.s_addr = inet_addr(argv[1]); ret = bind(sockFd,(struct sockaddr *)&addr,sizeof(addr)); ERROR_CHECK(ret,-1,"bind"); ret = listen(sockFd,10); ERROR_CHECK(ret,-1,"listen"); fd_set rdset;//单纯地去保存就绪的fd fd_set monitorSet;//使用一个单独的监听集合 FD_ZERO(&monitorSet); FD_SET(sockFd,&monitorSet); char buf[4096] = {0}; int netFdArr[10] = {0}; int curConn = 0; //服务端不再监听标准输入 //服务端的功能:处理新的客户端 监听sockFd //处理客户端发送的消息的转发 //这里使用monitor做所有文件描述符的监听集合,使用rdset 只监听已经就绪的链接fd(就绪的标准) //就是accept从listen的已连接队列中取到的链接fd,然后将其加入select监听中,看看是可读还是可写 while(1){ memcpy(&rdset,&monitorSet,sizeof(fd_set)); select(20,&rdset,NULL,NULL,NULL); //select 能够监视我们需要监视的文件描述符的变化情况——可读写或是异常。 if(FD_ISSET(sockFd,&rdset)){ netFdArr[curConn]= accept(sockFd,NULL,NULL); //此时从listen的全链接队列取出一个,生成新的netfdAddr,这时这个链接的fd还不知道有没有就绪 ERROR_CHECK(netFdArr[curConn],-1,"accept"); FD_SET(netFdArr[curConn],&monitorSet); //这里加入总的监听集合中,因为rdset,是从总的监听集合的来的,所以需要先加进去,再转移给rdset,使用select再进行监听。 printf("new connect is accepted!,curConn = %d\n", curConn); ++curConn; } for(int i = 0;i < curConn; ++i){ if(FD_ISSET(netFdArr[i],&rdset)){ //这个循环只是查找已经读写就绪的fd,之后循环将数据转发给除了发送方以外的客户端 bzero(buf,sizeof(buf)); recv(netFdArr[i],buf,sizeof(buf),0);//这里从当前的链接客户端 读取信息 for(int j = 0; j < curConn; ++j){ //这里循环查询,将信息转发给除了自己以外的客户端 if(j == i){ continue; } send(netFdArr[j],buf,strlen(buf),0); } } } } close(sockFd); }
3、模型代码使用epoll
EPOLLIN : 可读
EPOLLOUT:可写
过程就是
使用epoll_ctl将服务端关联的sockfd 添加监听的时候,就相当于将所有的链接请求加入到了监听中。而epoll_wait 就是返回这些被监听的链接中,已经就绪的链接(是一个链表结构);这些链接fd也可以使用,就是需要判断是不是sockfd链接,如果是别的文件描述符,就出错了。
这是一个简单的测试案例,
#include <unistd.h> #include <stdio.h> #include <stdlib.h> #include <string.h> #include <error.h> #include <sys/epoll.h> #include <sys/types.h> #include <sys/socket.h> #include <netinet/in.h> #include <arpa/inet.h> #include <iostream> #include <string> using std::string; using std::cin; using std::cout; using std::endl; void test() { // 1、创建监听服务器的套接字 int listenfd = socket(AF_INET,SOCK_STREAM, 0); if(listenfd==-1){ perror("socket"); return; } struct sockaddr_in serveraddr; memset(&serveraddr, 0, sizeof(serveraddr)); serveraddr.sin_family = AF_INET; serveraddr.sin_addr.s_addr = inet_addr("127.0.0.1"); serveraddr.sin_port = htons(8888); socklen_t length = sizeof(serveraddr); //端口重用 int opt = 1; int setAddResusetRet = setsockopt(listenfd, SOL_SOCKET,SO_REUSEADDR, &opt, sizeof(opt)); if(-1 == setAddResusetRet){ perror("set sockoptADDR is error"); return ; } int setPortReuseRet = setsockopt(listenfd, SOL_SOCKET, SO_REUSEPORT, &opt, sizeof(opt)); if(-1 == setPortReuseRet){ perror("set sockoptPORT is error"); return ; } //2、服务器绑定网络地址信息 if(::bind(listenfd,(struct sockaddr *)&serveraddr, sizeof(serveraddr))<0){ perror("bind"); return; } printf("server is listenning ...\n"); //3、让服务器开始监听 //listenfd 跟所有的新连接打交道 if(::listen(listenfd, 128) < 0){ perror("listen"); close(listenfd); return; } int efd = epoll_create1(0); //底层实现实现使用红黑树,还会有个就绪链表 struct epoll_event ev; ev.events = EPOLLIN | EPOLLOUT; //注册的epoll事件类型,可读和可写 ev.data.fd = listenfd; //epoll 要进行监听操作:对listenfd的读写事件进行监听 // //Reactor :注册读就绪事件 int ret = ::epoll_ctl(efd,EPOLL_CTL_ADD,listenfd,&ev); //将socket的listenfd加入epoll的监听中(efd) //不过这个接口套接字添加到监听中后是在什么地位还不清楚 //更像是加入了一个接口(门户),可以通过它可以链接各个客户端的sockfd //accept从listen的全链接队列取出已经链接的请求事件 if(ret < 0){ perror("epoll_ctl1"); close(listenfd); close(efd); return ; } //就绪事件队列设置; struct epoll_event * evtList = (struct epoll_event *)malloc(1024 *sizeof(struct epoll_event)); while(1){ int ready = ::epoll_wait(efd,evtList,1024,3000); //evtlist 就是epoll_wait 执行后,得到的可用客户端链接文件描述符的列表, //和select功能一样,起到一个监听的作用 //在整个结构中,这里是只起到监听功能的和用链表结构表示出来已经就绪的客户端链接 for(int idx = 0; idx < ready; ++idx){ if((evtList[idx].data.fd == listenfd) && (evtList[idx].events & EPOLLIN)){ //判断是否时sockfd就绪,还是其他的文件描述符就绪 //这里表示有就绪的链接fd,使用accept获取新连接,且可读(这里有个问题,就是evtlist 就是返回的可用文件描述符, //为什么不直接拿来用呢,非要经过accept的处理,还没有具体弄清楚过程后面再写) int peerfd = accept(listenfd,NULL,NULL); //TCPConnection connect(peerfd) //将新链接添加到epoll的监听实例中 struct epoll_event evt; evt.data.fd = peerfd; evt.events = EPOLLIN | EPOLLOUT |EPOLLERR; ret = ::epoll_ctl(efd,EPOLL_CTL_ADD,peerfd,&evt); if(ret < 0){ perror("epoll_ctl"); continue; } //新链接到来之后的处理 printf(">> new connection has connected , fd = %d\n",peerfd); //>> 此处可以记录日志,使用log4CPP 完成 //个性化定制 ==》事件处理器 //也可以调用线程去安排处理别的任务等 } else{ char buff[128] = {0}; if(evtList[idx].events& EPOLLIN){ int fd = evtList[idx].data.fd; int ret = ::recv(fd,buff,sizeof(buff),0); if(ret > 0) { //表示获取到的数据大于0,可能是序列化的数据 //对应用层数据进行分析 //拿到最终数据后,进行业务逻辑处理 // //处理完后,是否需要返回给客户 ret = send(fd ,buff, strlen(buff),0); } else if(ret == 0){ printf("connection has closed \n"); struct epoll_event ev; ev.data.fd = fd; epoll_ctl(efd,EPOLL_CTL_DEL,fd, &ev); //将fd和其事件信息从监听结构中删除 //可以记录日志信息 //或者其他工作信息 } } } } } close(listenfd); } int main() { test(); return 0; }
//本来想这一次写详细些Reactor模型的池化和其他东西,但是越写思考的东西越多,下一篇再写