简单的TCP网络程序·线程池(后端服务器)

news2024/11/25 17:33:00

目录

版本四:线程池

注意事项

文件:Task.hpp -- 任务单独为一个文件

组件:日志修改

新函数:vprintf()

可变参数的提取逻辑

vfprintf()的工作原理

初始化一个va_list

日志准备

获取时间小知识

日志初版

日志启动测试

TCP通用服务器(守护进程) *

新指令1:jobs -- 查看进程作业

新指令2:fg -- foreground(前台)

新指令3:bg(Ctrl + Z) -- background(后台)

创建新的会话

新接口2:daemon() -- 选用(本文不用)

新接口3:setsid() *

小细节1:null文件 -- 任何请求都可以被接收,但是都会被丢弃

接口1:open() -- 打开文件

接口2:dup2 -- 重定向

接口3:chdir() -- 进程执行路径更改(选填)

文件:daemon.hpp(守护进程)

测试:守护进程

测试服务器的正常回显

日志修改:日志写入文件中保存

全部代码文件

daemon.hpp -- 守护进程

log.hpp -- 日志文件

makefile

tcpClient.cc -- 客户端1

tcpClient.hpp -- 客户端2

tcpServer.cc -- 服务端1

tcpServer.hpp -- 服务端2

Task.hpp -- 形成任务 

Thread.hpp -- 线程池1

ThreadPool.hpp -- 线程池2

LockGuard.hpp -- 线程池(加锁部分)


接上文:

版本四:线程池

        至此,多线程与多进程的版本完成,但是由于无论是线程的创建还是进程的创建,都是事情到来的时候(链接到来的时候)才创建任务,所以会存在一个频繁创建的问题。还有一个问题,就是客户有多少,就需要多少个进程或者线程,那一个服务器的效率就不会很高,应付几十、几百可以,但是一但多一点就不行了,为此这里就引入一个基于管道式的线程池的组件来实现第四个版本

        一个主线程打开的文件,新线程是可以看到的,也就是说线程池对于资源共享有天然的优势,于是可以把新链接构成一个新任务,把任务传递给线程池来统一执行

        注意这里就直接引入一个线程池组件进行使用,就不再讲解了

        关于线程池的介绍可以参考文章 -- 待更新

        这里的线程池是一个单例模式

注意事项

        注意线程池的版本对于代码是有整体修改的,所以下面的代码会进行一定程度的整理和修改

        因为这里的serviceIO服务和类里面是没有任何关系的,所以代码可以直接拿出去,将其放入Task.hpp文件中去

文件:Task.hpp -- 任务单独为一个文件

#pragma once

#include <iostream>
#include <string>
#include <cstdio>
#include <functional>

// 因为这里的服务和类里面是没有任何关系的,所以代码可以直接拿出去
void serviceIO(int sock)
{
    char buffer[1024];
    while (true)
    {
        ssize_t n = read(sock, buffer, sizeof(buffer) - 1);
        if (n > 0)
        {
            // 目前我们把读到的数据当成字符串, 截止目前
            buffer[n] = 0;
            std::cout << "recv message: " << buffer << std::endl;

            std::string outbuffer = buffer;
            outbuffer += " server[echo]";

            write(sock, outbuffer.c_str(), outbuffer.size()); // 多路转接
        }
        else if (n == 0)
        {
            // 代表client退出
            logMessage(NORMAL, "client quit, me too!");
            break;
        }
    }
    close(sock);
}

class Task
{
    using func_t = std::function<void(int)>;

public:
    Task()
    {
    }
    Task(int sock, func_t func)
        : _sock(sock), _callback(func)
    {
    }
    void operator()()
    {
        _callback(_sock);
    }

private:
    int _sock;
    func_t _callback;
};

组件:日志修改

新函数:vprintf()

头文件:<stdarg.h>

首先使用函数前要搞明白可变参数列表

可变参数

提出问题:实现下面的写法

logMessage(NORMAL, "create socket success: %d", _listensock);

为了至此这种可变参数,需要几个宏

va_list

参考文献:va_list_百度百科

可变参数的提取逻辑

关于可变参数的提取模拟网上也大有代码可以提供参考,这里就不再赘述了

vfprintf()的工作原理

        vfprintf()会直接向文件中写入,为了后序显示打印结果,这里直接使用vsnfprintf(),工作原理大差不差

初始化一个va_list

日志准备

获取时间小知识

获取时间的方式有很多种比如:

日志初版

#pragma once

#include <iostream>
#include <string>
#include <stdarg.h>
#include <ctime>
#include <unistd.h>

// 定义五种不同的信息
#define DEBUG 0
#define NORMAL 1
#define WARNING 2
#define ERROR 3 // 一种不影响服务器的错误
#define FATAL 4 // 致命错误

const char *to_levelstr(int level)
{
    switch (level)  // 这里直接return了
    {
    case DEBUG:
        return "DEBUG";
    case NORMAL:
        return "NORMAL";
    case WARNING:
        return "WARNING";
    case ERROR:
        return "ERROR";
    case FATAL:
        return "FATAL";
    default:
        return nullptr;
    }
}

// voidlogMessage(DEBUG, "hello %f, %d, %c", 3.14, 10, 'C');
void logMessage(int level, const char *format, ...)
{
    // 格式如下
    // [日志等级] [时间戳/时间] [pid] [message]
    // [FATAL0] [2023-06-11 16:46:07] [123] [创建套接字失败]

    // 可变参数的提取逻辑
    /*     va_list start;
        va_start(start);
        while (*p)
        {
            switch (*p)
            {
            case '%':
                p++;
                if (*p == 'f')
                    arg = va_arg(start, float);
                ...
            }
        }
        va_end(start); */

#define NUM 1024
    // 获取前缀信息[日志等级] [时间戳/时间] [pid]
    char logprefix[NUM];
    snprintf(logprefix, sizeof(logprefix), "[%s][%ld][pid: %d]",
             to_levelstr(level), (long int)time(nullptr), getpid());

    // 获取内容
    char logcontent[NUM];
    va_list arg;
    va_start(arg, logcontent); // 因为压栈是反过来的,所以直接使用左边那个参数就行了

    vsnprintf(logcontent, sizeof(logcontent), format, arg); // 第三个参数是格式, 第四个就是初始化好的可变参数

    std::cout << logprefix << logcontent << std::endl;
}

日志启动测试

这里就像一个日志的格式的样子了,稍后将其写入文件中

TCP通用服务器(守护进程) *
 

        这时候一个基本的TCP服务器已经完成,但是有一个很糟糕的显现会出现,我这里是使用Xshell来进行远程链接的,但是一旦关闭Xshell窗口的时候,服务器就自动退出了,显然实际中的服务器是不能这样的,于是我们需要引入守护进程(精灵进程)的概念,利用这个方法去解决这个问题,将它守护进程化

新指令1:jobs -- 查看进程作业

新指令2:fg -- foreground(前台)

新指令3:bg(Ctrl + Z) -- background(后台)

有且只有一个前台任务!

创建新的会话

当Xshell退出的时候,会话窗口就会退出,那么任务就会被自动清理,所以需要创建新的会话

新接口2:daemon() -- 选用(本文不用)

Linux里面提供了创建守护进程的方式,但是下面就模拟实现一个daemon,来方便学习

为了完成守护进程我们满足下面四点

#pragma once

#include <unistd.h>

void daemonSelf()
{
    // 1. 让调用进程忽略异常的信号 -- 否则一些进程碰到就直接挂了

    // 2. 如何让自己不是组长, setsid

    // 3. 守护进程是脱离终端的,关闭或者重定向以前进程默认打开的文件

    // 4. 可选:进程执行路径发生更改

}

新接口3:setsid() *

小细节1:null文件 -- 任何请求都可以被接收,但是都会被丢弃

        守护进程是脱离终端的,关闭或者重定向以前进程默认打开的文件,因为 0 1 2 这几个文件已经被打开使用了,是不能用的,且不能关闭 0 1 2 这几个文件 -- 一旦出现一个打印,就相当于向一个不存在的文件里面写入了,立马报错,然后崩溃,所以我们就可以利用一个特殊的文件中写入 -- null

接口1:open() -- 打开文件

返回值

open函数的返回值如果操作成功,它将返回一个文件描述符,如果操作失败,它将返回-1。

参数含义:

1、pathname:

在open函数中第一个参数pathname是指向想要打开的文件路径名,或者文件名。我们需要注意的是,这个路径名是绝对路径名。文件名则是在当前路径下的。

2、flags:

flags参数表示打开文件所采用的操作,我们需要注意的是:必须指定以下三个常量的一种,且只允许指定一个

  • O_RDONLY:只读模式
  • O_WRONLY:只写模式
  • O_RDWR:可读可写

参考文献:linux open函数详解_open 函数具体做了什么

接口2:dup2 -- 重定向

接口3:chdir() -- 进程执行路径更改(选填)

文件:daemon.hpp(守护进程)

#pragma once

#include <unistd.h>
#include <signal.h>
#include <cstdlib>
#include <cassert>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>

#define DEV "/dev/null"

void daemonSelf(const char *currPath = nullptr)
{
    // 1. 让调用进程忽略异常的信号 -- 否则一些进程碰到就直接挂了
    signal(SIGPIPE, SIG_IGN); // 意思是说,如果客户端提前关闭了,服务器还在写入就会导致错误写入导致服务器崩掉

    // 2. 如何让自己不是组长, setsid
    if (fork() > 0)
        exit(0);
    // 子进程 -- 守护进程(精灵进程):本质就是孤儿进程的一种
    pid_t n = setsid();
    assert(n != -1);

    // 3. 守护进程是脱离终端的,关闭或者重定向以前进程默认打开的文件
    // 因为 0 1 2 这几个文件已经被打开使用了,是不能用的
    // 且不能关闭 0 1 2 这几个文件 -- 一旦出现一个打印,就相当于向一个不存在的文件里面写入了,立马报错,然后崩溃
    // 所以我们就可以利用一个特殊的文件中写入 -- null
    // 这样重定向到nuill中,这样进程的写入和读取就不会报错了

    int fd = open(DEV, O_RDWR);
    if(fd >= 0)
    {
        dup2(fd, 0);    // 本来从 0 1 2 中读,现在从fd中读,也就是dev/null 中去读
        dup2(fd, 1);
        dup2(fd, 2);

        close(fd);
    }
    else
    {
        close(0);
        close(1);
        close(2);
    }


    // 4. 可选:进程执行路径发生更改

    if(currPath) chdir(currPath); // 如果currPath被设置,这里就进行更改
}

测试:守护进程

测试服务器的正常回显

日志修改:日志写入文件中保存

全部代码文件

daemon.hpp -- 守护进程

#pragma once

#include <unistd.h>
#include <signal.h>
#include <cstdlib>
#include <cassert>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>

#define DEV "/dev/null"

void daemonSelf(const char *currPath = nullptr)
{
    // 1. 让调用进程忽略异常的信号 -- 否则一些进程碰到就直接挂了
    signal(SIGPIPE, SIG_IGN); // 意思是说,如果客户端提前关闭了,服务器还在写入就会导致错误写入导致服务器崩掉

    // 2. 如何让自己不是组长, setsid
    if (fork() > 0)
        exit(0);
    // 子进程 -- 守护进程(精灵进程):本质就是孤儿进程的一种
    pid_t n = setsid(); // 走到这里,就已经是一个 画手进程,组长进程了,这时基于第二步产生的结果
    assert(n != -1);

    // 3. 守护进程是脱离终端的,关闭或者重定向以前进程默认打开的文件
    // 因为 0 1 2 这几个文件已经被打开使用了,是不能用的
    // 且不能关闭 0 1 2 这几个文件 -- 一旦出现一个打印,就相当于向一个不存在的文件里面写入了,立马报错,然后崩溃
    // 所以我们就可以利用一个特殊的文件中写入 -- null
    // 这样重定向到nuill中,这样进程的写入和读取就不会报错了

    int fd = open(DEV, O_RDWR);
    if(fd >= 0)
    {
        dup2(fd, 0);    // 本来从 0 1 2 中读,现在从fd中读,也就是dev/null 中去读
        dup2(fd, 1);
        dup2(fd, 2);

        close(fd);
    }
    else
    {
        close(0);
        close(1);
        close(2);
    }


    // 4. 可选:进程执行路径发生更改

    if(currPath) chdir(currPath); // 如果currPath被设置,这里就进行更改
}

log.hpp -- 日志文件

#pragma once

#include <iostream>
#include <string>
#include <stdarg.h>
#include <ctime>
#include <unistd.h>


#define LOG_NORMAL "log.txt"    // 前三个放入这里,后两个信息放入下面的文件中去
#define LOG_ERR "log.error"

// 定义五种不同的信息
#define DEBUG   0
#define NORMAL  1
#define WARNING 2
#define ERROR   3 // 一种不影响服务器的错误
#define FATAL   4 // 致命错误

const char *to_levelstr(int level)
{
    switch (level)  // 这里直接return了
    {
    case DEBUG:
        return "DEBUG";
    case NORMAL:
        return "NORMAL";
    case WARNING:
        return "WARNING";
    case ERROR:
        return "ERROR";
    case FATAL:
        return "FATAL";
    default:
        return nullptr;
    }
}

// voidlogMessage(DEBUG, "hello %f, %d, %c", 3.14, 10, 'C');
void logMessage(int level, const char *format, ...)
{
    // 格式如下
    // [日志等级] [时间戳/时间] [pid] [message]
    // [FATAL0] [2023-06-11 16:46:07] [123] [创建套接字失败]

    // 可变参数的提取逻辑
    /*     va_list start;
        va_start(start);
        while (*p)
        {
            switch (*p)
            {
            case '%':
                p++;
                if (*p == 'f')
                    arg = va_arg(start, float);
                ...
            }
        }
        va_end(start); */

#define NUM 1024
    // 获取前缀信息[日志等级] [时间戳/时间] [pid]
    char logprefix[NUM];
    snprintf(logprefix, sizeof(logprefix), "[%s][%ld][pid: %d]",
             to_levelstr(level), (long int)time(nullptr), getpid());

    // 获取内容
    char logcontent[NUM];
    va_list arg;
    va_start(arg, format); // 因为压栈是反过来的,所以直接使用左边那个参数就行了

    vsnprintf(logcontent, sizeof(logcontent), format, arg); // 第三个参数是格式, 第四个就是初始化好的可变参数

    // std::cout << logprefix << logcontent << std::endl;

    FILE *log = fopen(LOG_NORMAL, "a"); // 追加式写入
    FILE *err = fopen(LOG_ERR, "a");
    if(log != nullptr && err != nullptr)
    {
        FILE *curr = nullptr;
        if(level == DEBUG || level == NORMAL || level == WARNING) curr = log;
        if(level == ERROR || level == FATAL) curr = err;
        if(curr) fprintf(curr, "%s%s\n", logprefix, logcontent);

        fclose(log);
        fclose(err);
    }

}

makefile

cc=g++
.PHONY:all
all:tcpserver tcpclient

tcpclient:tcpClient.cc
	$(cc) -o $@ $^ -std=c++11

tcpserver:tcpServer.cc
	$(cc) -o $@ $^ -std=c++11 -lpthread
 
.PHONY:clean
clean:
	rm -f tcpserver tcpclient

tcpClient.cc -- 客户端1

#include "tcpClient.hpp"
#include <memory>

using namespace std;

static void Usage(string proc)
{
    cout << "Usage:\n\t" << proc << " serverip serverport\n\n"; // 命令提示符
}

// ./tcpclient serverip serverport  调用逻辑
int main(int argc, char *argv[])
{
    if(argc != 3)
    {
        Usage(argv[0]);
        exit(1);
    }
    string serverip = argv[1];
    uint16_t serverport = atoi(argv[2]);

    unique_ptr<TcpClient> tcli(new TcpClient(serverip, serverport));
    tcli->initClient();
    tcli->start();

    return 0;
}

tcpClient.hpp -- 客户端2

#pragma once

#include <iostream>
#include <string>
#include <cstring>
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <unistd.h>

#define NUM 1024

class TcpClient
{
public:
    TcpClient(const std::string &serverip, const uint16_t &port)
        : _sock(1), _serverip(serverip), _serverport(port)
    {
    }
    void initClient()
    {
        // 1. 创建socket
        _sock = socket(AF_INET, SOCK_STREAM, 0);
        if (_sock < 0)
        {
            // 客户端也可以有日志,不过这里就不再实现了,直接打印错误
            std::cout << "socket create error" << std::endl;
            exit(2);
        }

        // 2. tcp的客户端要不要bind? 要的! 但是不需要显示bind,这里的client port要让OS自定!
        // 3. 要不要listen? -- 不需要!客户端不需要建立链接
        // 4. 要不要accept? -- 不要!
        // 5. 要什么? 要发起链接!
    }

    void start()
    {
        struct sockaddr_in server;
        memset(&server, 0, sizeof(server));
        server.sin_family = AF_INET;
        server.sin_port = htons(_serverport);
        server.sin_addr.s_addr = inet_addr(_serverip.c_str());

        if (connect(_sock, (struct sockaddr *)&server, sizeof(server)) != 0)
        {
            std::cerr << "socket connect error" << std::endl;
        }
        else
        {
            std::string msg;
            while (true)
            {
                std::cout << "Enter# ";
                std::getline(std::cin, msg);
                write(_sock, msg.c_str(), msg.size());

                char buffer[NUM];
                int n = read(_sock, buffer, sizeof(buffer) - 1);
                if (n > 0)
                {
                    // 目前我们把读到的数据当成字符串, 截至目前
                    buffer[n] = 0;
                    std::cout << "Server回显# " << buffer << std::endl;
                }
                else
                {
                    break;
                }
            }
        }
    }
    ~TcpClient()
    {
        if(_sock >= 0) close(_sock);    //不写也行,因为文件描述符的生命周期随进程,所以进程退了,自然也就会自动回收了
    }

private:
    int _sock;
    std::string _serverip;
    uint16_t _serverport;
};

tcpServer.cc -- 服务端1

#include "tcpServer.hpp"
#include "daemon.hpp"
#include <memory>

using namespace server;
using namespace std;

static void Usage(string proc)
{
    cout << "Usage:\n\t" << proc << " local_port\n\n"; // 命令提示符
}

// tcp服务器,启动上和udp server一模一样
// ./tcpserver local_port
int main(int argc, char *argv[])
{ 
    if (argc != 2)
    {
        Usage(argv[0]);
        exit(USAGE_ERR);
    }
    // 测试 守护进程
    uint16_t port = atoi(argv[1]);
    
    unique_ptr<TcpServer> tsvr(new TcpServer(port));
    tsvr->initServer();

    daemonSelf();
    tsvr->start();

  

    return 0;
}

tcpServer.hpp -- 服务端2

#pragma once

#include <iostream>
#include <string>
#include <cstring>
#include <cstdlib>
#include <unistd.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <sys/wait.h>
#include <signal.h>
#include <pthread.h>

#include "log.hpp"
#include "ThreadPool.hpp"
#include "Task.hpp"

namespace server
{
    enum
    {
        USAGE_ERR = 1,
        SOCKET_ERR,
        BIND_ERR,
        LISTEN_ERR

    };

    static const uint16_t gport = 8080;
    static const int gbacklog = 5; // 10、20、50都可以,但是不要太大比如5千,5万
    class TcpServer;               // 声明

    // 用以线程传参
    class ThreadData
    {
    public:
        ThreadData(TcpServer *self, int sock) : _self(self), _sock(sock)
        {
        }

    public:
        TcpServer *_self;
        int _sock;
    };

    class TcpServer
    {
    public:
        TcpServer(const uint16_t &port = gport) : _listensock(-1), _port(port)
        {
        }
        void initServer()
        {
            // 1. 创建socket文件套接字对象 -- 流式套接字
            _listensock = socket(AF_INET, SOCK_STREAM, 0); // 第三个参数默认 0
            if (_listensock < 0)
            {
                logMessage(FATAL, "create socket error");
                exit(SOCKET_ERR);
            }
            logMessage(NORMAL, "create socket success: %d", _listensock);

            // 2.bind绑定自己的网路信息 -- 注意包含头文件
            struct sockaddr_in local;
            memset(&local, 0, sizeof(local));
            local.sin_family = AF_INET;
            local.sin_port = htons(_port);      // 这里有个细节,我们会发现当我们接受数据的时候是不需要主机转网路序列的,因为关于IO类的接口,内部都帮我们实现了这一功能,这里不帮我们做是因为我们传入的是一个结构体,系统做不到
            local.sin_addr.s_addr = INADDR_ANY; // 接受任意ip地址
            if (bind(_listensock, (struct sockaddr *)&local, sizeof(local)) < 0)
            {
                logMessage(FATAL, "bind socket error");
                exit(BIND_ERR);
            }
            logMessage(NORMAL, "bind socket success");

            // 3. 设置socket 为监听状态 -- TCP与UDP不同,它先要建立链接之后,TCP是面向链接的,后面还会有“握手”过程
            if (listen(_listensock, gbacklog) < 0) // 第二个参数backlog后面再填这个坑
            {
                logMessage(FATAL, "listen socket error");
                exit(LISTEN_ERR);
            }
            logMessage(NORMAL, "listen socket success");
        }

        void start()
        {
            // version 4. 线程池初始化
            ThreadPool<Task>::getInstance()->run(); // 让它跑起来
            logMessage(NORMAL, "Thread init success");

            for (;;) // 一个死循环
            {
                // 4. server 获取新链接
                // sock 和client 进行通信的fd
                struct sockaddr_in peer;
                socklen_t len = sizeof(peer);
                int sock = accept(_listensock, (struct sockaddr *)&peer, &len);
                if (sock < 0)
                {
                    logMessage(ERROR, "accept error, next"); // 这个不影响服务器的运行,用ERROR,就像张三不会因为没有把人招呼进来就不干了
                    continue;
                }
                logMessage(NORMAL, "accept a new link success, get new sock: %d", sock);    // 因为支持可变参数了
                // 日志测试
                logMessage(DEBUG, "accept error, next");
                logMessage(WARNING, "accept error, next");
                logMessage(FATAL, "accept error, next");
                logMessage(NORMAL, "accept error, next");

                logMessage(NORMAL, "accept a new link success, get new sock: %d", sock); // ?
                logMessage(DEBUG, "accept error, next");
                logMessage(WARNING, "accept error, next");
                logMessage(FATAL, "accept error, next");
                logMessage(NORMAL, "accept error, next");

                logMessage(NORMAL, "accept a new link success, get new sock: %d", sock); // ?
                logMessage(DEBUG, "accept error, next");
                logMessage(WARNING, "accept error, next");
                logMessage(FATAL, "accept error, next");
                logMessage(NORMAL, "accept error, next");


                // version 4 线程池 -- 默认启动10个
                ThreadPool<Task>::getInstance()->push(Task(sock, serviceIO));
            }
        }

        ~TcpServer() {}

    private:
        int _listensock; // 修改二:改为listensock 不是用来进行数据通信的,它是用来监听链接到来,获取新链接的!
        uint16_t _port;
    };

} // namespace server

Task.hpp -- 形成任务 

​
#pragma once

#include <iostream>
#include <string>
#include <cstdio>
#include <functional>

// 因为这里的服务和类里面是没有任何关系的,所以代码可以直接拿出去
void serviceIO(int sock)
{
    char buffer[1024];
    while (true)    // 其实这里的任务是不适合线程池处理的,一个任务来了就会拿走一个线程,所以线程池处理的应该是一个很快就可以结束的服务
    {
        ssize_t n = read(sock, buffer, sizeof(buffer) - 1);
        if (n > 0)
        {
            // 目前我们把读到的数据当成字符串, 截止目前
            buffer[n] = 0;
            std::cout << "recv message: " << buffer << std::endl;

            std::string outbuffer = buffer;
            outbuffer += " server[echo]";

            write(sock, outbuffer.c_str(), outbuffer.size()); // 多路转接
        }
        else if (n == 0)
        {
            // 代表client退出
            logMessage(NORMAL, "client quit, me too!");
            break;
        }
    }
    close(sock);    // 在内部关闭就行了
}

class Task
{
    using func_t = std::function<void(int)>;

public:
    Task()
    {
    }
    Task(int sock, func_t func)
        : _sock(sock), _callback(func)
    {
    }
    void operator()()
    {
        _callback(_sock);
    }

private:
    int _sock;
    func_t _callback;
};

​

Thread.hpp -- 线程池1

#pragma once

#include <iostream>
#include <string>
#include <cstring>
#include <cassert>
#include <functional>
#include <pthread.h>

namespace ThreadNs
{
    typedef std::function<void *(void *)> func_t;
    const int num = 1024;

    class Thread
    {
    private:
        // 在类内创建线程,想让线程执行对应的方法,需要将方法设置成为static
        static void *start_routine(void *args) // 类内成员,有缺省参数!
        {
            Thread *_this = static_cast<Thread *>(args);
            return _this->callback();
        }
    public:
        Thread()
        {
            char namebuffer[num];
            snprintf(namebuffer, sizeof namebuffer, "thread-%d", threadnum++);
            name_ = namebuffer;
        }

        void start(func_t func, void *args = nullptr)
        {
            func_ = func;
            args_ = args;
            int n = pthread_create(&tid_, nullptr, start_routine, this); // TODO
            assert(n == 0);                                            
            (void)n;
        }

        void join()
        {
            int n = pthread_join(tid_, nullptr);
            assert(n == 0);
            (void)n;
        }

        std::string threadname()
        {
            return name_;
        }

        ~Thread()
        {
            // do nothing
        }
        void *callback() { return func_(args_);}
    private:
        std::string name_;
        func_t func_;
        void *args_;
        pthread_t tid_;

        static int threadnum;
    };
    int Thread::threadnum = 1;
} // end namespace ThreadNs

ThreadPool.hpp -- 线程池2

#pragma once

#include "Thread.hpp"
#include "LockGuard.hpp"
#include "log.hpp"
#include <vector>
#include <queue>
#include <mutex>
#include <pthread.h>
#include <unistd.h>

using namespace ThreadNs;

const int gnum = 10;

template <class T>
class ThreadPool;

template <class T>
class ThreadData
{
public:
    ThreadPool<T> *threadpool;
    std::string name;

public:
    ThreadData(ThreadPool<T> *tp, const std::string &n) : threadpool(tp), name(n)
    {
    }
};

template <class T>
class ThreadPool
{
private:
    static void *handlerTask(void *args)
    {
        ThreadData<T> *td = (ThreadData<T> *)args;
        while (true)
        {
            T t;
            {
                LockGuard lockguard(td->threadpool->mutex());
                while (td->threadpool->isQueueEmpty())
                {
                    td->threadpool->threadWait();
                }
                t = td->threadpool->pop(); // pop的本质,是将任务从公共队列中,拿到当前线程自己独立的栈中
            }
            t();
        }
        delete td;
        return nullptr;
    }

    ThreadPool(const int &num = gnum) : _num(num)
    {
        pthread_mutex_init(&_mutex, nullptr);
        pthread_cond_init(&_cond, nullptr);
        for (int i = 0; i < _num; i++)
        {
            _threads.push_back(new Thread());
        }
    }

    void operator=(const ThreadPool &) = delete;
    ThreadPool(const ThreadPool &) = delete;

public:
    void lockQueue() { pthread_mutex_lock(&_mutex); }
    void unlockQueue() { pthread_mutex_unlock(&_mutex); }
    bool isQueueEmpty() { return _task_queue.empty(); }
    void threadWait() { pthread_cond_wait(&_cond, &_mutex); }
    T pop()
    {
        T t = _task_queue.front();
        _task_queue.pop();
        return t;
    }
    pthread_mutex_t *mutex()
    {
        return &_mutex;
    }

public:
    void run()
    {
        for (const auto &t : _threads)
        {
            ThreadData<T> *td = new ThreadData<T>(this, t->threadname());
            t->start(handlerTask, td);
            //std::cout << t->threadname() << " start ..." << std::endl;
            logMessage(DEBUG, "%s start ...", t->threadname().c_str());
        }
    }
    void push(const T &in)
    {
        LockGuard lockguard(&_mutex);
        _task_queue.push(in);
        pthread_cond_signal(&_cond);
    }
    ~ThreadPool()
    {
        pthread_mutex_destroy(&_mutex);
        pthread_cond_destroy(&_cond);
        for (const auto &t : _threads)
            delete t;
    }

    static ThreadPool<T> *getInstance()
    {
        if (nullptr == tp)
        {
            _singlock.lock();
            if (nullptr == tp)
            {
                tp = new ThreadPool<T>();
            }
            _singlock.unlock();
        }
        return tp;
    }

private:
    int _num;
    std::vector<Thread *> _threads;
    std::queue<T> _task_queue;
    pthread_mutex_t _mutex;
    pthread_cond_t _cond;

    static ThreadPool<T> *tp;
    static std::mutex _singlock;
};

template <class T>
ThreadPool<T> *ThreadPool<T>::tp = nullptr;

template <class T>
std::mutex ThreadPool<T>::_singlock;

LockGuard.hpp -- 线程池(加锁部分)

​
#pragma once

#include <iostream>
#include <pthread.h>

class Mutex
{
public:
    Mutex(pthread_mutex_t *lock_p = nullptr): lock_p_(lock_p)
    {}
    void lock()
    {
        if(lock_p_) pthread_mutex_lock(lock_p_);
    }
    void unlock()
    {
        if(lock_p_) pthread_mutex_unlock(lock_p_);
    }
    ~Mutex()
    {}
private:
    pthread_mutex_t *lock_p_;
};

class LockGuard
{
public:
    LockGuard(pthread_mutex_t *mutex): mutex_(mutex)
    {
        mutex_.lock(); //在构造函数中进行加锁
    }
    ~LockGuard()
    {
        mutex_.unlock(); //在析构函数中进行解锁
    }
private:
    Mutex mutex_;
};

​

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/647877.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

DAY23:二叉树(十三)二叉树的最近公共祖先+二叉搜索树的最近公共祖先

文章目录 236.二叉树的最近公共祖先思路完整版后序遍历的进一步理解为什么左为空右不为空的时候return right这个逻辑是否包含p/q本身就是公共祖先的情况 235.二叉搜索树的最近公共祖先思路关于遍历顺序 递归法最开始的写法debug测试修改版 迭代法最开始的写法为什么最开始这种…

chatgpt赋能python:Python收费介绍

Python收费介绍 什么是Python? Python是一种高级的、解释性、面向对象、纯粹的动态语言&#xff0c;多用于快速应用程序开发、脚本编写、系统管理任务等。它有一个简单直观优美的语法&#xff0c;非常容易学习。 Python的收费形式 Python语言本身是免费的&#xff0c;任何…

chatgpt赋能python:Python如何操作Word文档

Python如何操作Word文档 简介 Python是一种高级编程语言&#xff0c;具有易于学习和使用、高效、可移植性强等优点。相信许多Python开发者都遇到过需要使用Python操作Word文档的情况。本文旨在介绍如何使用Python操作Word文档&#xff0c;使开发者能够方便地实现自己的需求。…

chatgpt赋能python:Python怎么改为中文?

Python怎么改为中文&#xff1f; Python是一种高级编程语言&#xff0c;具有易读性、简单性和可扩展性的特点。它广泛应用于Web开发、数据分析、人工智能等领域。如何将Python改为中文&#xff1f;下面将为您详细介绍。 为什么要将Python改为中文&#xff1f; Python的英文是由…

[读论文][谷歌-12s生成] Speed is all your need

论文简要总结 刚读了下speed is all you need这个论文, https://arxiv.org/pdf/2304.11267.pdf 只是用的SD1.4没有对网络进行改造。 只做了4个改动 1 是对norm采用了groupnorm (GPU shader加速) 2 采用了GELU (GPU shader加速) 3 采用了两种attention优化&#xff0c;是partiti…

C语言之指针详解(2)

目录 本章重点 1. 字符指针 2. 数组指针 3. 指针数组 4. 数组传参和指针传参 5. 函数指针 6. 函数指针数组 7. 指向函数指针数组的指针 8. 回调函数 9. 指针和数组面试题的解析 数组指针 数组指针的定义 数组指针是指针&#xff1f;还是数组&#xff1f; 答案是&…

数据结构-队列

数据结构之队列 队列的概念顺序队列循环队列 顺序循环队列的ADT定义1、简单结构体定义2、初始化3、队列的清空4、计算队列的长度5、判断队列是否为空6、插入新的元素7、元素的删除8、遍历输出队列内的所有元素 链队列的ADT定义1、链队列简单结构体定义2、初始化链队列3、判断链…

chatgpt赋能python:Python怎么断行-让代码更易读

Python怎么断行 - 让代码更易读 大多数Python程序员都知道&#xff0c;代码可读性非常重要。好的代码应该易于阅读和理解&#xff0c;而不是让人困惑和痛苦。 然而&#xff0c;我们经常会发现一些Python代码在一行中拥挤着多个表达式、长变量名混杂其中&#xff0c;让人感到相…

数组:为什么数组都从0开始编号?

提到数组&#xff0c;我想你肯定不陌生&#xff0c;甚至还会自信地说&#xff0c;它很简单啊。 是的&#xff0c;在每一种编程语言中&#xff0c;基本都会有数组这种数据类型。不过&#xff0c;它不仅仅是一种编程语言中的数据类型&#xff0c;还是一种最基础的数据结构。尽管…

word中使用通配符批量将参考文献设置为上角标

目录 一、word中的通配符匹配规则 二、匹配单个参考文献 三、匹配多个参考文献 四、操作方式 &#xff08;1&#xff09;打开word中的替换功能 &#xff08;2&#xff09;输入要查找的内容 &#xff08;3&#xff09;选择替换格式 &#xff08;4&#xff09;点击替换 一…

Linux调试工具GDB(1)

文章目录 前言一、GDB概念二、GDB具体使用方法三、GDB断点类型总结 前言 本篇文章我们来介绍一下Linux中的调试利器GDB工具&#xff0c;在Linux的调试中GDB可以帮助我们来解决非常多的问题。 一、GDB概念 GDB是一个功能强大的调试工具&#xff0c;可以用于分析程序崩溃&…

temporal简介

文章目录 前言一、temporal是什么&#xff1f;二、使用步骤1.执行以下命令以启动预构建映像以及所有依赖项。2.运行示例 总结 前言 这两天在国外的网站发现了一个新的golang的微服务框架&#xff0c;感觉挺不错&#xff0c;分亨出来&#xff0c;大家一起看看。 一、temporal是…

python:并发编程(四)

前言 本文将和大家一起探讨python的多进程并发编程&#xff0c;使用内置基本库multiprocessing来实现并发&#xff0c;先通过官方来简单使用这个模块。先打好基础&#xff0c;能够有个基本的用法与认知&#xff0c;后续文章&#xff0c;我们再进行详细使用。 本文为python并发…

【数据库原理与应用 - 第四章】关系数据库规范化理论

目录 一、关系模式规范化理论 1、关系模式规范化概念 2、关系模式应满足的基本要求 3、关系规范化的意义 &#xff08;1&#xff09;一个好的数据库应遵循的标准 &#xff08;2&#xff09;规范化的意义 二、函数依赖 1、函数依赖的概念 &#xff08;1&#xff09;平凡…

Vue--》Vue3打造可扩展的项目管理系统后台的完整指南(五)

今天开始使用 vue3 ts 搭建一个项目管理的后台&#xff0c;因为文章会将项目的每一个地方代码的书写都会讲解到&#xff0c;所以本项目会分成好几篇文章进行讲解&#xff0c;我会在最后一篇文章中会将项目代码开源到我的GithHub上&#xff0c;大家可以自行去进行下载运行&…

学生成绩管理系统(逻辑清楚-简单实用)

1、需求分析 1.1、需求分析概述 需求分析是我们在软件开发中的重要环节&#xff0c;是软件开发的第一步也是最基础的环节&#xff0c;这将决定我们所实现的目标以及系统的各个组成部分、各部分的任务职能、以及使用到的数据结构、各个部门之间的组成关系和数据流程&#xff0…

chatgpt赋能python:Python列表操作:如何使用Python将数据放入列表中

Python列表操作&#xff1a;如何使用Python将数据放入列表中 在Python中&#xff0c;列表是一种重要的数据结构&#xff0c;允许我们将多个项目存储在单个变量中。在本文中&#xff0c;我们将介绍如何将数据放入Python列表中。我们将讨论Python中的列表数据类型以及如何向列表…

chatgpt赋能python:Python中如何放大图片

Python中如何放大图片 简介 图片是网站优化中不可或缺的一部分&#xff0c;然而&#xff0c;当图片在网站中被缩小或拉伸时&#xff0c;会导致其模糊或失真。在这种情况下&#xff0c;可以使用Python中的一些库来放大图片&#xff0c;同时保持图像的清晰度和质量。在本篇文章…

JavaScript 进阶 - 第3天

文章目录 JavaScript 进阶 - 第3天1 编程思想1.1 面向过程1.2 面向对象&#xff08;oop&#xff09; 2 构造函数3 原型对象3.1 原型3.2 constructor 属性3.3 对象原型3.4 原型继承3.5 原型链&#xff08;面试高频&#xff09; JavaScript 进阶 - 第3天 了解构造函数原型对象的语…

C++友元函数friend使用的学习总结

C友元函数friend使用的学习总结 1. 友元函数简介1.1 使用友元函数的目的1.2 友元函数的三种实现方法 2.全局函数做友元3.类做友元4.成员函数做友元注意&#xff01; 1. 友元函数简介 1.1 使用友元函数的目的 允许一个函数或者类访问另一个类中的私有成员,使得两个类可以共享同…