基于tcp协议的网络通信(基础echo版.多进程版,多线程版,线程池版),telnet命令

news2024/11/16 19:54:47

目录

基础版

思路

辅助函数 

服务端 

代码

运行情况 -- telnet +ip +端口号

传输的数据为什么没有转换格式

客户端

思路

代码

多进程版

引入

问题 

解决

注意点 

服务端

代码

运行情况

进程池版(简单介绍)

多线程版

引入

问题+解决

注意点

服务端

代码

运行情况 

线程池版

引入

过程介绍

服务端

代码 

task.hpp

thread_pool.hpp

helper.hpp

运行情况


基础版

思路

和udp不同的是,tcp是面向字节流,面向连接的协议

  • 所以要注意socket建立时的传入的数据类型 -- AF_STREAM

它需要客户端主动先和服务端建立连接,而不是直接发送数据

  • 那么,客户端就需要调用connect函数
  • 相应的,服务端需要一直处于监听(等待连接到来)的状态 -- listen函数,也需要一个接收连接的函数 -- accept(服务端会卡在accept中,直到有连接请求到来)

tcp协议当然也需要创建套接字并与自己的地址信息绑定 -- socket()+bind()

但是,tcp里会有两个不同的套接字文件,这两个的用处不一样

  • 在tcp协议中,服务端里被socket创建,被bind绑定,被accept使用的套接字a,只是用来获取连接的
  • 之后的io操作,由accept创建的新套接字b完成(也就是accept返回的fd)
  • 就像在饭店,有人负责拉客(门口站着的那种),这就是a的工作,所以可以命名为listen_socket(用于和b区分,a一般只有一个,当然也可以有多个)

  • 有人负责提供服务(服务员),这就是b的工作(可以有多个)

注意,每来一个新连接,就会有一个新的fd被返回

  • 即使连接获取失败,也不能说明什么,也许是对方切断了连接
  • 它不像socket那样,获取失败就说明哪里有问题;连接失败是可以被接受的
  • 所以,accept失败后不需要退出程序
  • 难道拉客的时候失败了你就辞职了吗? 不会的,你只会继续下一次的拉客

当客户端与服务端建立好连接后,就可以开始通信了

辅助函数 

获取时间,为客户端封装标识符

#pragma once

#include <string>
#include <cstring>

enum
{
    SOCK_ERROR = 1,
    BIND_ERROR,
    LISTEN_ERROR,
    CONNECT_ERROR
};

std::string get_time()
{
    time_t t = time(nullptr);
    struct tm *ctime = localtime(&t);

    char time_stamp[1024];
    snprintf(time_stamp, sizeof(time_stamp), "[%d-%d-%d %d:%d:%d]:",
             ctime->tm_year + 1900, ctime->tm_mon + 1, ctime->tm_mday,
             ctime->tm_hour, ctime->tm_min, ctime->tm_sec);
    return time_stamp;
}

std::string generate_id(const std::string ip, const uint16_t port)
{
    return "[" + ip + ":" + std::to_string(port) + "]";
}

打印日志

#pragma once

#include <iostream>
#include <time.h>
#include <stdarg.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <unistd.h>
#include <stdlib.h>

#define INFO 0
#define DEBUG 1
#define WARNING 2
#define ERROR 3
#define FATAL 4 // 致命的错误

#define SIZE 1024

class Log
{
public:
    Log()
    {
    }
    void operator()(int level, const char *format, ...)
    {
        time_t t = time(nullptr);
        struct tm *ctime = localtime(&t);

        char leftbuffer[SIZE];
        snprintf(leftbuffer, sizeof(leftbuffer), "[%s][%d-%d-%d %d:%d:%d]", levelToString(level).c_str(),
                 ctime->tm_year + 1900, ctime->tm_mon + 1, ctime->tm_mday,
                 ctime->tm_hour, ctime->tm_min, ctime->tm_sec);

        va_list s;
        va_start(s, format);
        char rightbuffer[SIZE];
        vsnprintf(rightbuffer, sizeof(rightbuffer), format, s);
        va_end(s);

        // 格式:默认部分+自定义部分
        char logtxt[SIZE * 2];
        snprintf(logtxt, sizeof(logtxt), "%s %s", leftbuffer, rightbuffer);

        printf("%s\n", logtxt);
    }
    ~Log()
    {
    }

private:
    std::string levelToString(int level)
    {
        switch (level)
        {
        case INFO:
            return "INFO";
        case DEBUG:
            return "DEBUG";
        case WARNING:
            return "WARNING";
        case ERROR:
            return "ERROR";
        case FATAL:
            return "FATAL";
        default:
            return "NONE";
        }
    }
};

Log lg;

服务端 

代码

#include <iostream>
#include <string>

#include <netinet/in.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <unistd.h>
#include <arpa/inet.h>
#include <cstring>

#include "Log.hpp"
#include "helper.hpp"


const int backlog = 5;
const int buff_size = 1024;

class tcp_server
{
public:
    tcp_server(const uint16_t port = 8080, const std::string ip = "0.0.0.0")
        : ip_(ip), port_(port), listen_sockfd_(-1)
    {
    }
    void run()
    {
        init();

        sockaddr_in client_addr;
        socklen_t client_len = sizeof(client_addr);
        memset(&client_addr, 0, client_len);
        lg(INFO, "init success");
        while (true)
        {
            int sockfd = accept(listen_sockfd_, reinterpret_cast<struct sockaddr *>(&client_addr), &client_len);
            if (sockfd < 0)
            {
                continue;
            }
            char client_ip[32];
            inet_ntop(AF_INET, &(client_addr.sin_addr), client_ip, sizeof(client_ip));
            int client_port = ntohs(client_addr.sin_port);

            lg(INFO, "get a new link..., sockfd: %d, client ip: %s, client port: %d", sockfd, client_ip, client_port);

            echo(sockfd, client_ip, client_port);
            close(sockfd);
        }
    }
    ~tcp_server() {}

private:
    void init()
    {
        listen_sockfd_ = socket(AF_INET, SOCK_STREAM, 0);
        if (listen_sockfd_ < 0)
        {
            lg(FATAL, "socket create error, sockfd : %d,%s", listen_sockfd_, strerror(errno));
            exit(SOCK_ERROR);
        }
        lg(INFO, "socket create success, sockfd : %d", listen_sockfd_);

        struct sockaddr_in *addr = new sockaddr_in;
        memset(addr, 0, sizeof(*addr));
        addr->sin_family = AF_INET;
        inet_pton(AF_INET, ip_.c_str(), &(addr->sin_addr));
        addr->sin_port = htons(port_);

        int t = bind(listen_sockfd_, reinterpret_cast<struct sockaddr *>(addr), sizeof(*addr));
        if (t < 0)
        {
            lg(FATAL, "bind error, sockfd : %d,%s", listen_sockfd_, strerror(errno));
            exit(BIND_ERROR);
        }
        lg(INFO, "bind success, sockfd : %d", listen_sockfd_);

        if (listen(listen_sockfd_, backlog) < 0)
        {
            lg(FATAL, "listen error, sockfd : %d,%s", listen_sockfd_, strerror(errno));
            exit(LISTEN_ERROR);
        }
        lg(INFO, "listen success, sockfd : %d", listen_sockfd_);
        delete addr;
    }
    void echo(int fd, const char* ip, const uint16_t port)
    {
        char buffer[buff_size];
        memset(buffer, 0, sizeof(buffer));

        while (true)
        {
            int n = read(fd, buffer, sizeof(buffer) - 1);
            if (n < 0)
            {
                lg(ERROR, "%s:%d read error, %s", ip, port, strerror(errno));
                break;
            }
            else if (n == 0) //如果返回0,说明对端关闭了连接
            {
                lg(INFO, "%s:%d quit", ip, port);
                break;
            }
            else
            {
                buffer[n] = 0;
                std::string res = process_info(buffer, ip, port);
                write(fd, res.c_str(), res.size());
            }
        }
    }

    std::string process_info(const std::string &info, const std::string ip, const uint16_t port)
    {
        std::string time_stamp = get_time();
        std::string id = generate_id(ip, port);

        std::string res = id + time_stamp + info;
        return res;
    }

private:
    int listen_sockfd_;
    uint16_t port_;
    std::string ip_;
};

运行情况 -- telnet +ip +端口号

当我们只有服务端,且想要查看服务端是否处于监听状态,就可以用这个命令远程连接指定服务

这样我们就可以将其作为客户端,与服务端通信了:

当我们想要退出时,输入ctrl+],再输入quit命令即可:

传输的数据为什么没有转换格式

我们一直都对ip地址和端口号进行转换,那传输的数据呢?

无论是之前的udp协议,还是今天写的tcp协议,都是直接将字符串传进去了,为什么能这样呢?

  • 因为收发数据的函数会自动帮我们进行转换
  • 而ip地址和端口号是被存到系统级的结构体里的,它规定的数据类型就是那样
  • 所以我们在初始化时必须转成相应类型 ; 当我们要读取时,也要转换成适合显示的类型

客户端

思路

和使用udp协议一样,客户端也需要套接字(因为服务端创建了套接字,他们之间通信的基础就是套接字)

依然也不需要手动绑定,由os为我们随机分配端口号并绑定(因为客户端的端口号不重要,只需要保证客户端的唯一性即可)

那什么时候os为我们绑定呢?

  • udp是在客户端第一次发送消息时绑定,但tcp必须要先连接成功,才能发送消息
  • 而服务端有等待连接的函数,那么客户端肯定也有建立连接的函数 -- connect()
  • 也就是在客户端主动向服务端建立连接时,os调用bind,将客户端的套接字创建好 -- 这是建立连接的前提
  • 既然要主动建立连接,客户端就得提前知道服务端的ip和端口号(和udp里,主动向服务端发送消息一样)
  • 所以,这些信息我们要么在代码里写死,要么以命令行的形式传进去

连接成功后,客户端就可以开始发送数据了

发送完,等待服务端的响应数据

代码

#include <iostream>
#include <string>

#include <netinet/in.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <unistd.h>
#include <arpa/inet.h>
#include <cstring>

#include "Log.hpp"
#include "helper.hpp"

class tcp_client
{
public:
    tcp_client(const uint16_t port = 8080, const std::string ip = "47.108.135.233")
        : sockfd_(-1), port_(port), ip_(ip)
    {
    }
    ~tcp_client() {}
    void run()
    {
        // struct sockaddr_in *server_addr = init();

        sockfd_ = socket(AF_INET, SOCK_STREAM, 0);
        if (sockfd_ < 0)
        {
            lg(FATAL, "socket create error, sockfd : %d,%s", sockfd_, strerror(errno));
            exit(SOCK_ERROR);
        }
        lg(INFO, "socket create success, sockfd : %d", sockfd_);

        struct sockaddr_in *server_addr = new sockaddr_in;
        memset(server_addr, 0, sizeof(*server_addr));
        server_addr->sin_family = AF_INET;
        inet_pton(AF_INET, ip_.c_str(), &(server_addr->sin_addr));
        server_addr->sin_port = htons(port_);

        int ret = connect(sockfd_, reinterpret_cast<struct sockaddr *>(server_addr), sizeof(*server_addr));
        if (ret < 0)
        {
            std::cout << "connect fail" << std::endl;
            exit(CONNECT_ERROR);
        }
        while (true)
        {
            std::cout << "please enter:" << std::endl;
            std::string buffer;
            std::getline(std::cin, buffer);
            write(sockfd_, buffer.c_str(), buffer.size());

            char info[1024];
            memset(info, 0, sizeof(info));
            int n = read(sockfd_, info, sizeof(info) - 1);
            if (n > 0)
            {
                info[n] = 0;
                std::cout << info << std::endl;
            }
            else
            {
                break;
            }
        }
    }

private:
    struct sockaddr_in *init()
    {
        sockfd_ = socket(AF_INET, SOCK_STREAM, 0);
        if (sockfd_ < 0)
        {
            lg(FATAL, "socket create error, sockfd : %d,%s", sockfd_, strerror(errno));
            exit(SOCK_ERROR);
        }
        lg(INFO, "socket create success, sockfd : %d", sockfd_);

        struct sockaddr_in *addr = new sockaddr_in;
        memset(addr, 0, sizeof(*addr));
        addr->sin_family = AF_INET;
        inet_pton(AF_INET, ip_.c_str(), &(addr->sin_addr));
        addr->sin_port = htons(port_);

        return addr;
    }

private:
    int sockfd_;
    uint16_t port_;
    std::string ip_;
};

因为tcp是在建立好连接的基础上通信的,如果通信过程中,连接断掉了该怎么办?

就和游戏中有时候会提示:断线重连中(一般是我们自己的网络出现波动/断掉了),我们需要重新调用客户端中的connect函数

当我们在游戏里重新连接上时,有些游戏会将已经进行的游戏内容快速给你播放一遍

这就说明该游戏会将游戏数据一直维护着,重连后将数据全部推送给你,然后让你继续游玩

多进程版

引入

如果有多个客户端运行的话,我们的代码无法支持并发运行

  • 因为服务端是单进程,所以只能一直循环为一个客户端服务
  • 直到这个客户端退出后,才会退出echo函数(里面是while循环),才会重新获取连接(也就是回到while循环的一开始):
  • 让后启动的客户端只能干等着,这显然是不合理的
  • 所以我们需要将服务端改为多进程版本的 -- 当有新客户端连接时,就创建出新的子进程,让子进程去服务,主进程去监听是否有新的连接

问题 

父进程等待子进程是必要的

  • 不然就会形成僵尸进程

又因为父进程不会退出(他负责监听是否有进程连接,连接了就派进程去服务)

  • 所以让子进程变成孤儿进程也是不行的

并且,它也不可以阻塞在等待函数里(他有自己的任务)

  • 不然和之前的代码有什么区别呢

所以,该怎么办呢?

解决

选择非阻塞式等待(也就是轮询)是可以的

但我们还有其他方法:

  • 先明确我们的前提 -- 不能让阻塞式等待的父进程卡在waitpid,也不能托孤->子进程最好立即退出->让其他进程去帮子进程执行
  • 也就是在子进程内部再次fork,让孙子进程实际提供服务->因为子进程的退出,孙子进程成为了孤儿进程,由os释放其资源
  • 这样父进程就可以立即等待到子进程,也就会进入下一次的循环去进行连接了
  • 子进程和孙子进程都不会变成僵尸进程
  • 皆大欢喜~

也可以手动忽略子进程发出的sigchld信号

  • 这样父进程也不需要等待了,由os接手释放资源

注意点 

注意,子进程是去执行io操作的,所以listen_sockfd就没有用了(它只管连接)

  • 那么子进程最好关闭它,防止误操作
  • 子进程关闭了它,并不会使指向的文件真正关闭 -- 还有父进程使用它(os在管理它时,会有一个引用计数字段嘟,只有计数=0时,才会关闭文件)

同理,父进程将io操作交给了子进程去处理,那么用于io的sockfd就没用了

  • 需要关闭它

这样,这两个套接字分别都只有一个进程去使用了

服务端

代码

    void run()
    {
        init();

        sockaddr_in client_addr;
        socklen_t client_len = sizeof(client_addr);
        memset(&client_addr, 0, client_len);
        lg(INFO, "init success");
        while (true)
        {
            int sockfd = accept(listen_sockfd_, reinterpret_cast<struct sockaddr *>(&client_addr), &client_len);
            if (sockfd < 0)
            {
                continue;
            }
            char client_ip[32];
            inet_ntop(AF_INET, &(client_addr.sin_addr), client_ip, sizeof(client_ip));
            int client_port = ntohs(client_addr.sin_port);

            lg(INFO, "get a new link..., sockfd: %d, client ip: %s, client port: %d", sockfd, client_ip, client_port);

            // 单进程版
            // echo(sockfd, client_ip, client_port);
            // close(sockfd);

            // 多进程版 -- 孙子进程版
            int ret = fork();
            if (ret == 0)
            {
                close(listen_sockfd_);
                int t = fork();
                if (t == 0)
                {
                    echo(sockfd, client_ip, client_port);
                }
                exit(0);
            }
            close(sockfd);
            waitpid(ret, nullptr, 0);

            // 多进程版 -- 忽略信号版
            int ret = fork();
            if (ret == 0)
            {
                close(listen_sockfd_);
                echo(sockfd, client_ip, client_port);
                exit(0);
            }
            close(sockfd);
            signal(SIGCHLD, SIG_IGN);
        }
    }

其他的都没有变

运行情况

可以看到,当我们运行了两个客户端时,就有对应的孙子进程被创建,且都变成了孤儿进程,被init进程抚养:

或者是忽略信号的方法,同时运行两个客户端,且其中一个退出后,可以看到并没有形成僵尸进程:

进程池版(简单介绍)

也可以提前创建好进程,每个进程都去执行while循环(从获取连接到提供io服务),这样也可以并发式地让多个客户端同时与服务端通信

  • 那么他们每个进程都需要通过accept获取网络文件,就存在着竞争关系,也就需要加锁(不然可能会出现多个进程打开同一个文件的情况)

多线程版

引入

但是,这样写出的代码需要创建出很多子进程

  • 不仅可能出现一个客户端对应一个子进程的情况
  • 而且创建进程的成本很高,很占据资源

实际上我们只是需要有人去执行任务就行

  • 所以多线程是我们的最佳选择
  • 它是cpu调度的基本单位,可以最低成本地实现我们的需求

问题+解决

但是线程也需要主线程去等待耶,那主线程还是会卡在join那里,直到等待到线程完成任务,这不符合我们的预期

  • 所以,我们让副线程与主线程分离 -- detach(之前一直没用过这个接口,但现在有它的用武之地了)
  • 线程退出时会自动释放资源,而不需要等待其他线程调用pthread_join函数

注意点

和父子进程不同的是,多个线程共享所在进程的文件描述符表

  • 注意是完全共享,而不是父子进程之间的写时拷贝模式
  • 所以不需要关闭
  • 一旦其中某个线程关闭了它,其他线程也就用不了了

因为我们要在类内部创建线程

  • 那么线程执行函数就得是static类型的

但这样就没有this指针了

  • 所以需要定义一个类型,将this指针封装进去
  • 也包括echo函数需要用到的数据(这样在函数内部强转指针后,就可以直接使用了)

服务端

代码

class tcp_server; //提前声明一下tcp_server 是个类类型,不然编译过不去

struct p_data
{
    int fd_;
    uint16_t port_;
    std::string ip_;
    tcp_server *it_;
};
   
 void run()
    {
        init();

        sockaddr_in client_addr;
        socklen_t client_len = sizeof(client_addr);
        memset(&client_addr, 0, client_len);
        lg(INFO, "init success");
        while (true)
        {
            int sockfd = accept(listen_sockfd_, reinterpret_cast<struct sockaddr *>(&client_addr), &client_len);
            if (sockfd < 0)
            {
                continue;
            }
            char client_ip[32];
            inet_ntop(AF_INET, &(client_addr.sin_addr), client_ip, sizeof(client_ip));
            uint16_t client_port = ntohs(client_addr.sin_port);

            lg(INFO, "get a new link..., sockfd: %d, client ip: %s, client port: %d", sockfd, client_ip, client_port);

            // 单进程版
            // echo(sockfd, client_ip, client_port);
            // close(sockfd);

            // 多进程版 -- 孙子进程版
            // int ret = fork();
            // if (ret == 0)
            // {
            //     close(listen_sockfd_);
            //     int t = fork();
            //     if (t == 0)
            //     {
            //         echo(sockfd, client_ip, client_port);
            //     }
            //     exit(0);
            // }
            // close(sockfd);
            // waitpid(ret, nullptr, 0);

            // 多进程版 -- 忽略信号版
            // int ret = fork();
            // if (ret == 0)
            // {
            //     close(listen_sockfd_);
            //     echo(sockfd, client_ip, client_port);
            //     exit(0);
            // }
            // close(sockfd);
            // signal(SIGCHLD, SIG_IGN);

            // 多线程版
            pthread_t tid = 0;
            p_data *p = new p_data({sockfd, client_port, client_ip, this});
            pthread_create(&tid, nullptr, entrance, reinterpret_cast<void *>(p));
        }
    }

    static void *entrance(void *args)
    {
        pthread_detach(pthread_self());

        p_data *p = reinterpret_cast<p_data *>(args);
        tcp_server *it = p->it_;
        it->echo(p->fd_, (p->ip_).c_str(), p->port_);
        delete p;
        return nullptr;
    }

运行情况 

当我们运行起两个客户端后,就可以看见有两个线程创建出来了:

线程池版

引入

虽然比起进程版本的来说,多线程的成本变小了,但仍然存在客户端和线程一对一的弊端

  • 访问量较大时,服务端还是可能带不起来
  • 而且是在客户端已经到来时才创建线程,效率比较低
  • 所以,线程池就可以使用了(之前写过,这里就直接使用了) -- 线程池(图解,本质,模拟实现代码),添加单例模式(懒汉思路+代码)-CSDN博客

过程介绍

首先回顾一下线程池的内容:

  • 提前创建出一定数量的线程,主线程push任务进队列
  • 如果有任务,空闲的线程去竞争任务,拿到任务的线程(pop)去执行任务
  • 如果没有任务,线程就等待任务的到来

在当时的线程池里,我们的重点在于如何放/取任务,但只有这些并不是一个完整的cp模型,在这里就可以填补上这个空缺了

  • 也就是任务的来源和后续的处理
  • 来源 : 客户端的访问
  • 处理 : 将消息封装后回显,然后交回给客户端(也就是我们的echo函数)
  • 这样,线程之间竞争任务就没那么激烈(因为会有部分线程陷于处理任务的状态)

并且,这里设计成每个线程只为客户端提供一次服务

  • 当然,这是要看场景的,这里只是一个echo回显的功能,短时/长时服务都可以
  • 短时服务可以减少服务器的压力
  • 而像shell那种需要长时间的保持,就不能这么写了,Shell 进程会等待用户的输入(有时候也会在等待期间处理其他后台任务:下载文件等)

也就是 -- 只在客户端需要io时,才分配线程去处理,并且在处理完成后,就断开与客户端的连接,当客户端需要io时再连接

服务端

代码 

    void run_pthread_pool()
    {
        // 初始化
        init();
        thread_pool<Task> *tp = thread_pool<Task>::get_instance();
        tp->init();

        sockaddr_in client_addr;
        socklen_t client_len = sizeof(client_addr);
        memset(&client_addr, 0, client_len);
        lg(INFO, "init success");

        while (true)
        {
            int sockfd = accept(listen_sockfd_, reinterpret_cast<struct sockaddr *>(&client_addr), &client_len);
            if (sockfd < 0)
            {
                continue;
            }
            char client_ip[32];
            inet_ntop(AF_INET, &(client_addr.sin_addr), client_ip, sizeof(client_ip));
            uint16_t client_port = ntohs(client_addr.sin_port);

            lg(INFO, "get a new link..., sockfd: %d, client ip: %s, client port: %d", sockfd, client_ip, client_port);

            Task t(sockfd,client_ip,client_port);
            tp->push(t);
        }
    }
task.hpp
#pragma once

#include <iostream>
#include <string>
#include <stdio.h>

#include "helper.hpp"
// 这里的任务是,服务端在收到客户端的连接后的后续工作

class Task
{
public:
    Task() {} // 方便只是为了接收传参而定义一个对象
    Task(int fd, const char ip[32], const uint16_t port)
        : sockfd_(fd), ip_(ip), port_(port)
    {
    }

    void operator()()
    {
        char buffer[buff_size];
        memset(buffer, 0, sizeof(buffer));

        while (true)
        {
            int n = read(sockfd_, buffer, sizeof(buffer) - 1);
            if (n < 0)
            {
                lg(ERROR, "%s:%d read error, %s", ip_.c_str(), port_, strerror(errno));
                break;
            }
            else if (n == 0)
            {
                lg(INFO, "%s:%d quit", ip_.c_str(), port_);
                break;
            }
            else
            {
                buffer[n] = 0;
                std::string res = process_info(buffer, ip_, port_);
                write(sockfd_, res.c_str(), res.size());
            }
        }
    }

private:
    int sockfd_;
    uint16_t port_;
    std::string ip_;
};
thread_pool.hpp
#include <pthread.h>
#include <vector>
#include <queue>
#include <stdlib.h>
#include <string>
#include <unistd.h>
#include <semaphore.h>
#include <iostream>

struct thread
{
    pthread_t tid_;
    std::string name_;
};

template <class T>
class thread_pool
{
private:
    void lock()
    {
        pthread_mutex_lock(&mutex_);
    }
    void unlock()
    {
        pthread_mutex_unlock(&mutex_);
    }
    void wait()
    {
        pthread_cond_wait(&cond_, &mutex_);
    }
    void signal()
    {
        pthread_cond_signal(&cond_);
    }
    T pop()
    {
        T t = task_.front();
        task_.pop();
        return t;
    }
    bool is_empty()
    {
        return task_.size() == 0;
    }

    static void *entry(void *args) // 类成员会有this参数,但入口函数不允许有多余参数
    {
        thread_pool<T> *tp = static_cast<thread_pool<T> *>(args); // this指针,用于拿到成员变量/函数
        while (true)
        {
            tp->lock();
            while (tp->is_empty())
            {
                tp->wait();
            }
            T t = tp->pop();
            tp->unlock();

            t();
        }
        return nullptr;
    }

public:
    static thread_pool<T> *get_instance(int num = 5)
    {
        // 如果这样写,虽然保证了安全,但会在创建对象后,线程依然线性运行
        //  pthread_mutex_lock(&single_mutex_);
        //  if (myself_ == nullptr)
        //  {
        //      myself_ = new thread_pool<T>(num);
        //  }
        //  pthread_mutex_unlock(&single_mutex_);

        if (myself_ == nullptr) // 再加一层判断,就可以提高效率
        {
            pthread_mutex_lock(&single_mutex_);
            if (myself_ == nullptr)
            {
                myself_ = new thread_pool<T>(num);
                //std::cout << "get instance success" << std::endl;
            }
            pthread_mutex_unlock(&single_mutex_);
        }

        return myself_;
    }
    void init()
    {
        for (size_t i = 0; i < num_; ++i)
        {
            pthread_create(&(threads_[i].tid_), nullptr, entry, this);
            pthread_detach(threads_[i].tid_);
        }
    }
    void push(const T data)
    {
        lock();

        task_.push(data);
        signal(); // 放在锁内,确保只有当前线程执行唤醒操作,不然可能会有多次操作

        unlock();
    }

private:
    thread_pool(int num = 5)
        : num_(num), threads_(num)
    {
        pthread_cond_init(&cond_, nullptr);
        pthread_mutex_init(&mutex_, nullptr);
    }
    ~thread_pool()
    {
        pthread_cond_destroy(&cond_);
        pthread_mutex_destroy(&mutex_);
    }

private:
    std::vector<thread> threads_;
    std::queue<T> task_;
    int num_;

    pthread_cond_t cond_;
    pthread_mutex_t mutex_;

    static thread_pool<T> *myself_; // 每次外部想要线程池对象时,返回的都是这一个(只有静态成员变量,才能保证一个类只有一个)
    static pthread_mutex_t single_mutex_;
};

template <class T>
thread_pool<T> *thread_pool<T>::myself_ = nullptr;

template <class T>
pthread_mutex_t thread_pool<T>::single_mutex_ = PTHREAD_MUTEX_INITIALIZER;

因为task.hpp里面需要用到process_info函数,处理我们收到的信息,所以我将这个函数从服务端类内挪到了helper.hpp里(反正这个函数不需要用到类内成员)

helper.hpp
#pragma once

#include <string>
#include <cstring>

enum
{
    SOCK_ERROR = 1,
    BIND_ERROR,
    LISTEN_ERROR,
    CONNECT_ERROR
};

const int buff_size = 1024;

std::string get_time()
{
    time_t t = time(nullptr);
    struct tm *ctime = localtime(&t);

    char time_stamp[1024];
    snprintf(time_stamp, sizeof(time_stamp), "[%d-%d-%d %d:%d:%d]:",
             ctime->tm_year + 1900, ctime->tm_mon + 1, ctime->tm_mday,
             ctime->tm_hour, ctime->tm_min, ctime->tm_sec);
    return time_stamp;
}

std::string generate_id(const std::string ip, const uint16_t port)
{
    return "[" + ip + ":" + std::to_string(port) + "]";
}

std::string process_info(const std::string &info, const std::string ip, const uint16_t port)
{
    std::string time_stamp = get_time();
    std::string id = generate_id(ip, port);

    std::string res = id + time_stamp + info;
    return res;
}

运行情况

服务端启动起来后,就有五个新线程被创建出来(因为我们的默认线程数量是5):

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1526290.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Java微服务分布式事务框架seata

&#x1f339;作者主页&#xff1a;青花锁 &#x1f339;简介&#xff1a;Java领域优质创作者&#x1f3c6;、Java微服务架构公号作者&#x1f604; &#x1f339;简历模板、学习资料、面试题库、技术互助 &#x1f339;文末获取联系方式 &#x1f4dd; 往期热门专栏回顾 专栏…

SSH远程连接断开后,程序继续运行

1、问题 我们在使用SSH连接远程服务器跑代码的时候&#xff0c;可能会遇到 代码需要跑很久 的情况&#xff0c;你可能会想 断开远程连接&#xff0c;但是&#xff0c;代码仍然要继续跑。 (eg: 晚上关电脑&#xff0c;但是想让代码继续跑着&#xff0c;第二天想看结果) 2、scre…

JSONP漏洞详解

目录 同源策略 JSONP简介 JSONP劫持漏洞 漏洞原理 漏洞利用过程 利用工具 JSONP漏洞挖掘思路 JSONP防御 首先&#xff0c;要了解一下什么是同源策略&#xff1f; 同源策略 同源策略&#xff08;SOP&#xff09;是浏览器的一个安全基石&#xff0c;浏览器为了保证数据…

使用 Jenkins 管道在 Docker Hub 中构建 Docker 镜像

Jenkins Pipeline 是一个强大的工具&#xff0c;可以自动执行部署。在各个阶段之间拆分的灵活和自定义操作是尝试此功能的一个很好的理由。 构建您自己的 Docker 镜像并将其上传到 Docker Hub 以保持存储库更新是了解 Jenkins Pipeline 如何改进您的工作方式的一个很好的示例。…

详细分析Python装饰器(附Demo)

目录 前言1. 基本知识2. 无参装饰器3. 有参装饰器4. 多个装饰器 前言 装饰器类似Java的切点切面增强 推荐阅读&#xff1a; 详细分析Spring中的Around注解&#xff08;附Demo&#xff09;java框架 零基础从入门到精通的学习路线 附开源项目面经等&#xff08;超全&#xff0…

如何实现Git Push之后自动部署到服务器?

在平时个人开发的过程中是不是有这样的烦恼&#xff1a; 项目开发完成&#xff0c;Push之后 登录服务器&#xff0c;手动git pull&#xff0c;然后运行部署命令 这真的很烦诶&#xff01; 那么能不能Git push之后&#xff0c;远端服务器自动 Git pull 然后运行部署命令呢&a…

内网安全之-NTLM协议详解

NTLM&#xff08;New Technology LAN Manager&#xff09;身份验证协议是微软用于Windows身份验证的主要协议之一。早起SMB协议以明文口令的形式在网络上传输&#xff0c;因此产生了安全性问题。后来出现了LM&#xff08;LAN Manager&#xff09;身份验证协议&#xff0c;它非常…

排序问题—java实现

冒泡排序 算法思想&#xff1a; 每次比较相邻元素&#xff0c;若逆序则交换位置&#xff0c;每一趟比较n-1次&#xff0c;确定一个最大值。故需比较n趟&#xff0c;来确定n个数的位置。 外循环来表示比较的趟数&#xff0c;每一趟确定一个最大数的位置内循环来表示相邻数字两…

2024考研国家线公布,各科分数线有哪些变化?考研国家线哪些涨了,哪些跌了?可视化分析告诉你

结论在文章结尾 2024考研国家线 一、近五年国家线趋势图-学术硕士 文学 管理学 工学照顾专业 体育学 交叉学科 军事学 历史学 理学 享受少数名族照顾政策的考生 中医类照顾专业 教育类 艺术类 医学 工学 哲学 法学 农学 经济学 二、近五年国家线趋势图-专业硕士 中医 应用心理 …

IPSEC VPN-详解原理

目录 IPSEC提供的安全服务 IPSEC协议簇 ​编辑 安全协议 1.传输模式 2. 隧道模式 AH ---鉴别头协议 AH提供的安全服务&#xff1a; AH头部 AH的保护范围 1.传输模式 2.隧道模式 ​编辑 ESP ---封装安全载荷协议 ESP提供的安全服务&#xff1a; ESP的头部 ESP的保护范围 1.传输…

如何选择合适的数据可视化工具?

如果是入门级的数据可视化工具&#xff0c;使用Excel插件就足够了&#xff01; Excel插件&#xff0c;tusimpleBI 是一款 Excel 图表插件&#xff0c;提供超过120项图表功能&#xff0c;帮助用户制作各种 Excel 所没有的高级图表&#xff0c;轻轻松松一键出图。 它能够制作10…

【送书福利!第一期】《ARM汇编与逆向工程》

&#x1f42e;博主syst1m 带你 acquire knowledge&#xff01; ✨博客首页——syst1m的博客&#x1f498; &#x1f618;《CTF专栏》超级详细的解析&#xff0c;宝宝级教学让你从蹒跚学步到健步如飞&#x1f648; &#x1f60e;《大数据专栏》大数据从0到秃头&#x1f47d;&…

SD-WAN解决企业在工业互联网时代的新困境

工业互联网迎来全新的发展契机&#xff0c;而SD-WAN技术将成为制造企业快速崭露头角的得力助手&#xff01; 制造业的数字化转型已成为经济全球化和数字化浪潮的必然产物。许多制造企业迅速向人口密集区域扩张&#xff0c;呈现出分支众多、布局分散的特点。随着工业互联网的蓬勃…

Linux环境开发工具之yum

前言 前面我们已经对基本的指令和权限进行了介绍&#xff0c;本期开始我们将介绍常用的开发工具。例如&#xff1a;软件包管理器yum。 本期内容介绍 Linux上安装软件的方式 什么是yum yum的相关操作 yum的本地配置和yum源 一、Linux上安装软件的方式 在介绍Linux上如何安装一…

upload-lab 11-20解法

pass11 查看代码 这里我们先解读下代码 $is_upload false; $msg null; if(isset($_POST[submit])){# 定义了白名单数组$ext_arr array(jpg,png,gif);# 截取上传文件名最后一个带点的文件后缀 $file_ext substr($_FILES[upload_file][name],strrpos($_FILES[upload_file][n…

10:00面试,10:06就出来了,问的问题有点变态。。。

从小厂出来&#xff0c;没想到在另一家公司又寄了。 到这家公司开始上班&#xff0c;加班是每天必不可少的&#xff0c;看在钱给的比较多的份上&#xff0c;就不太计较了。没想到8月一纸通知&#xff0c;所有人不准加班&#xff0c;加班费不仅没有了&#xff0c;薪资还要降40%…

SHELL——条件判断语句练习

目录 一、练习题目 二、解答过程 1、判断当前磁盘剩余空间是否有20G&#xff0c;如果小于20G&#xff0c;则将报警邮件发送给管理员&#xff0c;每天检查次磁盘剩余空间。 安装邮件服务 配置邮件服务 编写脚本work1.sh 添加计划任务 2、判断web服务是否运行&#xff1a;…

基于PyTorch的视频分类实战

1、数据集下载 官方链接&#xff1a;https://serre-lab.clps.brown.edu/resource/hmdb-a-large-human-motion-database/#Downloads 百度网盘连接&#xff1a; https://pan.baidu.com/s/1sSn--u_oLvTDjH-BgOAv_Q?pwdxsri 提取码: xsri 官方链接有详细的数据集介绍&#xf…

疯狂 META:Aavegotchi 新一季稀有度挖矿来了!

经过数周的激烈讨论和参与&#xff0c;AavegotchiDAO 再次投票决定资助新一季的稀有度挖矿活动&#xff0c;这也是我们神奇的第八季&#xff01;朋友们&#xff0c;我们又开始啦——拿出你们最好的装备&#xff0c;擦亮那些可穿戴设备&#xff0c;准备好赚钱吧&#xff01; 与…

HarmonyOS系统开发ArkTS入门案例及组件

目录 一、声明式UI 二、ArkTs 快速入门案例 三、组件 四、渲染控制 一、声明式UI 声明式UI就是一种编写用户界面的范式或方式、 ArArkTS 在继承了Typescript语法的基础上&#xff0c;主要扩展了声明式UI开发相关的能力。 声明式UI开发范式大致流程&#xff1a;定义页面…