目录
1.TCP网络程序
1.1 TCP socket API
1.1.1 socket():
1.1.2 bind():
1.1.3 listen():
1.1.4 accept():
1.1.5 connect():
2. 封装 TCP socket
2.1 实现一个简单的英译汉的功能
3.简单的TCP网络程序(多进程版本)
4. 简单的TCP网络程序(多线程版本)
5. 线程池版本的 TCP 服务器
5.1 TCP协议通讯流程
5.1.1 服务器初始化
5.1.2 建立连接的过程
5.1.3 数据传输的过程
5.1.4 断开连接的过程
6. TCP 和 UDP 对比
7. TCP网络编程(多版本、多功能)
版本1:
版本2:
版本3:
后记:●由于作者水平有限,文章难免存在谬误之处,敬请读者斧正,俚语成篇,恳望指教!
——By 作者:新晓·故知
TCP网络通信GIF:
1.TCP网络程序
1.1 TCP socket API
下面介绍程序中用到的 socket API, 这些函数都在 sys/socket.h 中。1.1.1 socket():
- socket()打开一个网络通讯端口,如果成功的话,就像open()一样返回一个文件描述符;
- 应用程序可以像读写文件一样用read/write在网络上收发数据;
- 如果socket()调用出错则返回-1;
- 对于IPv4, family参数指定为AF_INET;
- 对于TCP协议,type参数指定为SOCK_STREAM, 表示面向流的传输协议
- protocol参数的介绍从略,指定为0即可
1.1.2 bind():
- 服务器程序所监听的网络地址和端口号通常是固定不变的,客户端程序得知服务器程序的地址和端口号后就可以向服务器发起连接; 服务器需要调用bind绑定一个固定的网络地址和端口号;
- bind()成功返回0,失败返回-1。
- bind()的作用是将参数sockfd和myaddr绑定在一起, 使sockfd这个用于网络通讯的文件描述符监听myaddr所描述的地址和端口号;
- 前面讲过,struct sockaddr *是一个通用指针类型,myaddr参数实际上可以接受多种协议的sockaddr结构体,而它们的长度各不相同,所以需要第三个参数addrlen指定结构体的长度;
我们的程序中对 myaddr参数是这样初始化的:1. 将整个结构体清零 ;2. 设置地址类型为 AF_INET;3. 网络地址为 INADDR_ANY, 这个宏表示本地的任意 IP 地址 , 因为服务器可能有多个网卡 , 每个网卡也可能绑定多个IP 地址 , 这样设置可以在所有的 IP 地址上监听 , 直到与某个客户端建立了连接时才确定下来到底用哪个IP 地址 ;4. 端口号为 SERV_PORT, 我们定义为 9999;1.1.3 listen():
- listen()声明sockfd处于监听状态, 并且最多允许有backlog个客户端处于连接等待状态, 如果接收到更多的连接请求就忽略, 这里设置不会太大(一般是5), 具体细节同学们课后深入研究;
- listen()成功返回0,失败返回-1
1.1.4 accept():
- 三次握手完成后, 服务器调用accept()接受连接;
- 如果服务器调用accept()时还没有客户端的连接请求,就阻塞等待直到有客户端连接上来;
- addr是一个传出参数,accept()返回时传出客户端的地址和端口号;
- 如果给addr 参数传NULL,表示不关心客户端的地址;
- addrlen参数是一个传入传出参数(value-result argument), 传入的是调用者提供的, 缓冲区addr的长度以避免缓冲区溢出问题, 传出的是客户端地址结构体的实际长度(有可能没有占满调用者提供的缓冲区);
我们的服务器程序结构是这样的 :理解 accecpt 的返回值 : 饭店拉客例子1.1.5 connect():
- 客户端需要调用connect()连接服务器;
- connect和bind的参数形式一致, 区别在于bind的参数是自己的地址, 而connect的参数是对方的地址;
- connect()成功返回0,出错返回-1;
2. 封装 TCP socket
2.1 实现一个简单的英译汉的功能
界面演示:
tcp_socket.hpp:#pragma once #include <stdio.h> #include <string.h> #include <stdlib.h> #include <string> #include <cassert> #include <unistd.h> #include <sys/socket.h> #include <netinet/in.h> #include <arpa/inet.h> #include <fcntl.h> typedef struct sockaddr sockaddr; typedef struct sockaddr_in sockaddr_in; #define CHECK_RET(exp) \ if (!(exp)) \ { \ return false; \ } class TcpSocket { public: TcpSocket() : fd_(-1) { } TcpSocket(int fd) : fd_(fd) { } bool Socket() { fd_ = socket(AF_INET, SOCK_STREAM, 0); if (fd_ < 0) { perror("socket"); return false; } printf("open fd = %d\n", fd_); return true; } bool Close() const { close(fd_); printf("close fd = %d\n", fd_); return true; } bool Bind(const std::string &ip, uint16_t port) const { sockaddr_in addr; addr.sin_family = AF_INET; addr.sin_addr.s_addr = inet_addr(ip.c_str()); addr.sin_port = htons(port); int ret = bind(fd_, (sockaddr *)&addr, sizeof(addr)); if (ret < 0) { perror("bind"); return false; } return true; } bool Listen(int num) const { int ret = listen(fd_, num); if (ret < 0) { perror("listen"); return false; } return true; } bool Accept(TcpSocket *peer, std::string *ip = NULL, uint16_t *port = NULL) const { sockaddr_in peer_addr; socklen_t len = sizeof(peer_addr); int new_sock = accept(fd_, (sockaddr *)&peer_addr, &len); if (new_sock < 0) { perror("accept"); return false; } printf("accept fd = %d\n", new_sock); peer->fd_ = new_sock; if (ip != NULL) { *ip = inet_ntoa(peer_addr.sin_addr); } if (port != NULL) { *port = ntohs(peer_addr.sin_port); } return true; } bool Recv(std::string *buf) const { buf->clear(); char tmp[1024 * 10] = {0}; // [注意!] 这里的读并不算很严谨, 因为一次 recv 并不能保证把所有的数据都全部读完 // 参考 man 手册 MSG_WAITALL 节. ssize_t read_size = recv(fd_, tmp, sizeof(tmp), 0); if (read_size < 0) { perror("recv"); return false; } if (read_size == 0) { return false; } buf->assign(tmp, read_size); return true; } bool Send(const std::string &buf) const { ssize_t write_size = send(fd_, buf.data(), buf.size(), 0); if (write_size < 0) { perror("send"); return false; } return true; } bool Connect(const std::string &ip, uint16_t port) const { sockaddr_in addr; addr.sin_family = AF_INET; addr.sin_addr.s_addr = inet_addr(ip.c_str()); addr.sin_port = htons(port); int ret = connect(fd_, (sockaddr *)&addr, sizeof(addr)); if (ret < 0) { perror("connect"); return false; } return true; } int GetFd() const { return fd_; } private: int fd_; };
TCP通用服务器
tcp_server.hpp:
#pragma once #include <functional> #include "tcp_socket.hpp" typedef std::function<void(const std::string &req, std::string *resp)> Handler; class TcpServer { public: TcpServer(const std::string &ip, uint16_t port) : ip_(ip), port_(port) { } bool Start(Handler handler) { // 1. 创建 socket; CHECK_RET(listen_sock_.Socket()); // 2. 绑定端口号 CHECK_RET(listen_sock_.Bind(ip_, port_)); // 3. 进行监听 CHECK_RET(listen_sock_.Listen(5)); // 4. 进入事件循环 for (;;) { // 5. 进行 accept TcpSocket new_sock; std::string ip; uint16_t port = 0; if (!listen_sock_.Accept(&new_sock, &ip, &port)) { continue; } printf("[client %s:%d] connect!\n", ip.c_str(), port); // 6. 进行循环读写 for (;;) { std::string req; // 7. 读取请求. 读取失败则结束循环 bool ret = new_sock.Recv(&req); if (!ret) { printf("[client %s:%d] disconnect!\n", ip.c_str(), port); // [注意!] 需要关闭 socket new_sock.Close(); break; } // 8. 计算响应 std::string resp; handler(req, &resp); // 9. 写回响应 new_sock.Send(resp); printf("[%s:%d] req: %s, resp: %s\n", ip.c_str(), port, req.c_str(), resp.c_str()); } } return true; } private: TcpSocket listen_sock_; std::string ip_; uint64_t port_; };
英译汉服务器
EnToCh.cc:
#include <unordered_map> #include "tcp_server.hpp" std::unordered_map<std::string, std::string> g_dict; void Translate(const std::string &req, std::string *resp) { auto it = g_dict.find(req); if (it == g_dict.end()) { *resp = "未找到"; return; } *resp = it->second; return; } int main(int argc, char *argv[]) { if (argc != 3) { printf("Usage ./dict_server [ip] [port]\n"); return 1; } // 1. 初始化词典 g_dict.insert(std::make_pair("hello", "你好")); g_dict.insert(std::make_pair("world", "世界")); g_dict.insert(std::make_pair("linux", "开源的操作系统")); // 2. 启动服务器 TcpServer server(argv[1], atoi(argv[2])); server.Start(Translate); return 0; }
TCP 通用客户端tcp_client.hpp:#pragma once #include "tcp_socket.hpp" class TcpClient { public: TcpClient(const std::string &ip, uint16_t port) : ip_(ip), port_(port) { // [注意!!] 需要先创建好 socket sock_.Socket(); } ~TcpClient() { sock_.Close(); } bool Connect() { return sock_.Connect(ip_, port_); } bool Recv(std::string *buf) { return sock_.Recv(buf); } bool Send(const std::string &buf) { return sock_.Send(buf); } private: TcpSocket sock_; std::string ip_; uint16_t port_; };
英译汉客户端
dict_client.cc:#include "tcp_client.hpp" #include <iostream> int main(int argc, char *argv[]) { if (argc != 3) { printf("Usage ./dict_client [ip] [port]\n"); return 1; } TcpClient client(argv[1], atoi(argv[2])); bool ret = client.Connect(); if (!ret) { return 1; } for (;;) { std::cout << "请输入要查询的单词:" << std::endl; std::string word; std::cin >> word; if (!std::cin) { break; } client.Send(word); std::string result; client.Recv(&result); std::cout << result << std::endl; } return 0; }
由于客户端不需要固定的端口号,因此不必调用bind(),客户端的端口号由内核自动分配.
注意 :
- 客户端不是不允许调用bind(), 只是没有必要调用bind()固定一个端口号. 否则如果在同一台机器上启动多个客户端, 就会出现端口号被占用导致不能正确建立连接;
- 服务器也不是必须调用bind(), 但如果服务器不调用bind(), 内核会自动给服务器分配监听端口, 每次启动服务器时端口号都不一样, 客户端要连接服务器就会遇到麻烦;
测试多个连接的情况再启动一个客户端 , 尝试连接服务器 , 发现第二个客户端 , 不能正确的和服务器进行通信 .分析原因 , 是因为我们 accecpt 了一个请求之后 , 就在一直 while 循环尝试 read, 没有继续调用到 accecpt, 导致不能接受新的请求.我们当前的这个 TCP, 只能处理一个连接 , 这是不科学的
3.简单的TCP网络程序(多进程版本)
通过每个请求 , 创建子进程的方式来支持多连接 ;tcp_process_server.hpp:#pragma once #include <functional> #include <signal.h> #include "tcp_socket.hpp" typedef std::function<void(const std::string &req, std::string *resp)> Handler; // 多进程版本的 Tcp 服务器 class TcpProcessServer { public: TcpProcessServer(const std::string &ip, uint16_t port) : ip_(ip), port_(port) { // 需要处理子进程 signal(SIGCHLD, SIG_IGN); } void ProcessConnect(const TcpSocket &new_sock, const std::string &ip, uint16_t port, Handler handler) { int ret = fork(); if (ret > 0) { // father // 父进程不需要做额外的操作, 直接返回即可. // 思考, 这里能否使用 wait 进行进程等待? // 如果使用 wait , 会导致父进程不能快速再次调用到 accept, 仍然没法处理多个请求 // [注意!!] 父进程需要关闭 new_sock new_sock.Close(); return; } else if (ret == 0) { // child // 处理具体的连接过程. 每个连接一个子进程 for (;;) { std::string req; bool ret = new_sock.Recv(&req); if (!ret) { // 当前的请求处理完了, 可以退出子进程了. 注意, socket 的关闭在析构函数中就完成了 printf("[client %s:%d] disconnected!\n", ip.c_str(), port); exit(0); } std::string resp; handler(req, &resp); new_sock.Send(resp); printf("[client %s:%d] req: %s, resp: %s\n", ip.c_str(), port, req.c_str(), resp.c_str()); } } else { perror("fork"); } } bool Start(Handler handler) { // 1. 创建 socket; CHECK_RET(listen_sock_.Socket()); // 2. 绑定端口号 CHECK_RET(listen_sock_.Bind(ip_, port_)); // 3. 进行监听 CHECK_RET(listen_sock_.Listen(5)); // 4. 进入事件循环 for (;;) { // 5. 进行 accept TcpSocket new_sock; std::string ip; uint16_t port = 0; if (!listen_sock_.Accept(&new_sock, &ip, &port)) { continue; } printf("[client %s:%d] connect!\n", ip.c_str(), port); ProcessConnect(new_sock, ip, port, handler); } return true; } private: TcpSocket listen_sock_; std::string ip_; uint64_t port_; };
dict_server.cc 稍加修改
将 TcpServer 类改成 TcpProcessServer 类即可
4. 简单的TCP网络程序(多线程版本)
通过每个请求 , 创建一个线程的方式来支持多连接 ;tcp_thread_server.hpp:#pragma once #include <functional> #include <pthread.h> #include "tcp_socket.hpp" typedef std::function<void(const std::string &, std::string *)> Handler; struct ThreadArg { TcpSocket new_sock; std::string ip; uint16_t port; Handler handler; }; class TcpThreadServer { public: TcpThreadServer(const std::string &ip, uint16_t port) : ip_(ip), port_(port) { } bool Start(Handler handler) { // 1. 创建 socket; CHECK_RET(listen_sock_.Socket()); // 2. 绑定端口号 CHECK_RET(listen_sock_.Bind(ip_, port_)); // 3. 进行监听 CHECK_RET(listen_sock_.Listen(5)); // 4. 进入循环 for (;;) { // 5. 进行 accept ThreadArg *arg = new ThreadArg(); arg->handler = handler; bool ret = listen_sock_.Accept(&arg->new_sock, &arg->ip, &arg->port); if (!ret) { continue; } printf("[client %s:%d] connect\n", arg->ip.c_str(), arg->port); // 6. 创建新的线程完成具体操作 pthread_t tid; pthread_create(&tid, NULL, ThreadEntry, arg); pthread_detach(tid); } return true; } // 这里的成员函数为啥非得是 static? static void *ThreadEntry(void *arg) { // C++ 的四种类型转换都是什么? ThreadArg *p = reinterpret_cast<ThreadArg *>(arg); ProcessConnect(p); // 一定要记得释放内存!!! 也要记得关闭文件描述符 p->new_sock.Close(); delete p; return NULL; } // 处理单次连接. 这个函数也得是 static static void ProcessConnect(ThreadArg *arg) { // 1. 循环进行读写 for (;;) { std::string req; // 2. 读取请求 bool ret = arg->new_sock.Recv(&req); if (!ret) { printf("[client %s:%d] disconnected!\n", arg->ip.c_str(), arg->port); break; } std::string resp; // 3. 根据请求计算响应 arg->handler(req, &resp); // 4. 发送响应 arg->new_sock.Send(resp); printf("[client %s:%d] req: %s, resp: %s\n", arg->ip.c_str(), arg->port, req.c_str(), resp.c_str()); } } private: TcpSocket listen_sock_; std::string ip_; uint16_t port_; };
Makefile:
.PHONY:all all:dictclient entoch dictclient: dict_client.cc g++ -o $@ $^ -std=c++11 entoch:EnToCh.cc g++ -o $@ $^ -std=c++11 .PHONY:clean clean: rm -f dictclient entoch
5. 线程池版本的 TCP 服务器
根据之前学过的线程池 , 自行修改服务器为线程池版本 .5.1 TCP协议通讯流程
谈恋爱例子下图是基于 TCP 协议的客户端 / 服务器程序的一般流程 :5.1.1 服务器初始化
- 调用socket, 创建文件描述符;
- 调用bind, 将当前的文件描述符和ip/port绑定在一起; 如果这个端口已经被其他进程占用了, 就会bind失败;
- 调用listen, 声明当前这个文件描述符作为一个服务器的文件描述符, 为后面的accept做好准备;
- 调用accecpt, 并阻塞, 等待客户端连接过来;
5.1.2 建立连接的过程
- 调用socket, 创建文件描述符;
- 调用connect, 向服务器发起连接请求;
- connect会发出SYN段并阻塞等待服务器应答; (第一次)
- 服务器收到客户端的SYN, 会应答一个SYN-ACK段表示"同意建立连接"; (第二次)
- 客户端收到SYN-ACK后会从connect()返回, 同时应答一个ACK段; (第三次)
这个建立连接的过程 , 通常称为 三次握手 ;5.1.3 数据传输的过程
- 建立连接后,TCP协议提供全双工的通信服务; 所谓全双工的意思是, 在同一条连接中, 同一时刻, 通信双方可以同时写数据; 相对的概念叫做半双工, 同一条连接在同一时刻, 只能由一方来写数据;
- 服务器从accept()返回后立刻调 用read(), 读socket就像读管道一样, 如果没有数据到达就阻塞等待;
- 这时客户端调用write()发送请求给服务器, 服务器收到后从read()返回,对客户端的请求进行处理, 在此期间客户端调用read()阻塞等待服务器的应答;
- 服务器调用write()将处理结果发回给客户端, 再次调用read()阻塞等待下一条请求;
- 客户端收到后从read()返回, 发送下一条请求,如此循环下去;
5.1.4 断开连接的过程
- 如果客户端没有更多的请求了, 就调用close()关闭连接, 客户端会向服务器发送FIN段(第一次);
- 此时服务器收到FIN后, 会回应一个ACK, 同时read会返回0 (第二次);
- read返回之后, 服务器就知道客户端关闭了连接, 也调用close关闭连接, 这个时候服务器会向客户端发送一个FIN; (第三次)
- 客户端收到FIN, 再返回一个ACK给服务器; (第四次)
这个断开连接的过程 , 通常称为 四次挥手在学习 socket API 时要注意应用程序和 TCP 协议层是如何交互的 :
- 应用程序调用某个socket函数时TCP协议层完成什么动作,比如调用connect()会发出SYN段
- 应用程序如何知道TCP协议层的状态变化,比如从某个阻塞的socket函数返回就表明TCP协议收到了某些段,再比如read()返回0就表明收到了FIN段
6. TCP 和 UDP 对比
- 可靠传输 vs 不可靠传输
- 有连接 vs 无连接
- 字节流 vs 数据报
7. TCP网络编程(多版本、多功能)
版本1:
运行演示:
serverTcp.cc:
#include "util.hpp" class ServerTcp { public: ServerTcp(uint16_t port, const std::string &ip = "") : port_(port), ip_(ip), sock_(-1) { } ~ServerTcp() { } public: void init() { // 1. 创建socket sock_ = socket(PF_INET, SOCK_STREAM, 0); if (sock_ < 0) { logMessage(FATAL, "socket: %s", strerror(errno)); exit(SOCKET_ERR); } logMessage(DEBUG, "socket: %s, sock_: %d", strerror(errno), sock_); // 2. bind绑定 // 2.1 填充服务器信息 struct sockaddr_in local; // 用户栈 memset(&local, 0, sizeof local); local.sin_family = PF_INET; //进行网络通信 local.sin_port = htons(port_); ip_.empty() ? (local.sin_addr.s_addr = INADDR_ANY) : (inet_aton(ip_.c_str(), &local.sin_addr)); // 2.2 本地socket信息,写入sock_对应的内核区域 if (bind(sock_, (const struct sockaddr *)&local, sizeof local) < 0) { logMessage(FATAL, "bind: %s", strerror(errno)); exit(BIND_ERR); } logMessage(DEBUG, "bind: %s, sock_: %d", strerror(errno), sock_); // 3. 监听socket,为何要监听呢?tcp是面向连接的! if (listen(sock_, 5 /*后面再说*/) < 0) { logMessage(FATAL, "listen: %s", strerror(errno)); exit(LISTEN_ERR); } logMessage(DEBUG, "listen: %s, sock_: %d", strerror(errno), sock_); // 允许别人来连接你了 } void loop() { while (true) { // 4. 获取连接, accept 的返回值是一个新的socket fd ?? // logMessage(DEBUG, "server 提供 service start ..."); // sleep(1); } } private: // sock int sock_; // port uint16_t port_; // ip std::string ip_; }; int main() { ServerTcp svr(8080); svr.init(); svr.loop(); return 0; }
clientTcp.cc:
#include "util.hpp" int main() { return 0; }
log.hpp:
#pragma once #include <cstdio> #include <ctime> #include <cstdarg> #include <cassert> #include <cstring> #include <cerrno> #include <stdlib.h> #define DEBUG 0 #define NOTICE 1 #define WARINING 2 #define FATAL 3 const char *log_level[]={"DEBUG", "NOTICE", "WARINING", "FATAL"}; // logMessage(DEBUG, "%d", 10); void logMessage(int level, const char *format, ...) { assert(level >= DEBUG); assert(level <= FATAL); char *name = getenv("USER"); char logInfo[1024]; va_list ap; // ap -> char* va_start(ap, format); vsnprintf(logInfo, sizeof(logInfo)-1, format, ap); va_end(ap); // ap = NULL FILE *out = (level == FATAL) ? stderr:stdout; fprintf(out, "%s | %u | %s | %s\n", \ log_level[level], \ (unsigned int)time(nullptr),\ name == nullptr ? "unknow":name,\ logInfo); // char *s = format; // while(s){ // case '%': // if(*(s+1) == 'd') int x = va_arg(ap, int); // break; // } }
util.hpp:
#pragma once #include <iostream> #include <string> #include <cstring> #include <cstdlib> #include <unistd.h> #include <sys/types.h> #include <sys/socket.h> #include <netinet/in.h> #include <arpa/inet.h> #include "log.hpp" #define SOCKET_ERR 1 #define BIND_ERR 2 #define LISTEN_ERR 3
makefile:
.PHONY:all all:clientTcp serverTcp clientTcp: clientTcp.cc g++ -o $@ $^ -std=c++11 serverTcp:serverTcp.cc g++ -o $@ $^ -std=c++11 .PHONY:clean clean: rm -f serverTcp clientTcp
版本2:
执行演示:
serverTcp.cc:
#include "util.hpp" #include <signal.h> #include <sys/types.h> #include <sys/wait.h> #include <pthread.h> class ServerTcp; // 申明一下ServerTcp class ThreadData { public: uint16_t clientPort_; std::string clinetIp_; int sock_; ServerTcp *this_; public: ThreadData(uint16_t port, std::string ip, int sock, ServerTcp *ts) : clientPort_(port), clinetIp_(ip), sock_(sock),this_(ts) {} }; class ServerTcp { public: ServerTcp(uint16_t port, const std::string &ip = "") : port_(port), ip_(ip), listenSock_(-1) { } ~ServerTcp() { } public: void init() { // 1. 创建socket listenSock_ = socket(PF_INET, SOCK_STREAM, 0); if (listenSock_ < 0) { logMessage(FATAL, "socket: %s", strerror(errno)); exit(SOCKET_ERR); } logMessage(DEBUG, "socket: %s, %d", strerror(errno), listenSock_); // 2. bind绑定 // 2.1 填充服务器信息 struct sockaddr_in local; // 用户栈 memset(&local, 0, sizeof local); local.sin_family = PF_INET; local.sin_port = htons(port_); ip_.empty() ? (local.sin_addr.s_addr = INADDR_ANY) : (inet_aton(ip_.c_str(), &local.sin_addr)); // 2.2 本地socket信息,写入sock_对应的内核区域 if (bind(listenSock_, (const struct sockaddr *)&local, sizeof local) < 0) { logMessage(FATAL, "bind: %s", strerror(errno)); exit(BIND_ERR); } logMessage(DEBUG, "bind: %s, %d", strerror(errno), listenSock_); // 3. 监听socket,为何要监听呢?tcp是面向连接的! if (listen(listenSock_, 5 /*后面再说*/) < 0) { logMessage(FATAL, "listen: %s", strerror(errno)); exit(LISTEN_ERR); } logMessage(DEBUG, "listen: %s, %d", strerror(errno), listenSock_); // 允许别人来连接你了 } static void *threadRoutine(void *args) { pthread_detach(pthread_self()); //设置线程分离 ThreadData *td = static_cast<ThreadData*>(args); td->this_->transService(td->sock_, td->clinetIp_, td->clientPort_); delete td; return nullptr; } void loop() { // signal(SIGCHLD, SIG_IGN); // only Linux while (true) { struct sockaddr_in peer; socklen_t len = sizeof(peer); // 4. 获取连接, accept 的返回值是一个新的socket fd ?? int serviceSock = accept(listenSock_, (struct sockaddr *)&peer, &len); if (serviceSock < 0) { // 获取链接失败 logMessage(WARINING, "accept: %s[%d]", strerror(errno), serviceSock); continue; } // 4.1 获取客户端基本信息 uint16_t peerPort = ntohs(peer.sin_port); std::string peerIp = inet_ntoa(peer.sin_addr); logMessage(DEBUG, "accept: %s | %s[%d], socket fd: %d", strerror(errno), peerIp.c_str(), peerPort, serviceSock); // 5 提供服务, echo -> 小写 -> 大写 // 5.0 v0 版本 -- 单进程 -- 一旦进入transService,主执行流,就无法进行向后执行,只能提供完毕服务之后才能进行accept //transService(serviceSock, peerIp, peerPort); // 5.1 v1 版本 -- 多进程版本 -- 父进程打开的文件会被子进程继承吗?会的 // pid_t id = fork(); // assert(id != -1); // if(id == 0) // { // close(listenSock_); //建议 // //子进程 // transService(serviceSock, peerIp, peerPort); // exit(0); // 进入僵尸 // } // // 父进程 // close(serviceSock); //这一步是一定要做的! // 5.1 v1.1 版本 -- 多进程版本 -- 也是可以的 // 爷爷进程 // pid_t id = fork(); // if(id == 0) // { // // 爸爸进程 // close(listenSock_);//建议 // // 又进行了一次fork,让 爸爸进程 // if(fork() > 0) exit(0); // // 孙子进程 -- 就没有爸爸 -- 孤儿进程 -- 被系统领养 -- 回收问题就交给了系统来回收 // transService(serviceSock, peerIp, peerPort); // exit(0); // } // // 父进程 // close(serviceSock); //这一步是一定要做的! // // 爸爸进程直接终止,立马得到退出码,释放僵尸进程状态 // pid_t ret = waitpid(id, nullptr, 0); //就用阻塞式 // assert(ret > 0); // (void)ret; // 5.2 v2 版本 -- 多线程 // 这里不需要进行关闭文件描述符吗??不需要啦 // 多线程是会共享文件描述符表的! ThreadData *td = new ThreadData(peerPort, peerIp, serviceSock, this); pthread_t tid; pthread_create(&tid, nullptr, threadRoutine, (void*)td); // waitpid(); 默认是阻塞等待!WNOHANG // 方案1 // logMessage(DEBUG, "server 提供 service start ..."); // sleep(1); } } // 大小写转化服务 // TCP && UDP: 支持全双工 void transService(int sock, const std::string &clientIp, uint16_t clientPort) { assert(sock >= 0); assert(!clientIp.empty()); assert(clientPort >= 1024); char inbuffer[BUFFER_SIZE]; while (true) { ssize_t s = read(sock, inbuffer, sizeof(inbuffer) - 1); //我们认为我们读到的都是字符串 if (s > 0) { // read success inbuffer[s] = '\0'; if(strcasecmp(inbuffer, "quit") == 0) { logMessage(DEBUG, "client quit -- %s[%d]", clientIp.c_str(), clientPort); break; } logMessage(DEBUG, "trans before: %s[%d]>>> %s", clientIp.c_str(), clientPort, inbuffer); // 可以进行大小写转化了 for(int i = 0; i < s; i++) { if(isalpha(inbuffer[i]) && islower(inbuffer[i])) inbuffer[i] = toupper(inbuffer[i]); } logMessage(DEBUG, "trans after: %s[%d]>>> %s", clientIp.c_str(), clientPort, inbuffer); write(sock, inbuffer, strlen(inbuffer)); } else if (s == 0) { // pipe: 读端一直在读,写端不写了,并且关闭了写端,读端会如何?s == 0,代表对端关闭 // s == 0: 代表对方关闭,client 退出 logMessage(DEBUG, "client quit -- %s[%d]", clientIp.c_str(), clientPort); break; } else { logMessage(DEBUG, "%s[%d] - read: %s", clientIp.c_str(), clientPort, strerror(errno)); break; } } // 只要走到这里,一定是client退出了,服务到此结束 close(sock); // 如果一个进程对应的文件fd,打开了没有被归还,文件描述符泄漏! logMessage(DEBUG, "server close %d done", sock); } private: // sock int listenSock_; // port uint16_t port_; // ip std::string ip_; }; static void Usage(std::string proc) { std::cerr << "Usage:\n\t" << proc << " port ip" << std::endl; std::cerr << "example:\n\t" << proc << " 8080 127.0.0.1\n" << std::endl; } // ./ServerTcp local_port local_ip int main(int argc, char *argv[]) { if(argc != 2 && argc != 3 ) { Usage(argv[0]); exit(USAGE_ERR); } uint16_t port = atoi(argv[1]); std::string ip; if(argc == 3) ip = argv[2]; ServerTcp svr(port, ip); svr.init(); svr.loop(); return 0; }
clientTcp.cc:
#include "util.hpp" // 2. 需要bind吗??需要,但是不需要自己显示的bind! 不要自己bind!!!! // 3. 需要listen吗?不需要的! // 4. 需要accept吗?不需要的! volatile bool quit = false; static void Usage(std::string proc) { std::cerr << "Usage:\n\t" << proc << " serverIp serverPort" << std::endl; std::cerr << "Example:\n\t" << proc << " 127.0.0.1 8081\n" << std::endl; } // ./clientTcp serverIp serverPort int main(int argc, char *argv[]) { if (argc != 3) { Usage(argv[0]); exit(USAGE_ERR); } std::string serverIp = argv[1]; uint16_t serverPort = atoi(argv[2]); // 1. 创建socket SOCK_STREAM int sock = socket(AF_INET, SOCK_STREAM, 0); if (sock < 0) { std::cerr << "socket: " << strerror(errno) << std::endl; exit(SOCKET_ERR); } // 2. connect,发起链接请求,你想谁发起请求呢??当然是向服务器发起请求喽 // 2.1 先填充需要连接的远端主机的基本信息 struct sockaddr_in server; memset(&server, 0, sizeof(server)); server.sin_family = AF_INET; server.sin_port = htons(serverPort); inet_aton(serverIp.c_str(), &server.sin_addr); // 2.2 发起请求,connect 会自动帮我们进行bind! if (connect(sock, (const struct sockaddr *)&server, sizeof(server)) != 0) { std::cerr << "connect: " << strerror(errno) << std::endl; exit(CONN_ERR); } std::cout << "info : connect success: " << sock << std::endl; std::string message; while (!quit) { message.clear(); std::cout << "请输入你的消息>>> "; std::getline(std::cin, message); if (strcasecmp(message.c_str(), "quit") == 0) quit = true; ssize_t s = write(sock, message.c_str(), message.size()); if (s > 0) { message.resize(1024); ssize_t s = read(sock, (char *)(message.c_str()), 1024); if (s > 0) message[s] = 0; std::cout << "Server Echo>>> " << message << std::endl; } else if (s <= 0) { break; } } close(sock); return 0; }
log.hpp:
#pragma once #include <cstdio> #include <ctime> #include <cstdarg> #include <cassert> #include <cstring> #include <cerrno> #include <stdlib.h> #define DEBUG 0 #define NOTICE 1 #define WARINING 2 #define FATAL 3 const char *log_level[]={"DEBUG", "NOTICE", "WARINING", "FATAL"}; // logMessage(DEBUG, "%d", 10); void logMessage(int level, const char *format, ...) { assert(level >= DEBUG); assert(level <= FATAL); char *name = getenv("USER"); char logInfo[1024]; va_list ap; // ap -> char* va_start(ap, format); vsnprintf(logInfo, sizeof(logInfo)-1, format, ap); va_end(ap); // ap = NULL FILE *out = (level == FATAL) ? stderr:stdout; fprintf(out, "%s | %u | %s | %s\n", \ log_level[level], \ (unsigned int)time(nullptr),\ name == nullptr ? "unknow":name,\ logInfo); // char *s = format; // while(s){ // case '%': // if(*(s+1) == 'd') int x = va_arg(ap, int); // break; // } }
util.hpp:
#pragma once #include <iostream> #include <string> #include <cstring> #include <cstdlib> #include <cassert> #include <ctype.h> #include <unistd.h> #include <strings.h> #include <sys/types.h> #include <sys/socket.h> #include <netinet/in.h> #include <arpa/inet.h> #include "log.hpp" #define SOCKET_ERR 1 #define BIND_ERR 2 #define LISTEN_ERR 3 #define USAGE_ERR 4 #define CONN_ERR 5 #define BUFFER_SIZE 1024
makefile:
.PHONY:all all:clientTcp serverTcp clientTcp: clientTcp.cc g++ -o $@ $^ -std=c++11 serverTcp:serverTcp.cc g++ -o $@ $^ -std=c++11 -lpthread .PHONY:clean clean: rm -f serverTcp clientTcp
版本3:
一般以服务器方式工作,对外提供服务的服务器,都是以守护进程(精灵进程)的方式在服务器中工作的,一旦启动之后,除非用户主动关闭。守护进程需要自己编写。
必须调用一个函数setsid(),用于将调用进程设置成为独立的会话,但进程组的组长(一般为第一个进程),不能调用setsid().因为一旦其调用就成为新的其他会话,那原来的进程组组员就没有了“领导”。
如何不成为组长? 可以作为进程组内的第二个进程!常规做法,fork()子进程,子进程就不再是组长进程,就可以成功调用setsid()。
serverTcp.cc:
#include "util.hpp" #include "Task.hpp" #include "ThreadPool.hpp" #include "daemonize.hpp" #include <signal.h> #include <sys/types.h> #include <sys/wait.h> #include <pthread.h> class ServerTcp; // 申明一下ServerTcp // 大小写转化服务 // TCP && UDP: 支持全双工 void transService(int sock, const std::string &clientIp, uint16_t clientPort) { assert(sock >= 0); assert(!clientIp.empty()); assert(clientPort >= 1024); char inbuffer[BUFFER_SIZE]; while (true) { ssize_t s = read(sock, inbuffer, sizeof(inbuffer) - 1); //我们认为我们读到的都是字符串 if (s > 0) { // read success inbuffer[s] = '\0'; if (strcasecmp(inbuffer, "quit") == 0) { logMessage(DEBUG, "client quit -- %s[%d]", clientIp.c_str(), clientPort); break; } logMessage(DEBUG, "trans before: %s[%d]>>> %s", clientIp.c_str(), clientPort, inbuffer); // 可以进行大小写转化了 for (int i = 0; i < s; i++) { if (isalpha(inbuffer[i]) && islower(inbuffer[i])) inbuffer[i] = toupper(inbuffer[i]); } logMessage(DEBUG, "trans after: %s[%d]>>> %s", clientIp.c_str(), clientPort, inbuffer); write(sock, inbuffer, strlen(inbuffer)); } else if (s == 0) { // pipe: 读端一直在读,写端不写了,并且关闭了写端,读端会如何?s == 0,代表对端关闭 // s == 0: 代表对方关闭,client 退出 logMessage(DEBUG, "client quit -- %s[%d]", clientIp.c_str(), clientPort); break; } else { logMessage(DEBUG, "%s[%d] - read: %s", clientIp.c_str(), clientPort, strerror(errno)); break; } } // 只要走到这里,一定是client退出了,服务到此结束 close(sock); // 如果一个进程对应的文件fd,打开了没有被归还,文件描述符泄漏! logMessage(DEBUG, "server close %d done", sock); } void execCommand(int sock, const std::string &clientIp, uint16_t clientPort) { assert(sock >= 0); assert(!clientIp.empty()); assert(clientPort >= 1024); char command[BUFFER_SIZE]; while (true) { ssize_t s = read(sock, command, sizeof(command) - 1); //我们认为我们读到的都是字符串 if (s > 0) { command[s] = '\0'; logMessage(DEBUG, "[%s:%d] exec [%s]", clientIp.c_str(), clientPort, command); // 考虑安全 std::string safe = command; if((std::string::npos != safe.find("rm")) || (std::string::npos != safe.find("unlink"))) { break; } // 我们是以r方式打开的文件,没有写入 // 所以我们无法通过dup的方式得到对应的结果 FILE *fp = popen(command, "r"); if(fp == nullptr) { logMessage(WARINING, "exec %s failed, beacuse: %s", command, strerror(errno)); break; } char line[1024]; while(fgets(line, sizeof(line)-1, fp) != nullptr) { write(sock, line, strlen(line)); } // dup2(fd, 1); // dup2(sock, fp->_fileno); // fflush(fp); pclose(fp); logMessage(DEBUG, "[%s:%d] exec [%s] ... done", clientIp.c_str(), clientPort, command); } else if (s == 0) { // pipe: 读端一直在读,写端不写了,并且关闭了写端,读端会如何?s == 0,代表对端关闭 // s == 0: 代表对方关闭,client 退出 logMessage(DEBUG, "client quit -- %s[%d]", clientIp.c_str(), clientPort); break; } else { logMessage(DEBUG, "%s[%d] - read: %s", clientIp.c_str(), clientPort, strerror(errno)); break; } } // 只要走到这里,一定是client退出了,服务到此结束 close(sock); // 如果一个进程对应的文件fd,打开了没有被归还,文件描述符泄漏! logMessage(DEBUG, "server close %d done", sock); } class ThreadData { public: uint16_t clientPort_; std::string clinetIp_; int sock_; ServerTcp *this_; public: ThreadData(uint16_t port, std::string ip, int sock, ServerTcp *ts) : clientPort_(port), clinetIp_(ip), sock_(sock), this_(ts) { } }; class ServerTcp { public: ServerTcp(uint16_t port, const std::string &ip = "") : port_(port), ip_(ip), listenSock_(-1), tp_(nullptr) { } ~ServerTcp() { } public: void init() { // 1. 创建socket listenSock_ = socket(PF_INET, SOCK_STREAM, 0); if (listenSock_ < 0) { logMessage(FATAL, "socket: %s", strerror(errno)); exit(SOCKET_ERR); } logMessage(DEBUG, "socket: %s, %d", strerror(errno), listenSock_); // 2. bind绑定 // 2.1 填充服务器信息 struct sockaddr_in local; // 用户栈 memset(&local, 0, sizeof local); local.sin_family = PF_INET; local.sin_port = htons(port_); ip_.empty() ? (local.sin_addr.s_addr = INADDR_ANY) : (inet_aton(ip_.c_str(), &local.sin_addr)); // 2.2 本地socket信息,写入sock_对应的内核区域 if (bind(listenSock_, (const struct sockaddr *)&local, sizeof local) < 0) { logMessage(FATAL, "bind: %s", strerror(errno)); exit(BIND_ERR); } logMessage(DEBUG, "bind: %s, %d", strerror(errno), listenSock_); // 3. 监听socket,为何要监听呢?tcp是面向连接的! if (listen(listenSock_, 5 /*后面再说*/) < 0) { logMessage(FATAL, "listen: %s", strerror(errno)); exit(LISTEN_ERR); } logMessage(DEBUG, "listen: %s, %d", strerror(errno), listenSock_); // 运行别人来连接你了 // 4. 加载线程池 tp_ = ThreadPool<Task>::getInstance(); } // static void *threadRoutine(void *args) // { // pthread_detach(pthread_self()); //设置线程分离 // ThreadData *td = static_cast<ThreadData *>(args); // td->this_->transService(td->sock_, td->clinetIp_, td->clientPort_); // delete td; // return nullptr; // } void loop() { // signal(SIGCHLD, SIG_IGN); // only Linux tp_->start(); logMessage(DEBUG, "thread pool start success, thread num: %d", tp_->threadNum()); while (true) { struct sockaddr_in peer; socklen_t len = sizeof(peer); // 4. 获取连接, accept 的返回值是一个新的socket fd ?? // 4.1 listenSock_: 监听 && 获取新的链接-> sock // 4.2 serviceSock: 给用户提供新的socket服务 int serviceSock = accept(listenSock_, (struct sockaddr *)&peer, &len); if (serviceSock < 0) { // 获取链接失败 logMessage(WARINING, "accept: %s[%d]", strerror(errno), serviceSock); continue; } // 4.1 获取客户端基本信息 uint16_t peerPort = ntohs(peer.sin_port); std::string peerIp = inet_ntoa(peer.sin_addr); logMessage(DEBUG, "accept: %s | %s[%d], socket fd: %d", strerror(errno), peerIp.c_str(), peerPort, serviceSock); // 5 提供服务, echo -> 小写 -> 大写 // 5.0 v0 版本 -- 单进程 -- 一旦进入transService,主执行流,就无法进行向后执行,只能提供完毕服务之后才能进行accept // transService(serviceSock, peerIp, peerPort); // 5.1 v1 版本 -- 多进程版本 -- 父进程打开的文件会被子进程继承吗?会的 // pid_t id = fork(); // assert(id != -1); // if(id == 0) // { // close(listenSock_); //建议 // //子进程 // transService(serviceSock, peerIp, peerPort); // exit(0); // 进入僵尸 // } // // 父进程 // close(serviceSock); //这一步是一定要做的! // 5.1 v1.1 版本 -- 多进程版本 -- 也是可以的 // 爷爷进程 // pid_t id = fork(); // if(id == 0) // { // // 爸爸进程 // close(listenSock_);//建议 // // 又进行了一次fork,让 爸爸进程 // if(fork() > 0) exit(0); // // 孙子进程 -- 就没有爸爸 -- 孤儿进程 -- 被系统领养 -- 回收问题就交给了系统来回收 // transService(serviceSock, peerIp, peerPort); // exit(0); // } // // 父进程 // close(serviceSock); //这一步是一定要做的! // // 爸爸进程直接终止,立马得到退出码,释放僵尸进程状态 // pid_t ret = waitpid(id, nullptr, 0); //就用阻塞式 // assert(ret > 0); // (void)ret; // 5.2 v2 版本 -- 多线程 // 这里不需要进行关闭文件描述符吗??不需要啦 // 多线程是会共享文件描述符表的! // ThreadData *td = new ThreadData(peerPort, peerIp, serviceSock, this); // pthread_t tid; // pthread_create(&tid, nullptr, threadRoutine, (void*)td); // 5.3 v3 版本 --- 线程池版本 // 5.3.1 构建任务 // 5.3 v3.1 // Task t(serviceSock, peerIp, peerPort, std::bind(&ServerTcp::transService, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3)); // tp_->push(t); // 5.3 v3.2 // Task t(serviceSock, peerIp, peerPort, transService); // tp_->push(t); // 5.3 v3.3 Task t(serviceSock, peerIp, peerPort, execCommand); tp_->push(t); // waitpid(); 默认是阻塞等待!WNOHANG // 方案1 // logMessage(DEBUG, "server 提供 service start ..."); // sleep(1); } } private: // sock int listenSock_; // port uint16_t port_; // ip std::string ip_; // 引入线程池 ThreadPool<Task> *tp_; }; static void Usage(std::string proc) { std::cerr << "Usage:\n\t" << proc << " port ip" << std::endl; std::cerr << "example:\n\t" << proc << " 8080 127.0.0.1\n" << std::endl; } // ./ServerTcp local_port local_ip int main(int argc, char *argv[]) { if (argc != 2 && argc != 3) { Usage(argv[0]); exit(USAGE_ERR); } uint16_t port = atoi(argv[1]); std::string ip; if (argc == 3) ip = argv[2]; daemonize(); // 我们的进程就会成为守护进程 ServerTcp svr(port, ip); svr.init(); svr.loop(); return 0; }
clientTcp.cc:
#include "util.hpp" // 2. 需要bind吗??需要,但是不需要自己显示的bind! 不要自己bind!!!! // 3. 需要listen吗?不需要的! // 4. 需要accept吗?不需要的! volatile bool quit = false; static void Usage(std::string proc) { std::cerr << "Usage:\n\t" << proc << " serverIp serverPort" << std::endl; std::cerr << "Example:\n\t" << proc << " 127.0.0.1 8081\n" << std::endl; } // ./clientTcp serverIp serverPort int main(int argc, char *argv[]) { if (argc != 3) { Usage(argv[0]); exit(USAGE_ERR); } std::string serverIp = argv[1]; uint16_t serverPort = atoi(argv[2]); // 1. 创建socket SOCK_STREAM int sock = socket(AF_INET, SOCK_STREAM, 0); if (sock < 0) { std::cerr << "socket: " << strerror(errno) << std::endl; exit(SOCKET_ERR); } // 2. connect,发起链接请求,你想谁发起请求呢??当然是向服务器发起请求喽 // 2.1 先填充需要连接的远端主机的基本信息 struct sockaddr_in server; memset(&server, 0, sizeof(server)); server.sin_family = AF_INET; server.sin_port = htons(serverPort); inet_aton(serverIp.c_str(), &server.sin_addr); // 2.2 发起请求,connect 会自动帮我们进行bind! if (connect(sock, (const struct sockaddr *)&server, sizeof(server)) != 0) { std::cerr << "connect: " << strerror(errno) << std::endl; exit(CONN_ERR); } std::cout << "info : connect success: " << sock << std::endl; std::string message; while (!quit) { message.clear(); std::cout << "请输入你的消息>>> "; std::getline(std::cin, message); // 结尾不会有\n if (strcasecmp(message.c_str(), "quit") == 0) quit = true; ssize_t s = write(sock, message.c_str(), message.size()); if (s > 0) { message.resize(1024); ssize_t s = read(sock, (char *)(message.c_str()), 1024); if (s > 0) message[s] = 0; std::cout << "Server Echo>>> " << message << std::endl; } else if (s <= 0) { break; } } close(sock); return 0; }
log.hpp:
#pragma once #include <cstdio> #include <ctime> #include <cstdarg> #include <cassert> #include <cassert> #include <cstring> #include <cerrno> #include <stdlib.h> #include <sys/types.h> #include <sys/stat.h> #include <fcntl.h> #define DEBUG 0 #define NOTICE 1 #define WARINING 2 #define FATAL 3 const char *log_level[] = {"DEBUG", "NOTICE", "WARINING", "FATAL"}; #define LOGFILE "serverTcp.log" // logMessage(DEBUG, "%d", 10); void logMessage(int level, const char *format, ...) { assert(level >= DEBUG); assert(level <= FATAL); char *name = getenv("USER"); char logInfo[1024]; va_list ap; // ap -> char* va_start(ap, format); vsnprintf(logInfo, sizeof(logInfo) - 1, format, ap); va_end(ap); // ap = NULL // 每次打开太麻烦 umask(0); int fd = open(LOGFILE, O_WRONLY | O_CREAT | O_APPEND, 0666); assert(fd >= 0); FILE *out = (level == FATAL) ? stderr : stdout; dup2(fd, 1); dup2(fd, 2); fprintf(out, "%s | %u | %s | %s\n", log_level[level], (unsigned int)time(nullptr), name == nullptr ? "unknow" : name, logInfo); fflush(out); // 将C缓冲区中的数据刷新到OS fsync(fd); // 将OS中的数据尽快刷盘 close(fd); // char *s = format; // while(s){ // case '%': // if(*(s+1) == 'd') int x = va_arg(ap, int); // break; // } }
ThreadPool.hpp:
#pragma once #include <iostream> #include <cassert> #include <queue> #include <memory> #include <cstdlib> #include <pthread.h> #include <unistd.h> #include <sys/prctl.h> #include "Lock.hpp" using namespace std; int gThreadNum = 15; template <class T> class ThreadPool { private: ThreadPool(int threadNum = gThreadNum) : threadNum_(threadNum), isStart_(false) { assert(threadNum_ > 0); pthread_mutex_init(&mutex_, nullptr); pthread_cond_init(&cond_, nullptr); } ThreadPool(const ThreadPool<T> &) = delete; void operator=(const ThreadPool<T>&) = delete; public: static ThreadPool<T> *getInstance() { static Mutex mutex; if (nullptr == instance) //仅仅是过滤重复的判断 { LockGuard lockguard(&mutex); //进入代码块,加锁。退出代码块,自动解锁 if (nullptr == instance) { instance = new ThreadPool<T>(); } } return instance; } //类内成员, 成员函数,都有默认参数this static void *threadRoutine(void *args) { pthread_detach(pthread_self()); ThreadPool<T> *tp = static_cast<ThreadPool<T> *>(args); // prctl(PR_SET_NAME, "follower"); // 更改线程名称 while (1) { tp->lockQueue(); while (!tp->haveTask()) { tp->waitForTask(); } //这个任务就被拿到了线程的上下文中 T t = tp->pop(); tp->unlockQueue(); t(); // 让指定的先处理这个任务 } } void start() { assert(!isStart_); for (int i = 0; i < threadNum_; i++) { pthread_t temp; pthread_create(&temp, nullptr, threadRoutine, this); } isStart_ = true; } void push(const T &in) { lockQueue(); taskQueue_.push(in); choiceThreadForHandler(); unlockQueue(); } ~ThreadPool() { pthread_mutex_destroy(&mutex_); pthread_cond_destroy(&cond_); } int threadNum() { return threadNum_; } private: void lockQueue() { pthread_mutex_lock(&mutex_); } void unlockQueue() { pthread_mutex_unlock(&mutex_); } bool haveTask() { return !taskQueue_.empty(); } void waitForTask() { pthread_cond_wait(&cond_, &mutex_); } void choiceThreadForHandler() { pthread_cond_signal(&cond_); } T pop() { T temp = taskQueue_.front(); taskQueue_.pop(); return temp; } private: bool isStart_; int threadNum_; queue<T> taskQueue_; pthread_mutex_t mutex_; pthread_cond_t cond_; static ThreadPool<T> *instance; // const static int a = 100; }; template <class T> ThreadPool<T> *ThreadPool<T>::instance = nullptr;
Lock.hpp:
#pragma once #include <iostream> #include <pthread.h> class Mutex { public: Mutex() { pthread_mutex_init(&lock_, nullptr); } void lock() { pthread_mutex_lock(&lock_); } void unlock() { pthread_mutex_unlock(&lock_); } ~Mutex() { pthread_mutex_destroy(&lock_); } private: pthread_mutex_t lock_; }; class LockGuard { public: LockGuard(Mutex *mutex) : mutex_(mutex) { mutex_->lock(); std::cout << "加锁成功..." << std::endl; } ~LockGuard() { mutex_->unlock(); std::cout << "解锁成功...." << std::endl; } private: Mutex *mutex_; };
Task.hpp:
#pragma once #include <iostream> #include <string> #include <functional> #include <pthread.h> #include "log.hpp" class Task { public: //等价于 // typedef std::function<void (int, std::string, uint16_t)> callback_t; using callback_t = std::function<void (int, std::string, uint16_t)>; private: int sock_; // 给用户提供IO服务的sock uint16_t port_; // client port std::string ip_; // client ip callback_t func_; // 回调方法 public: Task():sock_(-1), port_(-1) {} Task(int sock, std::string ip, uint16_t port, callback_t func) : sock_(sock), ip_(ip), port_(port), func_(func) {} void operator () () { logMessage(DEBUG, "线程ID[%p]处理%s:%d的请求 开始啦...",\ pthread_self(), ip_.c_str(), port_); func_(sock_, ip_, port_); logMessage(DEBUG, "线程ID[%p]处理%s:%d的请求 结束啦...",\ pthread_self(), ip_.c_str(), port_); } ~Task() {} };
util.hpp:
#pragma once #include <iostream> #include <string> #include <cstring> #include <cstdlib> #include <cassert> #include <ctype.h> #include <unistd.h> #include <strings.h> #include <sys/types.h> #include <sys/socket.h> #include <netinet/in.h> #include <arpa/inet.h> #include "log.hpp" #define SOCKET_ERR 1 #define BIND_ERR 2 #define LISTEN_ERR 3 #define USAGE_ERR 4 #define CONN_ERR 5 #define BUFFER_SIZE 1024
daemonize.hpp:
#pragma once #include <cstdio> #include <iostream> #include <signal.h> #include <unistd.h> #include <sys/types.h> #include <sys/stat.h> #include <fcntl.h> void daemonize() { int fd = 0; // 1. 忽略SIGPIPE signal(SIGPIPE, SIG_IGN); // 2. 更改进程的工作目录 // chdir(); // 3. 让自己不要成为进程组组长 if (fork() > 0) exit(1); // 4. 设置自己是一个独立的会话 setsid(); // 5. 重定向0,1,2 //"/dev/null"——>Linux"文件黑洞/垃圾桶" if ((fd = open("/dev/null", O_RDWR)) != -1) // fd == 3 { dup2(fd, STDIN_FILENO); dup2(fd, STDOUT_FILENO); dup2(fd, STDERR_FILENO); // 6. 关闭掉不需要的fd if(fd > STDERR_FILENO) close(fd); } }
makefile:
.PHONY:all all:clientTcp serverTcpd clientTcp: clientTcp.cc g++ -o $@ $^ -std=c++11 serverTcpd:serverTcp.cc g++ -o $@ $^ -std=c++11 -lpthread .PHONY:clean clean: rm -f serverTcpd clientTcp
后记:
●由于作者水平有限,文章难免存在谬误之处,敬请读者斧正,俚语成篇,恳望指教!
——By 作者:新晓·故知