前面的文章中我们使用UDP套接字编写了四个版本,不同的UDP服务器,在本文中我们将要对TCP套接字进行几个不同的版本的代码的编写,首先测试一下TCP套接字的代码,然后是将这个版本进行修改成多进程版本的,再将代码修改成多线程版本的,最后在编写一个线程池版本的代码。
在编写TCP套接字之前我们会使用如下的一些API
socket() - int socket(int domain, int type, int protocol);
- socket()打开一个网络通信端口,如果成功的话,就像open()一样返回一个文件描述符 ;
- 应用程序可以像读写文件一样用read/write在网络上收发数据;
- 如果socket()出错则返回-1;
- 对于IPv4,第一个参数指定为AF_INET;
- 对于TCP协议,type参数指定为SOCK_ATREAM,表示面向流的传输协议;
- protocol参数指定为0即可
bind() - int bind(int socket, const struct sockaddr *address, socklen_t address_len);
- 服务器程序所监听的网络地址和端口号通常是固定不变的,客户端的值服务器的地址与端口号后就可以向服务器发起连接,服务器需要调用bind一个固定的网络地址与端口号;
- bind()成功返回0,失败返回-1;
- bind()的作用是将参数sockfd和myaddr绑定在一起,是sockfd这个网络通信的文件描述符监听myaddr所描述的地址和端口号;
- 前面讲过,struct sockaddr *是一个通用指针类型,myaddr参数实际上可以接受多种协议的sockaddr结构体,而它们的长度各不相同,所以需要第三个参数addrlen指定结构体的长度。
对于myaddr参数是这样初始化的:
bzero(&servaddr, sizeof(servaddr));// 将整个结构体清零;
servaddr.sim_family = AF_INET;// 设置地址类型为AF_INET;
servaddr.sin_family.s_addr = htonl(INADDR_ANY);// 网络地址为INADDR_ANY, 这个宏表示本地的任意IP地址,因为服务器可能有多个网卡,每个网卡也可能绑定多个IP 地址, 这样设置可以在所有的IP地址上监听,直到与某个客户端建立了连接时才确定下来到底用哪个IP 地址;
servaddr.sin_port = htons(SERV_PORT);// 端口号为SERV_PORT, 我们可以自己定义;
listen() - int listen(int socket, int backlog);
- listen()声明sockfd处于监听状态, 并且最多允许有backlog个客户端处于连接等待状态, 如果接收到更多的连接请求就忽略, 这里设置不会太大(一般是5);
- listen()成功返回0,失败返回-1;
accept() - int accept(int socket, struct sockaddr *restrict address, socklen_t *restrict address_len);
- 三次握手完成后, 服务器调用accept()接受连接;
- 如果服务器调用accept()时还没有客户端的连接请求,就阻塞等待直到有客户端连接上来;
- addr是一个传出参数,accept()返回时传出客户端的地址和端口号;
- 如果给addr 参数传NULL,表示不关心客户端的地址;
- addrlen参数是一个传入传出参数(value-result argument), 传入的是调用者提供的, 缓冲区addr的长度以避免缓冲区溢出问题, 传出的是客户端地址结构体的实际长度(有可能没有占满调用者提供的缓冲区);
- accept()的返回值是一个文件描述符,在传入参数中,他传入的是上面socket()返回的文件描述符,socket()返回的文件描述符可以想象是饭店门口拉客的,招呼客人进入饭店。等到客人进入饭店后,需要有一个新的店小二来招呼客人吃饭。
对于客户来说,不需要进行accept(),而需要调用connect()连接服务器
connect() - int connect(int socket, const struct sockaddr *address, socklen_t address_len);
- connect和bind的参数形式一致, 区别在于bind的参数是自己的地址, 而connect的参数是对方的地址;
- connect()成功返回0,出错返回-1
TCP_v1 测试版
在TCP中的错误码同样是我们自己定义在err.hpp头文件中的
// tcp_server.hpp
static const uint16_t defaultport = 8081;
static const int backlog = 32;
using func_t = std::function<std::string (std::string)>;
class TcpServer{
public:
TcpServer(func_t func, uint16_t port = defaultport) : func_(func), port_(port), quit_(true) {}
void initServer(){
// 1. 创建socket,文件
listensock_ = socket(AF_INET, SOCK_STREAM, 0); // 创建套接字,这个套接字是用于监听的
if(listensock_ < 0){
std::cerr << "create socket error" << std::endl;
exit(SOCKET_ERR);
}
// 2. bind
struct sockaddr_in local;
memset(&local, 0, sizeof(local));
local.sin_family = AF_INET;
local.sin_port = htons(port_); // 收发消息的时候,没有做主机转网络,会自动做大小端转化
local.sin_addr.s_addr = htonl(INADDR_ANY);
if(bind(listensock_, (struct sockaddr*)&local, sizeof(local)) < 0){
std::cerr << "bind socket error" << std::endl;
exit(BIND_ERR);
}
// 3. 监听 客户端要链接服务器
if(listen(listensock_, backlog) < 0){
std::cerr << "listen socket error" << std::endl;
exit(LISTEN_ERR);
}
}
void start(){
quit_ = false;
while (!quit_){
struct sockaddr_in client; // 获取客户端的信息
socklen_t len = sizeof(client);
// 4. 获取连接 accept
int sock = accept(listensock_, (struct sockaddr*)&client, &len); // 返回值是一个文件描述符,一个描述符专门用于监听,另一个描述符专门用于套接字通信
if (sock < 0){
std::cerr << "accept error" << std::endl;
continue; // 获取连接失败并不需要终止程序
}
// 提取client信息
std::string clientip = inet_ntoa(client.sin_addr);
uint16_t clientport = ntohs(client.sin_port);
// 5. 获取新连接成功,开始业务处理
std::cout << "获取新连接成功: " << sock << " from " << listensock_ << ", " << clientip << "-" << clientport << std::endl;
service(sock, clientip, clientport);
// 这里存在的问题 - 只能支持一个客户端的运行,当给一个执行流进入了service中时就无法再去accept了,因此一次只能给一个客户进行通信
}
}
// 流式操作可以使用read进行读取数据,UDP不是流式的是面向数据报的
void service(int sock, const std::string &clientip, const uint16_t &clientport) {
std::string who = clientip + "-" + std::to_string(clientport);
char buffer[1024];
while (true){
ssize_t s = read(sock, buffer, sizeof(buffer)-1);
if (s > 0){
buffer[s] = 0;
std::string res = func_(buffer); // 进行回调 这里我们使用的就是在前面UDP套接字中使用过的function将业务与网络IO进行分离
std::cout << who << " >>> " << res << std::endl;
write(sock, res.c_str(), res.size());
}
else if (s == 0){
// 对方将连接关闭了
close(sock);
std::cout << who << " quit, me too" << std::endl;
break;
}
else{
close(sock);
std::cerr << "read error: " << strerror(errno) << std::endl;
break;
}
}
}
~TcpServer(){}
private:
uint16_t port_;
int listensock_;
bool quit_;
func_t func_;
};
// tcp_client.cc
int main(int argc, char* argv[]){
string serverip = argv[1];
uint16_t serverport = atoi(argv[2]);
int sock = socket(AF_INET, SOCK_STREAM, 0);// 1. creatr socket
if (sock < 0){
cerr << "socket error : " << strerror(errno) << endl;
exit(SOCKET_ERR);
}
// 要不要bind?要
// 要不要自己bind?不要, 因为client要让OS自动给用户进行bind
// 要不要listen?不要 要不要accept?不需要
struct sockaddr_in server;
memset(&server, 0, sizeof(server));
server.sin_family = AF_INET;
server.sin_port = htons(serverport);
inet_aton(serverip.c_str(), &server.sin_addr);
int cnt = 5;
while (connect(sock, (struct sockaddr*)&server, sizeof(server)) != 0){// 2. connect
sleep(1);
cout << "正在给你尝试重连,重连次数还有: " << cnt-- << endl;
if(cnt <= 0) break;
}
if (cnt <= 0){
cerr << "连接失败..." << endl;
exit(CONNECT_ERR);
}
char buffer[1024];// 3. 连接成功
while(true){
string line;
cout << "Enter>>> ";
getline(cin, line);
write(sock, line.c_str(), line.size());
ssize_t s = read(sock, buffer, sizeof(buffer)-1);
if(s > 0){
buffer[s] = 0;
cout << "server echo >>>" << buffer << endl;
}else if(s == 0){
cerr << "server quit" << endl;
break;
}else{
cerr << "read error: " << strerror(errno) << endl;
break;
}
}
close(sock);
return 0;
}
运行的结果就如下所示,可以发现当一个客户端连接上服务器之后,另外一个客户端就无法连接,只有当前连接上服务器的客户端退出后,另一个客户端才能够连接上。
为了解决上述的服务器只能够同时连接一个客户端的问题,我们在v2版本中使用多进程的形式:
TCP_v2 多进程版
// 之前的问题就存在于service(sock, clientip, clientport); 这句处理网络服务的代码与服务器的accept函数处于同一个进程中,下面就对其进行修改
void start(){
// signal(SIGCHLD, SIG_IGN);
pid_t id = fork();// 创建子进程
if (id < 0){
close(sock);
continue;
}
else if (id == 0){ // child,父进程fd会被子进程继承吗?会。 父子会用同一张文件描述符表吗?不会。子进程会拷贝父进程的fd table
// 子进程,建议关闭掉不需要的fd
close(listensock_);
// if (fork() > 0) exit(0); // 这一行代码会让child进程退出,孙子进程关闭其父进程之后->孤儿进程,由1号进程领养
service(sock, clientip, clientport);
exit(0);
}
// 父进程,一定要关闭掉不需要的fd,不关闭 - 文件描述符泄露
close(sock);
pid_t ret = waitpid(id, nullptr, 0); // 父进程默认阻塞 waitpid(id, nullptr, WOHANG)
if (ret == id)
std::cout << "wait child " << id << " success" << std::endl;
}
上述多进程程序运行时,当运行到waitpid的时候会被阻塞,如果阻塞了,那么就还是与v1版本的一样,在这里我们有两个比较推荐的方案,第一种就是在start函数开始的时候将子进程的信号忽略signal(SIGCHLD, SIG_IGN);
或者我们也可以if (fork() > 0) exit(0);
这句话就是创建孙子进程,让子进程退出,此时孙子进程就变为了孤儿进程有1号进程领养。
TCP_v3 多线程版
同样的我们也可以对v1版本进行修改,将其修改为多进程的版本,与v2的版本相同都是对service(sock, clientip, clientport);
处进行修改
class ThreadData{
public:
ThreadData(int fd, const std::string &ip, const uint16_t &port, TcpServer *ts)
: sock(fd), clientip(ip), clientport(port), current(ts) {}
public:
int sock;
std::string clientip;
uint16_t clientport;
TcpServer *current;
};
static void* threadRoutine(void* args){
pthread_detach(pthread_self()); // 为了不用join将线程自己进行分离
ThreadData* td = static_cast<ThreadData*>(args);
td->current->service(td->sock, td->clientip, td->clientport);
delete td;
return nullptr;
}
void start(){
// ...
// 要不要关闭不要的socket??不能,多线程将进程的文件描述符表共享
pthread_t tid;
ThreadData* td = new ThreadData(sock, clientip, clientport, this);
pthread_create(&tid, nullptr, threadRoutine, td); // 要给线程传入sock...
}
TCP_v4 线程池版
最后,我们可以再次对其进行修改,最后让其成为一个线程池版本的TCP服务器。
在线程池版本的TCP服务器中我们构建了日志输出模块、线程池模块、任务模块、守护进程模块、Tcpserver模块、Tcpclient模块等
日志输出模块
// log.hpp
#pragma once
#include <cstdio>
#include <cstring>
#include <cstdarg>
#include <string>
#include <sys/types.h>
#include <unistd.h>
const std::string filename = "log/tcpserver.log";
// 日志是有日志等级的
enum{ // 使用枚举类型来定义不同的日志等级
Debug = 0,
Info,
Warning,
Error,
Fatal,
Uknown
};
static std::string toLevelString(int level){ // 将数字日志等级转换为字符串
switch (level){
case Debug:
return "Debug";
case Info:
return "Info";
case Warning:
return "Warning";
case Error:
return "Error";
case Fatal:
return "Fatal";
default:
return "Uknown";
}
}
static std::string getTime(){
time_t curr = time(nullptr); // 获取当前时间离1970.1.1的差值
struct tm *tmp = localtime(&curr); // ANSI C标准称使用tm结构的这种时间表示为分解时间(broken-down time)。
char buffer[128];
snprintf(buffer, sizeof(buffer), "%d-%d-%d %d:%d:%d", tmp->tm_year + 1900, tmp->tm_mon+1, tmp->tm_mday, tmp->tm_hour, tmp->tm_min, tmp->tm_sec); // 格式化输出字符串,并将结果写入到指定的缓冲区
return buffer;
}
// ...可变参数
// 日志格式:日志等级 时间 pid 消息体
// logMessage(DEBUG, "%d, %s", 12, s.c_str())
void logMessage(int level, const char* format, ...) // 传入的可能是字符串常量
{
char logLeft[1024];
std::string level_string = toLevelString(level);
std::string cur_time = getTime();
snprintf(logLeft, sizeof(logLeft), "[%s] [%s] [%d] ", level_string.c_str(), cur_time.c_str(), getpid());
char logRight[1024];
// 可变参数可以使用vsnprintf直接进行获取
va_list p;
va_start(p, format);
vsnprintf(logRight, sizeof(logRight), format, p);
va_end(p);
// 打印
// printf("%s %s\n", logLeft, logRight);
// 保存到文件中
FILE *fp = fopen(filename.c_str(), "a");
if(fp == nullptr)return;
fprintf(fp,"%s%s\n", logLeft, logRight);
fflush(fp); //可写也可以不写
fclose(fp);
// va_list p; // char* 类型
// int a = va_arg(p, int); // 根据类型提取参数
// va_start(p, format); // 将指针指向可变参数部分的起始地址
// va_end(p); // p = NULL
}
线程池模块就是我们之前写过的带有单例模式的线程池模块。
下面是服务器类实现:
namespace ns_server
{
static const uint16_t defaultport = 8081;
static const int backlog = 32;
using func_t = std::function<std::string(std::string)>;
class TcpServer;
class ThreadData
{
public:
ThreadData(int fd, const std::string &ip, const uint16_t &port, TcpServer *ts)
: sock(fd), clientip(ip), clientport(port), current(ts)
{}
public:
int sock;
std::string clientip;
uint16_t clientport;
TcpServer *current;
};
class TcpServer
{
public:
TcpServer(func_t func, uint16_t port = defaultport) : func_(func), port_(port), quit_(true)
{}
void initServer()
{
//...
}
void start()
{
// ...
// 线程池版
// 使用线程池的时候,一定是有限的,一定是要处理短任务,不一定在这里接入线程池
Task t(sock, clientip, clientport, bind(&TcpServer::service, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3)); // 构建任务,在类中的函数有隐含的this指针需要使用bind进行处理
ThreadPool<Task>::getinstance()->pushTask(t); // 将任务推送至线程池中
}
}
// 流式操作可以使用read进行读取数据,UDP不是流式的是面向数据报的
void service(int sock, const std::string &clientip, const uint16_t &clientport){
std::string who = clientip + "-" + std::to_string(clientport);
char buffer[1024];
ssize_t s = read(sock, buffer, sizeof(buffer) - 1); // 读取从网络中收到的数据根据不同的情况返回不同的结果
while(true)
{
if (s > 0){
buffer[s] = 0;
std::string res = func_(buffer); // 进行回调
logMessage(Debug, "%s# %s", who.c_str(), res.c_str());
write(sock, res.c_str(), res.size());
}
}
close(sock);
}
~TcpServer(){}
private:
uint16_t port_;
int listensock_;
bool quit_;
func_t func_; // 回调函数处理读取之后的数据信息
};
}
守护进程模块
#pragma once
// 进程组 前台任务 后台任务
// 如果后台任务提到前台,老的前台任务无法运行
// 在会话中只能有一个前台任务在运行 --- 我们在命令行中启动一个进程的时候,bash就无法运行
// 如果登录就是创建一个会话,bash任务,启动我们的进程,就是在当前会话中创建新的前后台任务
// 1. setsid();
// 2. setsid(), 调用进程,不能是组长!我们怎么保证自己不是组长呢?
// 3. 守护进程a. 忽略异常信号 b. 0,1,2要做特殊处理 c. 进程的工作路径可能要更改 /
//守护进程的本质:是孤儿进程的一种!
void Daemon()
{
// 1. 忽略信号
signal(SIGPIPE, SIG_IGN);
signal(SIGCHLD, SIG_IGN);
// 2. 让自己不要成为组长
if (fork() > 0) exit(0);
// 3. 新建会话,自己成为会话的话首进程
pid_t ret = setsid();
if ((int)ret == -1)
{
logMessage(Fatal, "deamon error, code: %d, string: %s", errno, strerror(errno));
exit(SETSID_ERR);
}
// 4. 可选:可以更改守护进程的工作路径
// chdir("/")
// 5. 处理后续的对于0,1,2的问题
int fd = open("/dev/null", O_RDWR);// 在 Linux 系统中,/dev/null 是一个特殊的文件,它被称为“空设备”。它没有任何数据,读取它永远不会产生任何输出,写入它永远不会导致任何数据被存储。/dev/null 起着丢弃数据的作用,可以用于一些需要忽略输出或者输入的场合。
if (fd < 0)
{
logMessage(Fatal, "open error, code: %d, string: %s", errno, strerror(errno));
exit(OPEN_ERR);
}
dup2(fd, 0);
dup2(fd, 1);
dup2(fd, 2);
close(fd);
}
服务器cpp文件
#include "tcpServer.hpp"
#include "daemon.hpp"
#include <memory>
using namespace std;
using namespace ns_server;
static void usage(string proc){
std::cout << "Usage:\n\t" << proc << " port\n" << std::endl;
}
std::string echo(const std::string& message){
return message;
}
// ./tcp_server port
int main(int argc, char* argv[]){
if (argc != 2){
usage(argv[0]);
exit(USAGE_ERR);
}
uint16_t port = atoi(argv[1]);
unique_ptr<TcpServer> tsvr(new TcpServer(echo, port));
tsvr->initServer();
// 将服务器守护进程化
Daemon();
tsvr->start();
return 0;
}