Socket编程---TCP篇

news2025/1/22 9:12:10

目录

一.   TCP协议

二.   服务端模块代码实现

三.   服务端调用模块代码实现

四.   客户端模块代码实现

五.   初始版本结果展示

六.   多进程版服务端

七.   多线程版服务端

八.   线程池版服务端


前文已经讲了UDP的知识(点此查看)。今天来讲讲TCP。

一.   TCP协议

何为TCP协议的含义,之前粗略提及了一下TCP与UDP的区别:

TCP:

• 传输层协议

• 有连接

• 可靠传输

• 面向字节流

UDP:

• 传输层协议

• 无连接

• 不可靠传输

• 面向数据报

那何为可靠,何为不可靠呢?

 TCP协议是有连接的。如果两台主机想要建立通信,就必须先建立连接,通过三次握手(后续博客会讲到)建立连接,只有当连接成功后,才能进行通信。

TCP可靠性体现在:如果数据在传输过程中出现了丢包等等情况,会有相应的解决方法。

TCP可靠性实现方法:

  1. 确认和重传: TCP 使用确认和重传机制来确保数据的完整性和可靠性。接收方会发送确认(ACK)给发送方,告知已成功接收到数据,如果发送方未收到确认,会重新发送数据。
  2. 序号和顺序控制: TCP 会为每个数据段分配一个序号,并且在接收端按序重组数据,以确保数据包按正确的顺序交付。
  3. 流量控制: TCP 使用滑动窗口协议进行流量控制,确保发送方和接收方之间的数据传输速率合理,避免了数据包的过载和丢失。
  4. 拥塞控制: TCP 还实现了拥塞控制机制,通过动态调整发送速率来避免网络拥塞,以提高整体网络性能和稳定性。

但是,并不是说,TCP就是百利而无一害的。前面说了,TCP还有一个特性---面向字节流,这就导致了,目标主机读取到的内容可能并不是完整的源主机发送的内容。此处来填补这个知识。

由于TCP是面向字节流的,所以可能会产生粘包问题

顾名思义,粘包问题指的是多条数据黏在了一起,被当做一条数据,造成读取错误。

其本质就是tcp在传输层对应用数据边界不敏感(不关注应用层数据边界)。

因此需要程序猿在应用层进行数据边界管理。如何管理呢?

  • 特殊字符间隔: 使用此方法则必须对数据中的特殊字符进行转义,否则会造成二义
  • 数据定长:规定固定长度的数据,实际数据少的则需要进行补位
  • 在应用层头部定义数据长度(例如http协议,udp协议,先取头部,再根据头部中的数据长度取出数据) 

二.   服务端模块代码实现

同样我们将服务端封装成一个类。

const static int defaultsockfd=-1;

class TcpServer
{
public:
    TcpServer(int port)
    :_port(port)
    ,_listensock(defaultsockfd)
    ,_isrunning(false)
    {}


    ~TcpServer()
    {
        if(_listensock>defaultsockfd)
        {
            ::close(_listensock);
        }
    }
private:
    uint16_t _port;
    int _listensock;
    bool _isrunning;
};

前两个成员不必多说,即端口号套接字。我们还加了一个bool类型的变量,表示服务器是否在运行,那我们每次启动服务器的时候就应该判断_isrunning==true吗?只有为真才能启动服务器!

与UDP一样,服务器需要一个初始化函数。

服务端初始成员函数:

enum
{
    SOCKET_ERROR = 1,
    BIND_ERROR,
    LISTEN_ERROR,
    USAGE_ERROR
};

const static int gbacklog=10;

void InitServer()
{
    //1.创建流式套接字
    _listensock=::socket(AF_INET,SOCK_STREAM,0);
    if(_listensock<0)
    {
        LOG(FATAL,"socket error\n");
        exit(SOCKET_ERROR);
    }
    LOG(DEBUG,"socket create success, sockfd is: %d\n",_listensock);

    //2.bind
    struct sockaddr_in local;
    memset(&local,0,sizeof(local));
    local.sin_family=AF_INET;
    local.sin_port=htons(_port);
    local.sin_addr.s_addr=INADDR_ANY;
    int n=::bind(_listensock,(struct sockaddr*)&local,sizeof(local));
    if(n<0)
    {
        LOG(FATAL,"bind error\n");
        exit(BIND_ERROR);
    }
    LOG(DEBUG,"bind success,sockfd is: %d\n",_listensock);

    //3.tcp是面向连接的,所以通信之前,必须先建立连接,服务器是被连接的
    //tcpserver启动,未来首先要一直等待客户的连接到来
    n=::listen(_listensock,gbacklog);
    if(n<0)
    {
        LOG(FATAL,"listen error\n");
        exit(LISTEN_ERROR);
    }
    LOG(DEBUG,"listen success,sockfd is: %d\n",_listensock);

}

LOG函数是我们实现的日志功能函数。

#pragma once

//日志
#include<iostream>
#include<fstream>
#include<cstdio>
#include<string>
#include<ctime>
#include<unistd.h>
#include<sys/types.h>
#include<stdarg.h>
#include<pthread.h>
#include"LockGuard.hpp"

using namespace std;

bool gIsSave=false;
const string logname="log.txt";


void SaveFile(const string& filename,const string& message)
{
    ofstream out(filename,ios::app);
    if(!out.is_open())
    {
        return;
    }
    out<<message;
    out.close();
}

//1.日志是有等级的
enum Level
{
    DEBUG=0,
    INFO,
    WARNING,
    ERROR,
    FATAL
};


string LevelToString(int level)
{
    switch(level)
    {
        case DEBUG: return "Debug";break;
        case INFO: return "Info";break;
        case WARNING: return "Warning";break;
        case ERROR: return "Error";break;
        case FATAL: return "Fatal";break;
        default: return "Unknown";break;
    }
}

string GetTimeString()
{
    time_t curr_time=time(nullptr);
    struct tm* format_time=localtime(&curr_time);
    if(format_time==nullptr) return "None";
    char time_buffer[64];
    snprintf(time_buffer,sizeof(time_buffer),"%d-%d-%d %d:%d:%d",
    format_time->tm_year+1900,format_time->tm_mon+1,format_time->tm_mday,
    format_time->tm_hour,format_time->tm_min,format_time->tm_sec);

    return time_buffer;
}

pthread_mutex_t lock=PTHREAD_MUTEX_INITIALIZER;

//2.日志是由格式的
// 日志等级 时间 代码所在的文件名/行数 日志的内容...
void LogMessage(string filename,int line,bool issave,int level,const char* format,...)
{
    string levelstr=LevelToString(level);
    string timestr=GetTimeString();
    pid_t selfid=getpid();

    //可变参数部分处理
    char buffer[1024];
    va_list arg;
    va_start(arg,format);
    vsnprintf(buffer,sizeof(buffer),format,arg);
    va_end(arg);

    LockGuard lockguard(&lock);

    string message;
    message="["+timestr+"]"+"["+levelstr+"]"+"[pid: "
            +to_string(selfid)+"]"+"["+filename+"]"
            +"["+to_string(line)+"]"+buffer+"\n";
    if(!issave)
    {
        cout<<message;
    }
    else
    {
        SaveFile(logname,message);
    }
}

void Test(int num,...)
{
    va_list arg;
    va_start(arg,num);

    while(true)
    {
        int data=va_arg(arg,int);
        cout<<"data: "<<data<<endl;
        num--;
    }

    va_end(arg);//arg==NULL
}

//C99新特性 __VA_ARGS__
#define LOG(level,format,...) do {LogMessage(__FILE__,__LINE__,gIsSave,level,format,##__VA_ARGS__);} while(0)
#define EnableFile() do {gIsSave=true;} while(0)
#define EnableScreen() do {gIsSave=false;} while(0)

可以看见前面两部跟UDP是大差不差的,都是创建套接字,然后绑定结构体信息。而我们的TCP还多了一步,就是需要先通过listen函数建立连接,因为TCP是面向连接的,所以通信之前必须先建立连接。

先来介绍一下listen接口。

int listen(int sockfd, int backlog);

参数介绍: 

  • sockfd:即开始创建的套接字。
  • backlog:定义sockfd的挂起连接队列可能增长到的最大长度,通俗来讲就是最多可以有多少字节。如果超过了这个长度,就会发生错误。

返回值介绍:

 成功的话,0会被返回;失败1会被返回,错误码也会被设置。

 服务端运行成员函数:

我们先来看看初始版本 ,下面会讲解其他版本。

void Service(int sockfd,InetAddr client)
{
    LOG(DEBUG,"get a new link,info %s:%d,fd: %d\n",client.Ip().c_str(),client.Port(),sockfd);
    string clientaddr="["+client.Ip()+":"+to_string(client.Port())+"]# ";
    while(true)
    {
        char inbuffer[1024];
        //1. tcp面向字节流,不能保证inbuffer,是一个完整的命令字符串,例如"ls -a -l",可能只读到ls -a
        //所以对于tcp,我们必须保证在应用层面我们收到的是一个完整的请求
        ssize_t n=read(sockfd,inbuffer,sizeof(inbuffer)-1);//bug
        if(n>0)
        {
            inbuffer[n]=0;
            cout<<clientaddr<<inbuffer<<endl;

            string echo_string="[server echo]# ";
            echo_string+=inbuffer;

            write(sockfd,echo_string.c_str(),echo_string.size());
        }
        else if(n==0)
        {
            //client退出&&关闭连接了
            LOG(INFO,"%s quit\n",clientaddr.c_str());
            break;
        }
        else
        {
            LOG(ERROR,"read error\n",clientaddr.c_str());
            break;
        }
    }
    cout<<"server开始退出"<<endl;
    shutdown(sockfd,SHUT_RD);
    cout<<"shut _ rd"<<endl;
    //::close(sockfd);//不关闭会发生文件描述符泄露
}

void Loop()//服务端运行函数
{
    _isrunning=true;
    //4.不能直接接收数据,应该先获取连接
    while(true)
    {
        struct sockaddr_in peer;
        socklen_t len=sizeof(peer);
        int sockfd=::accept(_listensock,(struct sockaddr*)&peer,&len);
        //注意此处accept返回值也是一个文件描述符,要区分于_listensock
        if(sockfd<0)
        {
            LOG(WARNING,"accept error\n");//accept失败了,并不影响,再次accept即可
            continue;
        }

        //Version 0
        Service(sockfd,InetAddr(peer));
    }
    _isrunning=false;
}

InetAddr是我们封装的类。

#include<iostream>
#include<string>
#include<sys/types.h>
#include<sys/socket.h>
#include<arpa/inet.h>
#include<netinet/in.h>

using namespace std;

class InetAddr
{
private:
    void GetAddress(string* ip,uint16_t* port)
    {
        *port=ntohs(_addr.sin_port);//网络字节序转为主机字节序
        *ip=inet_ntoa(_addr.sin_addr);//将网络字节序IP转为点分式十进制IP
    }
public:
    InetAddr(const struct sockaddr_in &addr)
    :_addr(addr)
    {
        GetAddress(&_ip,&_port);
    }

    string Ip()
    {
        return _ip;
    }

    uint16_t Port()
    {
        return _port;
    }

    ~InetAddr()
    {}
private:
    struct sockaddr_in _addr;
    string _ip;
    uint16_t _port;
};

先来讲讲服务端运行函数---Loop,此处我们不能像UDP一样直接read接收数据,而应该获取连接。再次说明了TCP是有连接的。

先来介绍接收函数accept

int accept(int sockfd, struct sockaddr *addr, socklen_t *addrlen);

参数介绍: 

  • sockfd:即开始创建的套接字。
  • addr:指向sockaddr结构体。这个结构体的信息用接收到的客户端的信息填充(端口号、协议簇等等)
  • addrlen:前面addr指向的结构的长度取地址。

返回值介绍:

成功的话,会返回一个文件描述符;失败则会返回-1,错误码会被设置。

此处会有读者疑惑,为什么又返回了一个文件描述符,那之前TcpServer类里面的成员_listensock又跟这个什么关系呢?下面来举个例子:

假如,今天天气不错,读者大大出去逛着街,饭点了,刚好经过一家餐馆,门口有一位服务员小A,他看见你,就来招呼你进去吃饭,你也同意了。然后服务员小A领着你进门,朝里面喊了一句:客人一位,然后后厨出来了一个服务员小B,就领着你到餐桌,让你点菜。小A又继续出去询问路过的人吃不吃饭。

在这个故事中,服务员小A就是我们的TcpServer类中的成员_listensock,服务员小B就是我们的accept接口返回的文件描述符。

所以我们的Loop函数中,如果accept函数接受失败了,那么也无所谓,只需不用做处理,下一次继续接收就行。

三.   服务端调用模块代码实现

只需创建出TcpServer对象,然后依次调用初始化函数InitServer和Loop函数即可。

#include"TcpServer.hpp"
#include<memory>

using namespace std;

void Usage(string proc)
{
    cout<<"Usage:\n\t"<<proc<<" local_port\n"<<endl;
}

// ./tcpserver port
int main(int argc,char *argv[])
{
    if(argc!=2)
    {
        Usage(argv[0]);
        return 1;
    }
    EnableScreen();
    uint16_t port=stoi(argv[1]);
    unique_ptr<TcpServer> tsvr=make_unique<TcpServer>(port);
    tsvr->InitServer();
    tsvr->Loop();

    return 0;
}

TcpServer.hpp文件即我们的服务端代码。 

四.   客户端模块代码实现

#include<iostream>
#include<string>
#include<unistd.h>
#include<cstring>
#include<sys/types.h>
#include<sys/socket.h>
#include<arpa/inet.h>
#include<netinet/in.h>

#include"Log.hpp"

using namespace std;

void Usage(string proc)
{
    cout<<"Usage:\n\t"<<proc<<" serverip serverport\n"<<endl;
}


// ./tcpclient serverip serverport
int main(int argc,char *argv[])
{
    if(argc!=3)
    {
        Usage(argv[0]);
        exit(1);
    }
    string serverip=argv[1];
    uint16_t serverport=stoi(argv[2]);

    int sockfd=::socket(AF_INET,SOCK_STREAM,0);
    if(sockfd<0)
    {
        cerr<<"socket error\n"<<endl;
        exit(2);
    }

    //与udpclient一样,不需显式bind

    //构建目标主机的socket信息
    struct sockaddr_in server;
    memset(&server,0,sizeof(server));
    server.sin_family=AF_INET;
    server.sin_port=htons(serverport);
    server.sin_addr.s_addr=inet_addr(serverip.c_str());

    int n=connect(sockfd,(struct sockaddr*)&server,sizeof(server));
    if(n<0)
    {
        cerr<<"connect error"<<endl;
        exit(3);
    }

    while(true)
    {
        cout<<"Please Enter# ";
        string outstring;
        getline(cin,outstring);

        ssize_t s=send(sockfd,outstring.c_str(),outstring.size(),0);//也可以用write
        if(s>0)
        {
            char inbuffer[1024];
            ssize_t m=recv(sockfd,inbuffer,sizeof(inbuffer)-1,0);//也可以用read
            if(m>0)
            {
                inbuffer[m]=0;
                cout<<inbuffer<<endl;
            }
            else
            {
                break;
            }
        }
        else
        {
            break;
        }
    }
    shutdown(sockfd,SHUT_WR);
    //close(sockfd);
    return 0;
}

前面两步依旧与UDP一样,都是创建出套接字,然后绑定结构体信息。由于TCP是有连接的,所以第三步需要调用connect接口进行连接。

下面先来讲讲connect接口:

int connect(int sockfd, const struct sockaddr *addr,socklen_t addrlen);

参数介绍: 

  • sockfd:即开始创建的套接字。
  • addr:指向sockaddr结构体。这个结构体的信息用接收到的客户端的信息填充(端口号、协议簇等等)
  • addrlen:前面addr指向的结构的长度取地址。

返回值介绍:

成功的话,会返回0;失败则会返回-1,错误码会被设置。

后续就是与UDP类似的了,先发出信息,然后接收服务端返回回来的信息。

五.   初始版本结果展示

首先,可以实现要求。但是既然我们说他是初始版本,那肯定说明还有更优的版本。我们来看看这个版本有什么缺陷。

当上升到多台客户端的时候,发现只有一个客户端能与服务器通信。

这就是我们这个版本的缺陷:一次只能处理一个请求。

为什么呢?因为是单进程的,所以服务器只能容许 一个客户端与其通信。

那为什么前面UDP单进程的时候,还是能实现多个客户端通信呢?

因为前面UDP只有一个sockfd,此处涉及多个sockfd。

所以自然而然的想到了第二个版本----采用多进程来实现服务端

六.   多进程版服务端

由于前面单进程版服务端的问题,所以来介绍多进程版的服务端。 

void Service(int sockfd,InetAddr client)
{
    LOG(DEBUG,"get a new link,info %s:%d,fd: %d\n",client.Ip().c_str(),client.Port(),sockfd);
    string clientaddr="["+client.Ip()+":"+to_string(client.Port())+"]# ";
    while(true)
    {
        char inbuffer[1024];
        //1. tcp面向字节流,不能保证inbuffer,是一个完整的命令字符串,例如"ls -a -l",可能只读到ls -a
        //所以对于tcp,我们必须保证在应用层面我们收到的是一个完整的请求
        ssize_t n=read(sockfd,inbuffer,sizeof(inbuffer)-1);//bug
        if(n>0)
        {
            inbuffer[n]=0;
            cout<<clientaddr<<inbuffer<<endl;

            string echo_string="[server echo]# ";
            echo_string+=inbuffer;

            write(sockfd,echo_string.c_str(),echo_string.size());
        }
        else if(n==0)
        {
            //client退出&&关闭连接了
            LOG(INFO,"%s quit\n",clientaddr.c_str());
            break;
        }
        else
        {
            LOG(ERROR,"read error\n",clientaddr.c_str());
            break;
        }
    }
    cout<<"server开始退出"<<endl;
    shutdown(sockfd,SHUT_RD);
    cout<<"shut _ rd"<<endl;
    //::close(sockfd);//不关闭会发生文件描述符泄露
}

void Loop()
{
    _isrunning=true;
    //4.不能直接接收数据,应该先获取连接
    while(true)
    {
        struct sockaddr_in peer;
        socklen_t len=sizeof(peer);
        int sockfd=::accept(_listensock,(struct sockaddr*)&peer,&len);
        //注意此处accept返回值也是一个文件描述符,要区分于_listensock
        if(sockfd<0)
        {
            LOG(WARNING,"accept error\n");//accept失败了,并不影响,再次accept即可
            continue;
        }

        pid_t id=fork();
        if(id==0)
        {
            //child:关心sockfd,不关心listensock
            ::close(_listensock);//不需要listensock,建议关掉,不影响父进程
            if(fork()>0) exit(0);
            Service(sockfd,InetAddr(peer));//孙子进程--孤儿进程--系统领养
            exit(0);

        }
        //father:关心listensock,不关心sockfd
        ::close(sockfd);//不需要sockfd,建议关掉
        waitpid(id,nullptr,0);
    }
    _isrunning=false;
}

 Service函数还是跟前面一样,重点是Loop函数

我们发现多进程用了两次fork函数创建子进程。我们来着重研究一下。

pid_t id=fork();
if(id==0)
{
    //child:关心sockfd,不关心listensock
    ::close(_listensock);//不需要listensock,建议关掉,不影响父进程
    if(fork()>0) exit(0);
    Service(sockfd,InetAddr(peer));//孙子进程--孤儿进程--系统领养
    exit(0);

}
//father:关心listensock,不关心sockfd
::close(sockfd);//不需要sockfd,建议关掉
waitpid(id,nullptr,0);

来看看最后的效果吧:

首先单客户端仍然能进行通信,那多个呢?

可以看到仍然能正常通信。

既然多进程可以实现,猜想一下多线程是不是也能实现呢?结果是显然的。

七.   多线程版服务端

struct ThreadData
{
public:
    ThreadData(int fd,InetAddr addr,TcpServer* s)
    :sockfd(fd)
    ,clientaddr(addr)
    ,self(s)
    {}
public:
    int sockfd;
    InetAddr clientaddr;
    TcpServer* self;
};

void Service(int sockfd,InetAddr client)
{
    LOG(DEBUG,"get a new link,info %s:%d,fd: %d\n",client.Ip().c_str(),client.Port(),sockfd);
    string clientaddr="["+client.Ip()+":"+to_string(client.Port())+"]# ";
    while(true)
    {
        char inbuffer[1024];
        //1. tcp面向字节流,不能保证inbuffer,是一个完整的命令字符串,例如"ls -a -l",可能只读到ls -a
        //所以对于tcp,我们必须保证在应用层面我们收到的是一个完整的请求
        ssize_t n=read(sockfd,inbuffer,sizeof(inbuffer)-1);//bug
        if(n>0)
        {
            inbuffer[n]=0;
            cout<<clientaddr<<inbuffer<<endl;

            string echo_string="[server echo]# ";
            echo_string+=inbuffer;

            write(sockfd,echo_string.c_str(),echo_string.size());
        }
        else if(n==0)
        {
            //client退出&&关闭连接了
            LOG(INFO,"%s quit\n",clientaddr.c_str());
            break;
        }
        else
        {
            LOG(ERROR,"read error\n",clientaddr.c_str());
            break;
        }
    }
    cout<<"server开始退出"<<endl;
    shutdown(sockfd,SHUT_RD);
    cout<<"shut _ rd"<<endl;
    //::close(sockfd);//不关闭会发生文件描述符泄露
}

static void* HandlerSock(void* args)
{
    pthread_detach(pthread_self());
    ThreadData* td=static_cast<ThreadData*>(args);
    td->self->Service(td->sockfd,td->clientaddr);
    delete td;
    return nullptr;
}

void Loop()
{
    _isrunning=true;
    //4.不能直接接收数据,应该先获取连接
    while(true)
    {
        struct sockaddr_in peer;
        socklen_t len=sizeof(peer);
        int sockfd=::accept(_listensock,(struct sockaddr*)&peer,&len);
        //注意此处accept返回值也是一个文件描述符,要区分于_listensock
        if(sockfd<0)
        {
            LOG(WARNING,"accept error\n");//accept失败了,并不影响,再次accept即可
            continue;
        }

        pthread_t t;
        ThreadData* td=new ThreadData(sockfd,InetAddr(peer),this);
        pthread_create(&t,nullptr,HandlerSock,td);//将线程分离,使主线程不用等待
    }
    _isrunning=false;
}

此处我们新封装了一个结构体ThreadData,方便处理。

其实大致思路还是不变,只是利用了线程分离,让创造出来的多线程去处理业务,让主线程不需等待。

来看看效果:

也是能实现预期效果的。

我们还可以利用线程池来实现业务处理。

八.   线程池版服务端

using task_t=function<void()>;


void Service(int sockfd,InetAddr client)
{
    LOG(DEBUG,"get a new link,info %s:%d,fd: %d\n",client.Ip().c_str(),client.Port(),sockfd);
    string clientaddr="["+client.Ip()+":"+to_string(client.Port())+"]# ";
    while(true)
    {
        char inbuffer[1024];
        //1. tcp面向字节流,不能保证inbuffer,是一个完整的命令字符串,例如"ls -a -l",可能只读到ls -a
        //所以对于tcp,我们必须保证在应用层面我们收到的是一个完整的请求
        ssize_t n=read(sockfd,inbuffer,sizeof(inbuffer)-1);//bug
        if(n>0)
        {
            inbuffer[n]=0;
            cout<<clientaddr<<inbuffer<<endl;

            string echo_string="[server echo]# ";
            echo_string+=inbuffer;

            write(sockfd,echo_string.c_str(),echo_string.size());
        }
        else if(n==0)
        {
            //client退出&&关闭连接了
            LOG(INFO,"%s quit\n",clientaddr.c_str());
            break;
        }
        else
        {
            LOG(ERROR,"read error\n",clientaddr.c_str());
            break;
        }
    }
    cout<<"server开始退出"<<endl;
    shutdown(sockfd,SHUT_RD);
    cout<<"shut _ rd"<<endl;
    //::close(sockfd);//不关闭会发生文件描述符泄露
}

void Loop()
{
    _isrunning=true;
    //4.不能直接接收数据,应该先获取连接
    while(true)
    {
        struct sockaddr_in peer;
        socklen_t len=sizeof(peer);
        int sockfd=::accept(_listensock,(struct sockaddr*)&peer,&len);
        //注意此处accept返回值也是一个文件描述符,要区分于_listensock
        if(sockfd<0)
        {
            LOG(WARNING,"accept error\n");//accept失败了,并不影响,再次accept即可
            continue;
        }

        task_t t=bind(&TcpServer::Service,this,sockfd,InetAddr(peer));
        ThreadPool<task_t>::GetInstance()->Enqueue(t);
    }
    _isrunning=false;
}

线程池代码为:

#pragma once

//单例模式的线程池
#include<iostream>
#include<vector>
#include<queue>
#include<pthread.h>
#include"Thread.hpp"
#include"Log.hpp"
#include"LockGuard.hpp"


using namespace std;
using namespace ThreadModule;

const static int gdefaultthreadnum=3;

template<typename T>
class ThreadPool
{
private:
    void LockQueue()
    {
        pthread_mutex_lock(&_mutex);
    }

    void UnLockQueue()
    {
        pthread_mutex_unlock(&_mutex);
    }

    void ThreadSleep()
    {
        pthread_cond_wait(&_cond,&_mutex);
    }

    void ThreadWakeup()
    {
        pthread_cond_signal(&_cond);
    }

    void ThreadWakeAll()
    {
        pthread_cond_broadcast(&_cond);
    }

    //私有的
    ThreadPool(int threadnum=gdefaultthreadnum)
    :_threadnum(threadnum)
    ,_waitnum(0)
    ,_isrunning(false)
    {
        pthread_mutex_init(&_mutex,nullptr);
        pthread_cond_init(&_cond,nullptr);
        LOG(INFO,"ThreadPool Construct()");
    }

    void Start()
    {
        for(auto& thread:_threads)
        {
            thread.Start();
        }
    }

    void HandlerTask(string name)//类的成员方法,也可以成为另一个类的回调方法,方便我们继续类级别的互相调用
    {
        LOG(INFO,"%s is running\n",name.c_str());
        while(true)
        {
            //1.保证队列安全
            LockQueue();
            //2.队列中不一定有数据
            while(_task_queue.empty() && _isrunning)
            {
                _waitnum++;
                ThreadSleep();
                _waitnum--;
            }
            //2.1 如果线程池已经退出了 && 任务队列是空的
            if(_task_queue.empty() && !_isrunning)
            {
                UnLockQueue();
                break;
            }
            //2.2 如果线程池不退出 && 任务队列不是空的
            //2.3 如果线程池已经退出 && 任务队列不是空的 --- 处理完所有的任务,然后再退出
            //3.一定有任务,处理任务
            T t=_task_queue.front();
            _task_queue.pop();
            UnLockQueue();

            LOG(DEBUG,"%s get a task",name.c_str());
            //4.处理任务,这个任务属于线程独占的任务,所以不能放在加锁和解锁之间
            t();

            //LOG(DEBUG,"%s handler a task,result is: %s",name.c_str(),t.ResultToString());
        }
    }

    void InitThreadPool()
    {
        //指向构建出所有的线程,并不自动
        for(int num=0;num<_threadnum;num++)
        {
            string name="thread-"+to_string(num+1);
            _threads.emplace_back(bind(&ThreadPool::HandlerTask,this,placeholders::_1),name);
            LOG(INFO,"init thread %s done\n",name.c_str());
        }
        _isrunning=true;
    }

    //复制拷贝禁用
    ThreadPool<T> &operator=(const ThreadPool<T>&)=delete;
    ThreadPool(const ThreadPool<T> &)=delete;
public:
    static ThreadPool<T> *GetInstance()
    {
        //如果是多线程获取线程池对象,下面的代码就有问题,所以要加锁
        //双判断的方式,可以有效减少获取单例的加锁成本,而且保证线程安全
        if(_instance==nullptr)//只有第一次会创建对象,后续都是获取,这样就不用每次都申请锁
        {//保证第二次之后,所有线程,不用再加锁,直接返回_instance单例对象
            LockGuard lockguard(&_lock);
            if (_instance == nullptr)
            {
                _instance = new ThreadPool<T>();
                _instance->InitThreadPool();
                _instance->Start();
                LOG(DEBUG, "创建线程池单例\n");
            }
            else
            {
                LOG(DEBUG, "获取线程池单例\n");
            }
        }

        return _instance;
    }

    bool Enqueue(const T& t)
    {
        bool ret=false;
        LockQueue();
        if(_isrunning)
        {   
            _task_queue.push(t);
            if(_waitnum>0)
            {
                ThreadWakeup();
            }
            LOG(DEBUG,"enqueue task success\n");
            ret=true;
        }
        UnLockQueue();
        return ret;
    }

    void Stop()
    {
        LockQueue();
        _isrunning=false;
        ThreadWakeAll();
        UnLockQueue();
    }

    void Wait()
    {
        for(auto& thread:_threads)
        {
            thread.Join();
            LOG(INFO,"%s is quit",thread.name().c_str());
        }
    }

    ~ThreadPool()
    {
        pthread_mutex_destroy(&_mutex);
        pthread_cond_destroy(&_cond);
    }
private:
    int _threadnum;
    vector<Thread> _threads;
    queue<T> _task_queue;
    pthread_mutex_t _mutex;
    pthread_cond_t _cond;

    int _waitnum;
    bool _isrunning;

    //添加单例模式--懒汉
    static ThreadPool<T> *_instance;
    static pthread_mutex_t _lock;//保护单例的锁
};

template<typename T>
ThreadPool<T> *ThreadPool<T>::_instance=nullptr;

template<typename T>
pthread_mutex_t ThreadPool<T>::_lock=PTHREAD_MUTEX_INITIALIZER;

其中Thread.hpp文件是我们封装的原生线程库:

//封装原生线程库

#ifndef __THREAD_HPP__
#define __THREAD_HPP__

#include<iostream>
#include<string>
#include<functional>
#include<unistd.h>
#include<pthread.h>

using namespace std;

namespace ThreadModule
{
    using func_t=function<void(string&)>;
    class Thread
    {
    public:
        void Excute()
        {
            _func(_threadname);
        }

    public:
        Thread(func_t func,const string& name="none-name")
        :_func(func)
        ,_threadname(name)
        ,_stop(true)
        {}

        static void* threadroutine(void* args)//类成员函数,形参是有this指针的!
        {
            Thread *self=static_cast<Thread*>(args);
            self->Excute();
            return nullptr;
        }

        bool Start()
        {
            int n=pthread_create(&_tid,nullptr,threadroutine,this);
            if(!n)
            {
                _stop=false;
                return true;
            }
            else 
            {
                return false;
            }
        }

        void Detach()
        {
            if(!_stop)
            {
                pthread_detach(_tid);
            }
        }

        void Join()
        {
            if(!_stop)
            {
                pthread_join(_tid,nullptr);
            }
        }

        string name()
        {
            return _threadname;
        }

        void Stop()
        {
            _stop=true;
        }

        ~Thread(){}
    private:
        pthread_t _tid;
        string _threadname;
        func_t _func;
        bool _stop;
    };
}

#endif

LockGuard.hpp文件是仿照C++RAII(点此查看)思想封装的锁:

#ifndef __lock_GUARD_HPP__
#define __lock_GUARD_HPP__

#include<iostream>
#include<pthread.h>

class LockGuard
{
public:
    LockGuard(pthread_mutex_t* mutex)
    :_mutex(mutex)
    {
        pthread_mutex_lock(_mutex);//构造加锁
    }

    ~LockGuard()
    {
        pthread_mutex_unlock(_mutex);
    }
private:
    pthread_mutex_t* _mutex;
};


#endif

回归正题,线程池版服务端,就是通过线程池转发任务。

来看看效果:

也符合预期。


总结:

好了,到这里今天的知识就讲完了,大家有错误一点要在评论指出,我怕我一人搁这瞎bb,没人告诉我错误就寄了。

祝大家越来越好,不用关注我(疯狂暗示)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2104013.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

探索AWS EC2:云计算的强大引擎

在数字化转型的浪潮中&#xff0c;企业对计算资源的需求不断增长。亚马逊弹性计算云&#xff08;EC2&#xff09;作为AWS&#xff08;亚马逊网络服务&#xff09;的核心产品之一&#xff0c;凭借其强大的功能和灵活性&#xff0c;成为了全球企业构建和扩展应用的首选平台。无论…

K8S - 外部访问集群

前言 文档可以承接前面的内容看 这里只做外部访问的介绍 正文 kubectl get po#打印某个pod的环境变量 kubectl exec nginx-7c5ddbdf54-6nfw2 \-- printenv |grep KUBERNETES#删除 service kubectl get svc kubectl delete svc nginx#使用 LoadBalancer 的方式重新创建 servi…

Etherpad在线文档协作编辑工具

Etherpad在线文档协作编辑工具 一、前言 Etherpad是一种开源的实时协作编辑器&#xff0c;允许多个用户同时编辑同一文档&#xff0c;并实时显示每个用户的输入内容。Etherpad最初由Etherpad基金会开发&#xff0c;后来被Google收购&#xff0c;现在由Apache软件基金会维护。E…

ElasticSearch-集群架构

核心概念 节点类型分片集群搭建 ES安全认证 集群内部安全通信 生产环境常见集群部署方式 单一角色增加节点水平扩展读写分离架构异地多活架构Hot & Warm 架构集群容量规划 产品信息库搜索时间序列的数据 核心概念 ES集群架构的优势 提高系统的可用性&#xff0c;部分节点…

【QT】学习笔记:枚举桌面窗口句柄

在 Qt 中&#xff0c;虽然 Qt 本身没有直接提供枚举桌面窗口的 API&#xff0c;但可以通过调用 Windows API 来实现枚举桌面上所有窗口的句柄&#xff0c;包括子窗口以及子窗口与父窗口的关系。我们可以使用 Windows 的 EnumWindows 和 EnumChildWindows 函数来枚举所有顶层窗口…

C语言指针进阶三:(回调函数,qsort函数的模拟)

回调函数 回调函数就是通过函数指针调用的函数&#xff0c;如果你把函数的指针作为参数传递给另一个函数&#xff0c;当这个指针被用来调用其指向的函数时&#xff0c;我们所说这就是回调函数。 qsort函数的使用&#xff08;回调函数案例&#xff09; 我们先看看qsort函数的…

【单调栈 】2289. 使数组按非递减顺序排列

本文涉及的基础知识点 单调栈分类、封装和总结 LeetCode2289. 使数组按非递减顺序排列 给你一个下标从 0 开始的整数数组 nums 。在一步操作中&#xff0c;移除所有满足 nums[i - 1] > nums[i] 的 nums[i] &#xff0c;其中 0 < i < nums.length 。 重复执行步骤&a…

【重磅推荐】《一本书读懂大模型:技术创新、商业应用与产业变革》发布!大模型零基础入门到精通

近日&#xff0c;由中国电信研究院天翼智库大模型研究团队编写、中国电信集团科技委主任邵广禄倾情作序的**《一本书读懂大模型&#xff1a;技术创新、商业应用与产业变革》**正式出版。本书系统介绍了大模型技术的发展历程、核心技术、行业应用、产业体系、治理问题以及未来展…

DeFi 发展的岔路口,Pencils Protocol带领投资者们“向前看”

DeFi 市场是否还存在 Alpha 机会&#xff1f; 走下坡路的 DeFi 去中心化金融&#xff08;DeFi&#xff09;曾是区块链世界发展的起点&#xff0c;也是链上世界流动性的重要支柱。然而&#xff0c;自 2021 年 DeFi 领域的总锁仓量&#xff08;TVL&#xff09;达到历史巅峰——…

模型压缩之剪枝

&#xff08;1&#xff09;通道选择 这里要先解释一下&#xff1a; &#xff08;1&#xff09;通道剪枝 那我们实际做法不是上面直接对所有层都添加L1正则项&#xff0c;而是仅仅对BN层权重添加L1正则项。通道剪枝具体步骤如下&#xff1a; 1.BN层权重添加L1正则项&#xf…

ElementUI实现el-table组件的合并行功能

前言 有时遇到一些需求&#xff0c;需要实现ElementUI中&#xff0c;el-tabled组件合并单元格的功能&#xff0c;稍微了解一下它的数据格式&#xff0c;不难可以写出比合并方法。但是在鼠标经过单元行时&#xff0c;会出现高亮的行与鼠标经过的行不一致的BUG。因此还需要实现c…

超级右键 - 为 Mac 的右键菜单升级一下

是不是有很多小伙伴&#xff0c;希望 Mac 也能像 Windows 一样&#xff0c;拥有丰富的右键菜单&#xff0c;快速完成新建、剪切、发送文件等操作。 一个叫作超级右键的工具就能做到&#xff0c;它能为 Mac 右键菜单增添多个功能选项&#xff0c;如 Win 系统般一键新建 / 剪切文…

vue通过html2canvas+jspdf生成PDF问题全解(水印,分页,截断,多页,黑屏,空白,附源码)

前端导出PDF的方法不多&#xff0c;常见的就是利用canvas画布渲染&#xff0c;再结合jspdf导出PDF文件&#xff0c;代码也不复杂&#xff0c;网上的代码基本都可以拿来即用。 如果不是特别追求完美的情况下&#xff0c;或者导出PDF内容单页的话&#xff0c;那么基本上也就满足业…

我的大模型岗位面试总结!太卷了!!!—我面试了24家大模型岗位 只拿了9个offer!

这段时间面试了很多家&#xff08;共24家&#xff0c;9个offer&#xff0c;简历拒了4家&#xff0c;剩下是面试后拒的&#xff09;&#xff0c;也学到了超级多东西。 大模型这方向真的卷&#xff0c;面试时好多新模型&#xff0c;新paper疯狂出&#xff0c;东西出的比我读的快…

传统CV算法——基于opencv的答题卡识别判卷系统

基于OpenCV的答题卡识别系统&#xff0c;其主要功能是自动读取并评分答题卡上的选择题答案。系统通过图像处理和计算机视觉技术&#xff0c;自动化地完成了从读取图像到输出成绩的整个流程。下面是该系统的主要步骤和实现细节的概述&#xff1a; 1. 导入必要的库 系统首先导入…

误删的PPT怎么恢复回来?

在日常工作和学习中&#xff0c;PPT已成为我们不可或缺的工具。然而&#xff0c;有时不小心误删重要的PPT文件&#xff0c;可能会让人倍感焦虑。别担心&#xff0c;本文将为你提供几种实用的方法&#xff0c;帮助你轻松恢复误删的PPT文件。 一、从回收站恢复 当你误删文件时&…

【Grafana】Prometheus结合Grafana打造智能监控可视化平台

✨✨ 欢迎大家来到景天科技苑✨✨ &#x1f388;&#x1f388; 养成好习惯&#xff0c;先赞后看哦~&#x1f388;&#x1f388; &#x1f3c6; 作者简介&#xff1a;景天科技苑 &#x1f3c6;《头衔》&#xff1a;大厂架构师&#xff0c;华为云开发者社区专家博主&#xff0c;…

香港一带一路研究院国际事务研究中心副主任陈景才阐述香港在一带一路建设及区块链金融领域的关键作用

2024年8月28日&#xff0c;香港金管局举行Ensemble项目沙盒&#xff08;以下简称沙盒&#xff09;启动仪式&#xff0c;并宣布首阶段试验将涵盖四大代币化资产用例主题&#xff0c;标志着金融业在代币化技术的实际应用进程中迈出重要一步。香港一带一路研究院国际事务研究中心副…

解剖学上合理的分割:通过先验变形显式保持拓扑结构|文献速递--基于深度学习的医学影像病灶分割

Title 题目 Anatomically plausible segmentations: Explicitly preserving topology through prior deformations 解剖学上合理的分割&#xff1a;通过先验变形显式保持拓扑结构 01 文献速递介绍 进行环向应变或壁厚度的计算&#xff0c;这些测量通常用于诊断肥厚性心肌病…

IDEA 安装lombok插件不兼容的问题及解决方法

解决&#xff1a;IDEA 安装lombok插件不兼容问题&#xff0c;plugin xxxx is incompatible 一、去官网下载最新的2024版本 地址传送通道&#xff1a; lombok插件官网地址https://plugins.jetbrains.com/plugin/6317-lombok/versions/stable 二、修改参数的配置 在压缩包路径…