目录
一. TCP协议
二. 服务端模块代码实现
三. 服务端调用模块代码实现
四. 客户端模块代码实现
五. 初始版本结果展示
六. 多进程版服务端
七. 多线程版服务端
八. 线程池版服务端
前文已经讲了UDP的知识(点此查看)。今天来讲讲TCP。
一. TCP协议
何为TCP协议的含义,之前粗略提及了一下TCP与UDP的区别:
TCP:
• 传输层协议
• 有连接
• 可靠传输
• 面向字节流
UDP:
• 传输层协议
• 无连接
• 不可靠传输
• 面向数据报
那何为可靠,何为不可靠呢?
TCP协议是有连接的。如果两台主机想要建立通信,就必须先建立连接,通过三次握手(后续博客会讲到)建立连接,只有当连接成功后,才能进行通信。
TCP可靠性体现在:如果数据在传输过程中出现了丢包等等情况,会有相应的解决方法。
TCP可靠性实现方法:
- 确认和重传: TCP 使用确认和重传机制来确保数据的完整性和可靠性。接收方会发送确认(ACK)给发送方,告知已成功接收到数据,如果发送方未收到确认,会重新发送数据。
- 序号和顺序控制: TCP 会为每个数据段分配一个序号,并且在接收端按序重组数据,以确保数据包按正确的顺序交付。
- 流量控制: TCP 使用滑动窗口协议进行流量控制,确保发送方和接收方之间的数据传输速率合理,避免了数据包的过载和丢失。
- 拥塞控制: TCP 还实现了拥塞控制机制,通过动态调整发送速率来避免网络拥塞,以提高整体网络性能和稳定性。
但是,并不是说,TCP就是百利而无一害的。前面说了,TCP还有一个特性---面向字节流,这就导致了,目标主机读取到的内容可能并不是完整的源主机发送的内容。此处来填补这个知识。
由于TCP是面向字节流的,所以可能会产生粘包问题。
顾名思义,粘包问题指的是多条数据黏在了一起,被当做一条数据,造成读取错误。
其本质就是
tcp在传输层对应用数据边界不敏感(不关注应用层数据边界)。
因此需要程序猿在应用层进行数据边界管理。如何管理呢?
- 特殊字符间隔: 使用此方法则必须对数据中的特殊字符进行转义,否则会造成二义
- 数据定长:规定固定长度的数据,实际数据少的则需要进行补位
- 在应用层头部定义数据长度(例如http协议,udp协议,先取头部,再根据头部中的数据长度取出数据)
二. 服务端模块代码实现
同样我们将服务端封装成一个类。
const static int defaultsockfd=-1;
class TcpServer
{
public:
TcpServer(int port)
:_port(port)
,_listensock(defaultsockfd)
,_isrunning(false)
{}
~TcpServer()
{
if(_listensock>defaultsockfd)
{
::close(_listensock);
}
}
private:
uint16_t _port;
int _listensock;
bool _isrunning;
};
前两个成员不必多说,即端口号和套接字。我们还加了一个bool类型的变量,表示服务器是否在运行,那我们每次启动服务器的时候就应该判断_isrunning==true吗?只有为真才能启动服务器!
与UDP一样,服务器需要一个初始化函数。
服务端初始成员函数:
enum
{
SOCKET_ERROR = 1,
BIND_ERROR,
LISTEN_ERROR,
USAGE_ERROR
};
const static int gbacklog=10;
void InitServer()
{
//1.创建流式套接字
_listensock=::socket(AF_INET,SOCK_STREAM,0);
if(_listensock<0)
{
LOG(FATAL,"socket error\n");
exit(SOCKET_ERROR);
}
LOG(DEBUG,"socket create success, sockfd is: %d\n",_listensock);
//2.bind
struct sockaddr_in local;
memset(&local,0,sizeof(local));
local.sin_family=AF_INET;
local.sin_port=htons(_port);
local.sin_addr.s_addr=INADDR_ANY;
int n=::bind(_listensock,(struct sockaddr*)&local,sizeof(local));
if(n<0)
{
LOG(FATAL,"bind error\n");
exit(BIND_ERROR);
}
LOG(DEBUG,"bind success,sockfd is: %d\n",_listensock);
//3.tcp是面向连接的,所以通信之前,必须先建立连接,服务器是被连接的
//tcpserver启动,未来首先要一直等待客户的连接到来
n=::listen(_listensock,gbacklog);
if(n<0)
{
LOG(FATAL,"listen error\n");
exit(LISTEN_ERROR);
}
LOG(DEBUG,"listen success,sockfd is: %d\n",_listensock);
}
LOG函数是我们实现的日志功能函数。
#pragma once
//日志
#include<iostream>
#include<fstream>
#include<cstdio>
#include<string>
#include<ctime>
#include<unistd.h>
#include<sys/types.h>
#include<stdarg.h>
#include<pthread.h>
#include"LockGuard.hpp"
using namespace std;
bool gIsSave=false;
const string logname="log.txt";
void SaveFile(const string& filename,const string& message)
{
ofstream out(filename,ios::app);
if(!out.is_open())
{
return;
}
out<<message;
out.close();
}
//1.日志是有等级的
enum Level
{
DEBUG=0,
INFO,
WARNING,
ERROR,
FATAL
};
string LevelToString(int level)
{
switch(level)
{
case DEBUG: return "Debug";break;
case INFO: return "Info";break;
case WARNING: return "Warning";break;
case ERROR: return "Error";break;
case FATAL: return "Fatal";break;
default: return "Unknown";break;
}
}
string GetTimeString()
{
time_t curr_time=time(nullptr);
struct tm* format_time=localtime(&curr_time);
if(format_time==nullptr) return "None";
char time_buffer[64];
snprintf(time_buffer,sizeof(time_buffer),"%d-%d-%d %d:%d:%d",
format_time->tm_year+1900,format_time->tm_mon+1,format_time->tm_mday,
format_time->tm_hour,format_time->tm_min,format_time->tm_sec);
return time_buffer;
}
pthread_mutex_t lock=PTHREAD_MUTEX_INITIALIZER;
//2.日志是由格式的
// 日志等级 时间 代码所在的文件名/行数 日志的内容...
void LogMessage(string filename,int line,bool issave,int level,const char* format,...)
{
string levelstr=LevelToString(level);
string timestr=GetTimeString();
pid_t selfid=getpid();
//可变参数部分处理
char buffer[1024];
va_list arg;
va_start(arg,format);
vsnprintf(buffer,sizeof(buffer),format,arg);
va_end(arg);
LockGuard lockguard(&lock);
string message;
message="["+timestr+"]"+"["+levelstr+"]"+"[pid: "
+to_string(selfid)+"]"+"["+filename+"]"
+"["+to_string(line)+"]"+buffer+"\n";
if(!issave)
{
cout<<message;
}
else
{
SaveFile(logname,message);
}
}
void Test(int num,...)
{
va_list arg;
va_start(arg,num);
while(true)
{
int data=va_arg(arg,int);
cout<<"data: "<<data<<endl;
num--;
}
va_end(arg);//arg==NULL
}
//C99新特性 __VA_ARGS__
#define LOG(level,format,...) do {LogMessage(__FILE__,__LINE__,gIsSave,level,format,##__VA_ARGS__);} while(0)
#define EnableFile() do {gIsSave=true;} while(0)
#define EnableScreen() do {gIsSave=false;} while(0)
可以看见前面两部跟UDP是大差不差的,都是创建套接字,然后绑定结构体信息。而我们的TCP还多了一步,就是需要先通过listen函数建立连接,因为TCP是面向连接的,所以通信之前必须先建立连接。
先来介绍一下listen接口。
int listen(int sockfd, int backlog);
参数介绍:
- sockfd:即开始创建的套接字。
- backlog:定义sockfd的挂起连接队列可能增长到的最大长度,通俗来讲就是最多可以有多少字节。如果超过了这个长度,就会发生错误。
返回值介绍:
成功的话,0会被返回;失败1会被返回,错误码也会被设置。
服务端运行成员函数:
我们先来看看初始版本 ,下面会讲解其他版本。
void Service(int sockfd,InetAddr client)
{
LOG(DEBUG,"get a new link,info %s:%d,fd: %d\n",client.Ip().c_str(),client.Port(),sockfd);
string clientaddr="["+client.Ip()+":"+to_string(client.Port())+"]# ";
while(true)
{
char inbuffer[1024];
//1. tcp面向字节流,不能保证inbuffer,是一个完整的命令字符串,例如"ls -a -l",可能只读到ls -a
//所以对于tcp,我们必须保证在应用层面我们收到的是一个完整的请求
ssize_t n=read(sockfd,inbuffer,sizeof(inbuffer)-1);//bug
if(n>0)
{
inbuffer[n]=0;
cout<<clientaddr<<inbuffer<<endl;
string echo_string="[server echo]# ";
echo_string+=inbuffer;
write(sockfd,echo_string.c_str(),echo_string.size());
}
else if(n==0)
{
//client退出&&关闭连接了
LOG(INFO,"%s quit\n",clientaddr.c_str());
break;
}
else
{
LOG(ERROR,"read error\n",clientaddr.c_str());
break;
}
}
cout<<"server开始退出"<<endl;
shutdown(sockfd,SHUT_RD);
cout<<"shut _ rd"<<endl;
//::close(sockfd);//不关闭会发生文件描述符泄露
}
void Loop()//服务端运行函数
{
_isrunning=true;
//4.不能直接接收数据,应该先获取连接
while(true)
{
struct sockaddr_in peer;
socklen_t len=sizeof(peer);
int sockfd=::accept(_listensock,(struct sockaddr*)&peer,&len);
//注意此处accept返回值也是一个文件描述符,要区分于_listensock
if(sockfd<0)
{
LOG(WARNING,"accept error\n");//accept失败了,并不影响,再次accept即可
continue;
}
//Version 0
Service(sockfd,InetAddr(peer));
}
_isrunning=false;
}
InetAddr是我们封装的类。
#include<iostream>
#include<string>
#include<sys/types.h>
#include<sys/socket.h>
#include<arpa/inet.h>
#include<netinet/in.h>
using namespace std;
class InetAddr
{
private:
void GetAddress(string* ip,uint16_t* port)
{
*port=ntohs(_addr.sin_port);//网络字节序转为主机字节序
*ip=inet_ntoa(_addr.sin_addr);//将网络字节序IP转为点分式十进制IP
}
public:
InetAddr(const struct sockaddr_in &addr)
:_addr(addr)
{
GetAddress(&_ip,&_port);
}
string Ip()
{
return _ip;
}
uint16_t Port()
{
return _port;
}
~InetAddr()
{}
private:
struct sockaddr_in _addr;
string _ip;
uint16_t _port;
};
先来讲讲服务端运行函数---Loop,此处我们不能像UDP一样直接read接收数据,而应该获取连接。再次说明了TCP是有连接的。
先来介绍接收函数accept。
int accept(int sockfd, struct sockaddr *addr, socklen_t *addrlen);
参数介绍:
- sockfd:即开始创建的套接字。
- addr:指向sockaddr结构体。这个结构体的信息用接收到的客户端的信息填充(端口号、协议簇等等)
- addrlen:前面addr指向的结构的长度取地址。
返回值介绍:
成功的话,会返回一个文件描述符;失败则会返回-1,错误码会被设置。
此处会有读者疑惑,为什么又返回了一个文件描述符,那之前TcpServer类里面的成员_listensock又跟这个什么关系呢?下面来举个例子:
假如,今天天气不错,读者大大出去逛着街,饭点了,刚好经过一家餐馆,门口有一位服务员小A,他看见你,就来招呼你进去吃饭,你也同意了。然后服务员小A领着你进门,朝里面喊了一句:客人一位,然后后厨出来了一个服务员小B,就领着你到餐桌,让你点菜。小A又继续出去询问路过的人吃不吃饭。
在这个故事中,服务员小A就是我们的TcpServer类中的成员_listensock,服务员小B就是我们的accept接口返回的文件描述符。
所以我们的Loop函数中,如果accept函数接受失败了,那么也无所谓,只需不用做处理,下一次继续接收就行。
三. 服务端调用模块代码实现
只需创建出TcpServer对象,然后依次调用初始化函数InitServer和Loop函数即可。
#include"TcpServer.hpp"
#include<memory>
using namespace std;
void Usage(string proc)
{
cout<<"Usage:\n\t"<<proc<<" local_port\n"<<endl;
}
// ./tcpserver port
int main(int argc,char *argv[])
{
if(argc!=2)
{
Usage(argv[0]);
return 1;
}
EnableScreen();
uint16_t port=stoi(argv[1]);
unique_ptr<TcpServer> tsvr=make_unique<TcpServer>(port);
tsvr->InitServer();
tsvr->Loop();
return 0;
}
TcpServer.hpp文件即我们的服务端代码。
四. 客户端模块代码实现
#include<iostream>
#include<string>
#include<unistd.h>
#include<cstring>
#include<sys/types.h>
#include<sys/socket.h>
#include<arpa/inet.h>
#include<netinet/in.h>
#include"Log.hpp"
using namespace std;
void Usage(string proc)
{
cout<<"Usage:\n\t"<<proc<<" serverip serverport\n"<<endl;
}
// ./tcpclient serverip serverport
int main(int argc,char *argv[])
{
if(argc!=3)
{
Usage(argv[0]);
exit(1);
}
string serverip=argv[1];
uint16_t serverport=stoi(argv[2]);
int sockfd=::socket(AF_INET,SOCK_STREAM,0);
if(sockfd<0)
{
cerr<<"socket error\n"<<endl;
exit(2);
}
//与udpclient一样,不需显式bind
//构建目标主机的socket信息
struct sockaddr_in server;
memset(&server,0,sizeof(server));
server.sin_family=AF_INET;
server.sin_port=htons(serverport);
server.sin_addr.s_addr=inet_addr(serverip.c_str());
int n=connect(sockfd,(struct sockaddr*)&server,sizeof(server));
if(n<0)
{
cerr<<"connect error"<<endl;
exit(3);
}
while(true)
{
cout<<"Please Enter# ";
string outstring;
getline(cin,outstring);
ssize_t s=send(sockfd,outstring.c_str(),outstring.size(),0);//也可以用write
if(s>0)
{
char inbuffer[1024];
ssize_t m=recv(sockfd,inbuffer,sizeof(inbuffer)-1,0);//也可以用read
if(m>0)
{
inbuffer[m]=0;
cout<<inbuffer<<endl;
}
else
{
break;
}
}
else
{
break;
}
}
shutdown(sockfd,SHUT_WR);
//close(sockfd);
return 0;
}
前面两步依旧与UDP一样,都是创建出套接字,然后绑定结构体信息。由于TCP是有连接的,所以第三步需要调用connect接口进行连接。
下面先来讲讲connect接口:
int connect(int sockfd, const struct sockaddr *addr,socklen_t addrlen);
参数介绍:
- sockfd:即开始创建的套接字。
- addr:指向sockaddr结构体。这个结构体的信息用接收到的客户端的信息填充(端口号、协议簇等等)
- addrlen:前面addr指向的结构的长度取地址。
返回值介绍:
成功的话,会返回0;失败则会返回-1,错误码会被设置。
后续就是与UDP类似的了,先发出信息,然后接收服务端返回回来的信息。
五. 初始版本结果展示
首先,可以实现要求。但是既然我们说他是初始版本,那肯定说明还有更优的版本。我们来看看这个版本有什么缺陷。
当上升到多台客户端的时候,发现只有一个客户端能与服务器通信。
这就是我们这个版本的缺陷:一次只能处理一个请求。
为什么呢?因为是单进程的,所以服务器只能容许 一个客户端与其通信。
那为什么前面UDP单进程的时候,还是能实现多个客户端通信呢?
因为前面UDP只有一个sockfd,此处涉及多个sockfd。
所以自然而然的想到了第二个版本----采用多进程来实现服务端。
六. 多进程版服务端
由于前面单进程版服务端的问题,所以来介绍多进程版的服务端。
void Service(int sockfd,InetAddr client)
{
LOG(DEBUG,"get a new link,info %s:%d,fd: %d\n",client.Ip().c_str(),client.Port(),sockfd);
string clientaddr="["+client.Ip()+":"+to_string(client.Port())+"]# ";
while(true)
{
char inbuffer[1024];
//1. tcp面向字节流,不能保证inbuffer,是一个完整的命令字符串,例如"ls -a -l",可能只读到ls -a
//所以对于tcp,我们必须保证在应用层面我们收到的是一个完整的请求
ssize_t n=read(sockfd,inbuffer,sizeof(inbuffer)-1);//bug
if(n>0)
{
inbuffer[n]=0;
cout<<clientaddr<<inbuffer<<endl;
string echo_string="[server echo]# ";
echo_string+=inbuffer;
write(sockfd,echo_string.c_str(),echo_string.size());
}
else if(n==0)
{
//client退出&&关闭连接了
LOG(INFO,"%s quit\n",clientaddr.c_str());
break;
}
else
{
LOG(ERROR,"read error\n",clientaddr.c_str());
break;
}
}
cout<<"server开始退出"<<endl;
shutdown(sockfd,SHUT_RD);
cout<<"shut _ rd"<<endl;
//::close(sockfd);//不关闭会发生文件描述符泄露
}
void Loop()
{
_isrunning=true;
//4.不能直接接收数据,应该先获取连接
while(true)
{
struct sockaddr_in peer;
socklen_t len=sizeof(peer);
int sockfd=::accept(_listensock,(struct sockaddr*)&peer,&len);
//注意此处accept返回值也是一个文件描述符,要区分于_listensock
if(sockfd<0)
{
LOG(WARNING,"accept error\n");//accept失败了,并不影响,再次accept即可
continue;
}
pid_t id=fork();
if(id==0)
{
//child:关心sockfd,不关心listensock
::close(_listensock);//不需要listensock,建议关掉,不影响父进程
if(fork()>0) exit(0);
Service(sockfd,InetAddr(peer));//孙子进程--孤儿进程--系统领养
exit(0);
}
//father:关心listensock,不关心sockfd
::close(sockfd);//不需要sockfd,建议关掉
waitpid(id,nullptr,0);
}
_isrunning=false;
}
Service函数还是跟前面一样,重点是Loop函数。
我们发现多进程用了两次fork函数创建子进程。我们来着重研究一下。
pid_t id=fork();
if(id==0)
{
//child:关心sockfd,不关心listensock
::close(_listensock);//不需要listensock,建议关掉,不影响父进程
if(fork()>0) exit(0);
Service(sockfd,InetAddr(peer));//孙子进程--孤儿进程--系统领养
exit(0);
}
//father:关心listensock,不关心sockfd
::close(sockfd);//不需要sockfd,建议关掉
waitpid(id,nullptr,0);
来看看最后的效果吧:
首先单客户端仍然能进行通信,那多个呢?
可以看到仍然能正常通信。
既然多进程可以实现,猜想一下多线程是不是也能实现呢?结果是显然的。
七. 多线程版服务端
struct ThreadData
{
public:
ThreadData(int fd,InetAddr addr,TcpServer* s)
:sockfd(fd)
,clientaddr(addr)
,self(s)
{}
public:
int sockfd;
InetAddr clientaddr;
TcpServer* self;
};
void Service(int sockfd,InetAddr client)
{
LOG(DEBUG,"get a new link,info %s:%d,fd: %d\n",client.Ip().c_str(),client.Port(),sockfd);
string clientaddr="["+client.Ip()+":"+to_string(client.Port())+"]# ";
while(true)
{
char inbuffer[1024];
//1. tcp面向字节流,不能保证inbuffer,是一个完整的命令字符串,例如"ls -a -l",可能只读到ls -a
//所以对于tcp,我们必须保证在应用层面我们收到的是一个完整的请求
ssize_t n=read(sockfd,inbuffer,sizeof(inbuffer)-1);//bug
if(n>0)
{
inbuffer[n]=0;
cout<<clientaddr<<inbuffer<<endl;
string echo_string="[server echo]# ";
echo_string+=inbuffer;
write(sockfd,echo_string.c_str(),echo_string.size());
}
else if(n==0)
{
//client退出&&关闭连接了
LOG(INFO,"%s quit\n",clientaddr.c_str());
break;
}
else
{
LOG(ERROR,"read error\n",clientaddr.c_str());
break;
}
}
cout<<"server开始退出"<<endl;
shutdown(sockfd,SHUT_RD);
cout<<"shut _ rd"<<endl;
//::close(sockfd);//不关闭会发生文件描述符泄露
}
static void* HandlerSock(void* args)
{
pthread_detach(pthread_self());
ThreadData* td=static_cast<ThreadData*>(args);
td->self->Service(td->sockfd,td->clientaddr);
delete td;
return nullptr;
}
void Loop()
{
_isrunning=true;
//4.不能直接接收数据,应该先获取连接
while(true)
{
struct sockaddr_in peer;
socklen_t len=sizeof(peer);
int sockfd=::accept(_listensock,(struct sockaddr*)&peer,&len);
//注意此处accept返回值也是一个文件描述符,要区分于_listensock
if(sockfd<0)
{
LOG(WARNING,"accept error\n");//accept失败了,并不影响,再次accept即可
continue;
}
pthread_t t;
ThreadData* td=new ThreadData(sockfd,InetAddr(peer),this);
pthread_create(&t,nullptr,HandlerSock,td);//将线程分离,使主线程不用等待
}
_isrunning=false;
}
此处我们新封装了一个结构体ThreadData,方便处理。
其实大致思路还是不变,只是利用了线程分离,让创造出来的多线程去处理业务,让主线程不需等待。
来看看效果:
也是能实现预期效果的。
我们还可以利用线程池来实现业务处理。
八. 线程池版服务端
using task_t=function<void()>;
void Service(int sockfd,InetAddr client)
{
LOG(DEBUG,"get a new link,info %s:%d,fd: %d\n",client.Ip().c_str(),client.Port(),sockfd);
string clientaddr="["+client.Ip()+":"+to_string(client.Port())+"]# ";
while(true)
{
char inbuffer[1024];
//1. tcp面向字节流,不能保证inbuffer,是一个完整的命令字符串,例如"ls -a -l",可能只读到ls -a
//所以对于tcp,我们必须保证在应用层面我们收到的是一个完整的请求
ssize_t n=read(sockfd,inbuffer,sizeof(inbuffer)-1);//bug
if(n>0)
{
inbuffer[n]=0;
cout<<clientaddr<<inbuffer<<endl;
string echo_string="[server echo]# ";
echo_string+=inbuffer;
write(sockfd,echo_string.c_str(),echo_string.size());
}
else if(n==0)
{
//client退出&&关闭连接了
LOG(INFO,"%s quit\n",clientaddr.c_str());
break;
}
else
{
LOG(ERROR,"read error\n",clientaddr.c_str());
break;
}
}
cout<<"server开始退出"<<endl;
shutdown(sockfd,SHUT_RD);
cout<<"shut _ rd"<<endl;
//::close(sockfd);//不关闭会发生文件描述符泄露
}
void Loop()
{
_isrunning=true;
//4.不能直接接收数据,应该先获取连接
while(true)
{
struct sockaddr_in peer;
socklen_t len=sizeof(peer);
int sockfd=::accept(_listensock,(struct sockaddr*)&peer,&len);
//注意此处accept返回值也是一个文件描述符,要区分于_listensock
if(sockfd<0)
{
LOG(WARNING,"accept error\n");//accept失败了,并不影响,再次accept即可
continue;
}
task_t t=bind(&TcpServer::Service,this,sockfd,InetAddr(peer));
ThreadPool<task_t>::GetInstance()->Enqueue(t);
}
_isrunning=false;
}
线程池代码为:
#pragma once
//单例模式的线程池
#include<iostream>
#include<vector>
#include<queue>
#include<pthread.h>
#include"Thread.hpp"
#include"Log.hpp"
#include"LockGuard.hpp"
using namespace std;
using namespace ThreadModule;
const static int gdefaultthreadnum=3;
template<typename T>
class ThreadPool
{
private:
void LockQueue()
{
pthread_mutex_lock(&_mutex);
}
void UnLockQueue()
{
pthread_mutex_unlock(&_mutex);
}
void ThreadSleep()
{
pthread_cond_wait(&_cond,&_mutex);
}
void ThreadWakeup()
{
pthread_cond_signal(&_cond);
}
void ThreadWakeAll()
{
pthread_cond_broadcast(&_cond);
}
//私有的
ThreadPool(int threadnum=gdefaultthreadnum)
:_threadnum(threadnum)
,_waitnum(0)
,_isrunning(false)
{
pthread_mutex_init(&_mutex,nullptr);
pthread_cond_init(&_cond,nullptr);
LOG(INFO,"ThreadPool Construct()");
}
void Start()
{
for(auto& thread:_threads)
{
thread.Start();
}
}
void HandlerTask(string name)//类的成员方法,也可以成为另一个类的回调方法,方便我们继续类级别的互相调用
{
LOG(INFO,"%s is running\n",name.c_str());
while(true)
{
//1.保证队列安全
LockQueue();
//2.队列中不一定有数据
while(_task_queue.empty() && _isrunning)
{
_waitnum++;
ThreadSleep();
_waitnum--;
}
//2.1 如果线程池已经退出了 && 任务队列是空的
if(_task_queue.empty() && !_isrunning)
{
UnLockQueue();
break;
}
//2.2 如果线程池不退出 && 任务队列不是空的
//2.3 如果线程池已经退出 && 任务队列不是空的 --- 处理完所有的任务,然后再退出
//3.一定有任务,处理任务
T t=_task_queue.front();
_task_queue.pop();
UnLockQueue();
LOG(DEBUG,"%s get a task",name.c_str());
//4.处理任务,这个任务属于线程独占的任务,所以不能放在加锁和解锁之间
t();
//LOG(DEBUG,"%s handler a task,result is: %s",name.c_str(),t.ResultToString());
}
}
void InitThreadPool()
{
//指向构建出所有的线程,并不自动
for(int num=0;num<_threadnum;num++)
{
string name="thread-"+to_string(num+1);
_threads.emplace_back(bind(&ThreadPool::HandlerTask,this,placeholders::_1),name);
LOG(INFO,"init thread %s done\n",name.c_str());
}
_isrunning=true;
}
//复制拷贝禁用
ThreadPool<T> &operator=(const ThreadPool<T>&)=delete;
ThreadPool(const ThreadPool<T> &)=delete;
public:
static ThreadPool<T> *GetInstance()
{
//如果是多线程获取线程池对象,下面的代码就有问题,所以要加锁
//双判断的方式,可以有效减少获取单例的加锁成本,而且保证线程安全
if(_instance==nullptr)//只有第一次会创建对象,后续都是获取,这样就不用每次都申请锁
{//保证第二次之后,所有线程,不用再加锁,直接返回_instance单例对象
LockGuard lockguard(&_lock);
if (_instance == nullptr)
{
_instance = new ThreadPool<T>();
_instance->InitThreadPool();
_instance->Start();
LOG(DEBUG, "创建线程池单例\n");
}
else
{
LOG(DEBUG, "获取线程池单例\n");
}
}
return _instance;
}
bool Enqueue(const T& t)
{
bool ret=false;
LockQueue();
if(_isrunning)
{
_task_queue.push(t);
if(_waitnum>0)
{
ThreadWakeup();
}
LOG(DEBUG,"enqueue task success\n");
ret=true;
}
UnLockQueue();
return ret;
}
void Stop()
{
LockQueue();
_isrunning=false;
ThreadWakeAll();
UnLockQueue();
}
void Wait()
{
for(auto& thread:_threads)
{
thread.Join();
LOG(INFO,"%s is quit",thread.name().c_str());
}
}
~ThreadPool()
{
pthread_mutex_destroy(&_mutex);
pthread_cond_destroy(&_cond);
}
private:
int _threadnum;
vector<Thread> _threads;
queue<T> _task_queue;
pthread_mutex_t _mutex;
pthread_cond_t _cond;
int _waitnum;
bool _isrunning;
//添加单例模式--懒汉
static ThreadPool<T> *_instance;
static pthread_mutex_t _lock;//保护单例的锁
};
template<typename T>
ThreadPool<T> *ThreadPool<T>::_instance=nullptr;
template<typename T>
pthread_mutex_t ThreadPool<T>::_lock=PTHREAD_MUTEX_INITIALIZER;
其中Thread.hpp文件是我们封装的原生线程库:
//封装原生线程库
#ifndef __THREAD_HPP__
#define __THREAD_HPP__
#include<iostream>
#include<string>
#include<functional>
#include<unistd.h>
#include<pthread.h>
using namespace std;
namespace ThreadModule
{
using func_t=function<void(string&)>;
class Thread
{
public:
void Excute()
{
_func(_threadname);
}
public:
Thread(func_t func,const string& name="none-name")
:_func(func)
,_threadname(name)
,_stop(true)
{}
static void* threadroutine(void* args)//类成员函数,形参是有this指针的!
{
Thread *self=static_cast<Thread*>(args);
self->Excute();
return nullptr;
}
bool Start()
{
int n=pthread_create(&_tid,nullptr,threadroutine,this);
if(!n)
{
_stop=false;
return true;
}
else
{
return false;
}
}
void Detach()
{
if(!_stop)
{
pthread_detach(_tid);
}
}
void Join()
{
if(!_stop)
{
pthread_join(_tid,nullptr);
}
}
string name()
{
return _threadname;
}
void Stop()
{
_stop=true;
}
~Thread(){}
private:
pthread_t _tid;
string _threadname;
func_t _func;
bool _stop;
};
}
#endif
LockGuard.hpp文件是仿照C++RAII(点此查看)思想封装的锁:
#ifndef __lock_GUARD_HPP__
#define __lock_GUARD_HPP__
#include<iostream>
#include<pthread.h>
class LockGuard
{
public:
LockGuard(pthread_mutex_t* mutex)
:_mutex(mutex)
{
pthread_mutex_lock(_mutex);//构造加锁
}
~LockGuard()
{
pthread_mutex_unlock(_mutex);
}
private:
pthread_mutex_t* _mutex;
};
#endif
回归正题,线程池版服务端,就是通过线程池转发任务。
来看看效果:
也符合预期。
总结:
好了,到这里今天的知识就讲完了,大家有错误一点要在评论指出,我怕我一人搁这瞎bb,没人告诉我错误就寄了。
祝大家越来越好,不用关注我(疯狂暗示)