文章目录
- Tcp服务端
- TcpServer.hpp
- TcpServer.cc
- Tcp客户端
- TcpClient.hpp
- TcpClient.cc
- TCP客户端处理
- 守护进程
- 守护进程化
Tcp服务端
TcpServer.hpp
TCP服务端创建流程如下:
创建socket文件套接字对象,面向字节流SOCK_STREAM
bind绑定自己的网络信息,通常端口是固定的,IP地址默认为(0.0.0.0或者NADDR_ANY)
设置socket为监听状态(listen),一直帮我们获取新连接,接收请求,UDP没有链接,发过来的就是数据,TCP需要listen状态,是因为TCP是面向连接的,这点与UDP不同,TCP还需要进行监听
服务端获取客服端连接请求(accept)
最后进行通信,由于TCP是面向字节流,后续全是文件操作(read/write)
void initServer()
{
// 1. 创建socket文件套接字对象
_listensock = socket(AF_INET, SOCK_STREAM, 0);
if (_listensock < 0)
{
logMessage(FATAL, "create socket error");
exit(SOCKET_ERR);
}
logMessage(NORMAL, "create socket success: %d", _listensock);
// 2. bind绑定自己的网络信息
struct sockaddr_in local;
memset(&local, 0, sizeof(local));
local.sin_family = AF_INET;
local.sin_port = htons(_port);
local.sin_addr.s_addr = INADDR_ANY;
if (bind(_listensock, (struct sockaddr *)&local, sizeof(local)) < 0)
{
logMessage(FATAL, "bind socket error");
exit(BIND_ERR);
}
logMessage(NORMAL, "bind socket success");
// 3. 设置socket 为监听状态
if (listen(_listensock, gbacklog) < 0)
{
logMessage(FATAL, "listen socket error");
exit(LISTEN_ERR);
}
logMessage(NORMAL, "listen socket success");
}
void start()
{
for (;;)
{
// 4. server 获取新链接
// sock, 和client进行通信的fd
struct sockaddr_in peer;
socklen_t len = sizeof(peer);
int sock = accept(_listensock, (struct sockaddr *)&peer, &len);
if (sock < 0)
{
logMessage(ERROR, "accept error, next");
continue;
}
logMessage(NORMAL, "accept a new link success, get new sock: %d", sock);
cout<<"sock: "<<sock<<endl;
//5.未来通信就用这个sock
serviceIO(sock);
close(sock);
}
}
void serviceIO(int sock)
{
char buffer[1024];
while (true)
{
ssize_t n = read(sock, buffer, sizeof(buffer) - 1);
if (n > 0)
{
buffer[n] = 0;
std::cout << "recv message: " << buffer << std::endl;
std::string outbuffer = buffer;
outbuffer += " server[echo]";
write(sock, outbuffer.c_str(), outbuffer.size());
}
else if (n == 0)
{
logMessage(NORMAL, "client quit, me too!");
break;
}
}
}
logMessage函数:打印出自己模拟相关的日志信息
TcpServer.cc
TcpServer.cc主函数进行相关函数的调用
static void Usage(string proc)
{
cout << "\nUsage:\n\t" << proc << " local_port\n\n";
}
// tcp服务器,启动上和udp server一模一样
// ./tcpserver local_port
int main(int argc, char *argv[])
{
if (argc != 2)
{
Usage(argv[0]);
exit(USAGE_ERR);
}
uint16_t port = atoi(argv[1]);
unique_ptr<TcpServer> tsvr(new TcpServer(port));
tsvr->initServer();
tsvr->start();
return 0;
}
Tcp客户端
TcpClient.hpp
Tcp客户端创建流程如下:
创建套接字(socket)对象,面向字节流SOCK_STREAM
客户端需要bind,但是客户端的绑定不需要我们自己写,
操作系统会去绑定(无需程序员bind)
客户端发起连接请求(connect)
进行通信,客户端输入通信的内容,进行文件操作进行读写通信(read/write)
void initClient()
{
// 1. 创建socket
_sock = socket(AF_INET, SOCK_STREAM, 0);
if(_sock < 0)
{
std::cerr << "socket create error" << std::endl;
exit(2);
}
// 2. tcp的客户端要bind,不要显示的bind
}
void start()
{
struct sockaddr_in server;
memset(&server, 0, sizeof(server));
server.sin_family = AF_INET;
server.sin_port = htons(_serverport);
server.sin_addr.s_addr = inet_addr(_serverip.c_str());
if(connect(_sock, (struct sockaddr*)&server, sizeof(server)) != 0)
{
std::cerr << "socket connect error" << std::endl;
}
else
{
std::string msg;
while(true)
{
std::cout << "Enter# ";
std::getline(std::cin, msg);
write(_sock, msg.c_str(), msg.size());
char buffer[NUM];
int n = read(_sock, buffer, sizeof(buffer)-1);
if(n > 0)
{
buffer[n] = 0;
std::cout << "Server回显# " << buffer << std::endl;
}
else
{
break;
}
}
}
}
TcpClient.cc
TcpClinet.cc主函数进行相关函数的调用
static void Usage(string proc)
{
cout << "\nUsage:\n\t" << proc << " serverip serverport\n\n";
}
// ./tcpclient serverip serverport
int main(int argc, char *argv[])
{
if (argc != 3)
{
Usage(argv[0]);
exit(1);
}
string serverip = argv[1];
uint16_t serverport = atoi(argv[2]);
unique_ptr<TcpClient> tcli(new TcpClient(serverip, serverport));
tcli->initClient();
tcli->start();
return 0;
}
如上就是TCP套接字创建的整体流程
上述TCP服务端TcpServer.hpp整体代码如下👇
#pragma once
#include <iostream>
#include <string>
#include <cstring>
#include <cstdlib>
#include <unistd.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <sys/wait.h>
#include <signal.h>
#include <pthread.h>
#include "log.hpp"
namespace server
{
enum
{
USAGE_ERR = 1,
SOCKET_ERR,
BIND_ERR,
LISTEN_ERR
};
static const uint16_t gport = 8080;
static const int gbacklog = 5;
class TcpServer;
class ThreadData
{
};
class TcpServer
{
public:
TcpServer(const uint16_t &port = gport) : _listensock(-1), _port(port)
{
}
void initServer()
{
// 1. 创建socket文件套接字对象
_listensock = socket(AF_INET, SOCK_STREAM, 0);
if (_listensock < 0)
{
logMessage(FATAL, "create socket error");
exit(SOCKET_ERR);
}
logMessage(NORMAL, "create socket success: %d", _listensock);
// 2. bind绑定自己的网络信息
struct sockaddr_in local;
memset(&local, 0, sizeof(local));
local.sin_family = AF_INET;
local.sin_port = htons(_port);
local.sin_addr.s_addr = INADDR_ANY;
if (bind(_listensock, (struct sockaddr *)&local, sizeof(local)) < 0)
{
logMessage(FATAL, "bind socket error");
exit(BIND_ERR);
}
logMessage(NORMAL, "bind socket success");
// 3. 设置socket 为监听状态
if (listen(_listensock, gbacklog) < 0)
{
logMessage(FATAL, "listen socket error");
exit(LISTEN_ERR);
}
logMessage(NORMAL, "listen socket success");
}
void start()
{
for (;;)
{
// 4. server 获取新链接
// sock, 和client进行通信的fd
struct sockaddr_in peer;
socklen_t len = sizeof(peer);
int sock = accept(_listensock, (struct sockaddr *)&peer, &len);
if (sock < 0)
{
logMessage(ERROR, "accept error, next");
continue;
}
logMessage(NORMAL, "accept a new link success, get new sock: %d", sock);
cout << "sock: " << sock << endl;
serviceIO(sock);
close(sock);
}
}
void serviceIO(int sock)
{
char buffer[1024];
while (true)
{
ssize_t n = read(sock, buffer, sizeof(buffer) - 1);
if (n > 0)
{
buffer[n] = 0;
std::cout << "recv message: " << buffer << std::endl;
std::string outbuffer = buffer;
outbuffer += " server[echo]";
write(sock, outbuffer.c_str(), outbuffer.size());
}
else if (n == 0)
{
logMessage(NORMAL, "client quit, me too!");
break;
}
}
}
~TcpServer()
{
}
private:
int _listensock;
uint16_t _port;
};
}
TCP客户端处理
- 多进程版本处理
上面的是单进程版本,要想实现多进程,我们只需要改造TcpServer.hpp即可:
namespace server
{
enum
{
USAGE_ERR = 1,
SOCKET_ERR,
BIND_ERR,
LISTEN_ERR
};
static const uint16_t gport = 8080;
static const int gbacklog = 5;
class TcpServer
{
public:
TcpServer(const uint16_t &port = gport) : _listensock(-1), _port(port)
{
}
void initServer()
{
// 1. 创建socket文件套接字对象
_listensock = socket(AF_INET, SOCK_STREAM, 0);
if (_listensock < 0)
{
logMessage(FATAL, "create socket error");
exit(SOCKET_ERR);
}
logMessage(NORMAL, "create socket success: %d", _listensock);
// 2. bind绑定自己的网络信息
struct sockaddr_in local;
memset(&local, 0, sizeof(local));
local.sin_family = AF_INET;
local.sin_port = htons(_port);
local.sin_addr.s_addr = INADDR_ANY;
if (bind(_listensock, (struct sockaddr *)&local, sizeof(local)) < 0)
{
logMessage(FATAL, "bind socket error");
exit(BIND_ERR);
}
logMessage(NORMAL, "bind socket success");
// 3. 设置socket 为监听状态
if (listen(_listensock, gbacklog) < 0)
{
logMessage(FATAL, "listen socket error");
exit(LISTEN_ERR);
}
logMessage(NORMAL, "listen socket success");
}
void start()
{
for (;;)
{
// 4. server 获取新链接
// sock, 和client进行通信的fd
struct sockaddr_in peer;
socklen_t len = sizeof(peer);
int sock = accept(_listensock, (struct sockaddr *)&peer, &len);
if (sock < 0)
{
logMessage(ERROR, "accept error, next");
continue;
}
logMessage(NORMAL, "accept a new link success, get new sock: %d", sock);
cout << "sock: " << sock << endl;
pid_t id = fork();
if (id == 0) // child
{
close(_listensock);
if(fork()>0) exit(0);
serviceIO(sock);
close(sock);
exit(0);
}
close(sock);
//father
pid_t ret = waitpid(id, nullptr, 0);
if(ret>0)
{
std::cout << "waitsuccess: " << ret << std::endl;
}
}
}
void serviceIO(int sock)
{
char buffer[1024];
while (true)
{
ssize_t n = read(sock, buffer, sizeof(buffer) - 1);
if (n > 0)
{
buffer[n] = 0;
std::cout << "recv message: " << buffer << std::endl;
std::string outbuffer = buffer;
outbuffer += " server[echo]";
write(sock, outbuffer.c_str(), outbuffer.size());
}
else if (n == 0)
{
logMessage(NORMAL, "client quit, me too!");
break;
}
}
}
~TcpServer()
{
}
private:
int _listensock;
uint16_t _port;
};
}
进程创建的理解:
利用fork创建子进程,id==0时是子进程,此时关闭子进程的监听sock,即close(_listensock)(注意:虽然后续代码会让子进程退出,最好还是close一下);对于if(fork()>0)此时由子进程去创建进程,创建出来的进程,我们为了方便描述,称为孙子进程,如果fork()>0,说明是父进程,也就是此时我们的子进程,让子进程退出,父进程在外部就不用阻塞等待子进程退出了,而我们的孙子进程成为孤儿进程,会被1号进程领养,无需关心。
孙子进程close(sock),关闭s使用完sock文件描述符,防止泄漏(后续代码是退出,最好还是close一下)。父进程close(sock),关闭通信的sock,父进程与顺子进程都有,父进程关闭,文件描述符引用计数–,直到孙子进程退出,fd才减为0.关闭。父进程提前关闭并不会影响孙子进程。父进程如果不关会造成文件描述符泄漏,最后等待采用阻塞等待
- 多线程版本处理
对于一个进程中的所有线程,它们共享相同的文件描述符表,所以对于一个线程所对应的fd在使用完毕之后我们需要对其进行close关闭:
namespace server
{
enum
{
USAGE_ERR = 1,
SOCKET_ERR,
BIND_ERR,
LISTEN_ERR
};
static const uint16_t gport = 8080;
static const int gbacklog = 5;
class TcpServer;
class ThreadData
{
public:
ThreadData(TcpServer *self, int sock) : _self(self), _sock(sock)
{
}
public:
TcpServer *_self;
int _sock;
};
class TcpServer
{
public:
TcpServer(const uint16_t &port = gport) : _listensock(-1), _port(port)
{
}
void initServer()
{
// 1. 创建socket文件套接字对象
_listensock = socket(AF_INET, SOCK_STREAM, 0);
if (_listensock < 0)
{
logMessage(FATAL, "create socket error");
exit(SOCKET_ERR);
}
logMessage(NORMAL, "create socket success: %d", _listensock);
// 2. bind绑定自己的网络信息
struct sockaddr_in local;
memset(&local, 0, sizeof(local));
local.sin_family = AF_INET;
local.sin_port = htons(_port);
local.sin_addr.s_addr = INADDR_ANY;
if (bind(_listensock, (struct sockaddr *)&local, sizeof(local)) < 0)
{
logMessage(FATAL, "bind socket error");
exit(BIND_ERR);
}
logMessage(NORMAL, "bind socket success");
// 3. 设置socket 为监听状态
if (listen(_listensock, gbacklog) < 0) // 第二个参数backlog后面在填这个坑
{
logMessage(FATAL, "listen socket error");
exit(LISTEN_ERR);
}
logMessage(NORMAL, "listen socket success");
}
void start()
{
for (;;)
{
// 4. server 获取新链接
// sock, 和client进行通信的fd
struct sockaddr_in peer;
socklen_t len = sizeof(peer);
int sock = accept(_listensock, (struct sockaddr *)&peer, &len);
if (sock < 0)
{
logMessage(ERROR, "accept error, next");
continue;
}
logMessage(NORMAL, "accept a new link success, get new sock: %d", sock);
cout << "sock: " << sock << endl;
pthread_t tid;
ThreadData *td = new ThreadData(this, sock);
pthread_create(&tid, nullptr, threadRoutine, td);
pthread_join(tid, nullptr);
}
}
static void *threadRoutine(void *args)
{
pthread_detach(pthread_self());
ThreadData *td = static_cast<ThreadData *>(args);
td->_self->serviceIO(td->_sock);
close(td->_sock);
delete td;
return nullptr;
}
void serviceIO(int sock)
{
char buffer[1024];
while (true)
{
ssize_t n = read(sock, buffer, sizeof(buffer) - 1);
if (n > 0)
{
buffer[n] = 0;
std::cout << "recv message: " << buffer << std::endl;
std::string outbuffer = buffer;
outbuffer += " server[echo]";
write(sock, outbuffer.c_str(), outbuffer.size());
}
else if (n == 0)
{
logMessage(NORMAL, "client quit, me too!");
break;
}
}
}
~TcpServer()
{
}
private:
int _listensock;
uint16_t _port;
};
}
- 线程池版本处理
Tcpserver.hpp
#include "log.hpp"
#include "Task.hpp"
#include "ThreadPool.hpp"
namespace server
{
static const uint16_t gport = 8080;
static const int gbacklog = 5;
enum
{
USAGE_ERR = 1,
SOCKET_ERR,
BIND_ERR,
LISTEN_ERR
};
class TcpServer;
class ThreadData
{
public:
ThreadData(TcpServer *self, int sock)
: _self(self), _sock(sock)
{
}
public:
TcpServer *_self;
int _sock;
};
class TcpServer
{
public:
TcpServer(const uint16_t port = gport)
: _port(port), _listensock(-1)
{
}
void initServer()
{
// 1.创建socket文件套接字对象
_listensock = socket(AF_INET, SOCK_STREAM, 0);
if (_listensock < 0)
{
logMessage(FATAL, "create socket error");
exit(SOCKET_ERR);
}
logMessage(NORMAL, "create socket success : %d",_listensock);
// 2.绑定自己的网络信息
struct sockaddr_in local;
memset(&local, 0, sizeof local);
local.sin_family = AF_INET;
local.sin_port = htons(_port);
local.sin_addr.s_addr = INADDR_ANY;
if (bind(_listensock, (struct sockaddr *)&local, sizeof local) < 0)
{
logMessage(FATAL, "bind socket error");
exit(BIND_ERR);
}
logMessage(NORMAL, "bind socket success");
// 3.设置socket为监听状态,面向链接
if (listen(_listensock, gbacklog) < 0)
{
logMessage(FATAL, "listen socket error");
exit(LISTEN_ERR);
}
logMessage(NORMAL, "liseten socket success");
}
void start()
{
// 线程池的初始化
ThreadPool<Task>::getInstance()->run();
logMessage(NORMAL,"Thread init success");
// signal(SIGCHLD,SIG_IGN);
for (;;)
{
// server获取新链接
struct sockaddr_in peer;
socklen_t len = sizeof(peer);
// sock:和client进程通信的fd
int sock = accept(_listensock, (struct sockaddr *)&peer, &len);
if (sock < 0)
{
logMessage(FATAL, "accept error,next");
continue;
}
logMessage(NORMAL, "accept a new link success,get new sock:%d",sock);
std::cout << "sock: " << sock << std::endl;
//线程池
ThreadPool<Task>::getInstance()->Push(Task(sock,serverIO));
}
}
~TcpServer()
{
}
private:
int _listensock;
uint16_t _port;
};
}
Task.hpp
#pragma once
#include <iostream>
#include <functional>
void serverIO(int sock)
{
char buffer[1024];
while (true)
{
ssize_t n = read(sock, buffer, sizeof(buffer) - 1);
if (n > 0)
{
buffer[n] = 0;
std::cout << "Server message: " << buffer << std::endl;
std::string outbuffer = buffer;
outbuffer += "server[echo]";
write(sock, outbuffer.c_str(), outbuffer.size());
}
else if (n == 0)
{
logMessage(NORMAL, "client quit,mee too");
break;
}
}
}
class Task
{
using func_t = std::function<void(int)>;
public:
Task() {}
Task(int sock,func_t func)
: _sock(sock), _callback(func)
{
}
void operator()()
{
_callback(_sock);
}
private:
int _sock;
func_t _callback;
};
Threadpool.hpp
#pragma once
#include "Thread.hpp"
#include "LockGuard.hpp"
#include "log.hpp"
#include <vector>
#include <queue>
#include <pthread.h>
#include <mutex>
#include <unistd.h>
#include <mutex>
using namespace ThreadNs;
const int gnum = 10;
template <class T>
class ThreadPool;
template <class T>
class ThreadData
{
public:
ThreadPool<T> *threadpool;
std::string name;
public:
ThreadData(ThreadPool<T> *tp, const std::string &n) : threadpool(tp), name(n)
{
}
};
template <class T>
class ThreadPool
{
private:
static void *handlerTask(void *args)
{
ThreadData<T> *td = (ThreadData<T> *)args;
while (true)
{
T t;
{
LockGuard lockguard(td->threadpool->mutex());
while (td->threadpool->isQueueEmpty())
{
td->threadpool->threadWait();
}
t = td->threadpool->pop(); // pop的本质是将任务从公共队列中拿到当前线程独立的栈中
}
t();
}
delete td;
return nullptr;
}
ThreadPool(const int &num = gnum) : _num(num)
{
pthread_mutex_init(&_mutex, nullptr);
pthread_cond_init(&_cond, nullptr);
for (int i = 0; i < _num; i++)
{
_threads.push_back(new Thread());
}
}
void operator=(const ThreadPool &) = delete;
ThreadPool(const ThreadPool &) = delete;
public:
void lockQueue() { pthread_mutex_lock(&_mutex); }
void unlockQueue() { pthread_mutex_unlock(&_mutex); }
bool isQueueEmpty() { return _task_queue.empty(); }
void threadWait() { pthread_cond_wait(&_cond, &_mutex); }
T pop()
{
T t = _task_queue.front();
_task_queue.pop();
return t;
}
pthread_mutex_t *mutex()
{
return &_mutex;
}
public:
void run()
{
for (const auto &t : _threads)
{
ThreadData<T> *td = new ThreadData<T>(this, t->threadname());
t->start(handlerTask, td);
//std::cout << t->threadname() << "start..." << std::endl;
logMessage(DEBUG,"%s start ...",t->threadname().c_str());
}
}
void Push(const T &in)
{
LockGuard lockguard(&_mutex);
_task_queue.push(in);
pthread_cond_signal(&_cond);
}
~ThreadPool()
{
pthread_mutex_destroy(&_mutex);
pthread_cond_destroy(&_cond);
for (const auto &t : _threads)
delete t;
}
static ThreadPool<T> *getInstance()
{
if (nullptr == tp)
{
_singlock.lock();
if (nullptr == tp)
{
tp = new ThreadPool<T>();
}
_singlock.unlock();
return tp;
}
}
private:
int _num;
std::vector<Thread *> _threads;
std::queue<T> _task_queue;
pthread_mutex_t _mutex;
pthread_cond_t _cond;
static ThreadPool<T> *tp;
static std::mutex _singlock;
};
template <class T>
ThreadPool<T> *ThreadPool<T>::tp = nullptr;
template <class T>
std::mutex ThreadPool<T>::_singlock;
守护进程
守护进程:服务器要做到一点:服务器启动之后,不在受到用户的登录退出影响,服务器可以自定义运行,不受用户登录注销影响的进程是守护进程
&:让一个命令在后台运行
jobs
命令用于显示当前shell会话中的活动作业(jobs),包括前台作业和后台作业。
任务由进程组完成
从哪里看出是同一个会话:SID
现在,把作业放在前台。fg 2:既我们把上面的2号作业放在前台
ctrl+z:
一个任务在前台如果暂停了会立马放在后台:
此时我们的2号任务是stopped状态的,我们让它启动起来:bg 2:
fg:放前台然后ctrl+z停止,bg:启动,作业是可以互相前后端转化的:
把任务提到前台后,命令行解释器就不会做响应了:这是因为当前整个会话中只能有一个前台任务,退出时任务都会被清掉,这样的任务是可能受到用户登录和注销的影响的!
要想不受影响,我们要独立出来,自成会话,自成进程组,和终端设备无关。这就是守护进程,可以一直运行,除非未来不让它运行了。
守护进程化
系统提供了接口。守护进程的原理以及代码:
daemon.hpp: setsid:形成一个新的进程组,创建一个新的会话,不能随便掉,调用这个函数的进程不能是组长
#pragma once
#include <unistd.h>
#include <signal.h>
#include <cstdlib>
#include <cassert>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#define DEV "/dev/null"//数据黑洞,写入的数据会被吃掉,读取数据什么都读不到
void daemonSelf(const char *currPath = nullptr)
{
// 1. 让调用进程忽略掉异常的信号
signal(SIGPIPE, SIG_IGN);
// 2. 如何让自己不是组长,setsid
if (fork() > 0)
exit(0);
// 子进程 -- 守护进程,精灵进程,本质就是孤儿进程的一种!
pid_t n = setsid();
assert(n != -1);
// 3. 守护进程是脱离终端的,关闭或者重定向以前进程默认打开的文件
int fd = open(DEV, O_RDWR);
if(fd >= 0)
{
dup2(fd, 0);
dup2(fd, 1);
dup2(fd, 2);
close(fd);
}
else
{
close(0);
close(1);
close(2);
}
// 4. 可选:进程执行路径发生更改
if(currPath) chdir(currPath);
直接在TcpServer.cc文件中进行调用即可:
#include "tcpServer.hpp"
#include "memory"
#include "daemon.hpp"
using namespace Server;
static void Usage(std::string proc)
{
std::cout<<"Usage:\n\t"<<proc<<"serverPort\n\n";
}
int main(int argc,char* argv[])
{
if(argc!=2)//判断外部传入的参数是否为2
{
Usage(argv[0]);
exit(USAGE_ERR);
}
uint16_t port=std::stoi(argv[1]);
std::unique_ptr<TcpServer> tsvr(new TcpServer(port));
tsvr->InitServer();
daemonSele();//守护进程化,让这个独立的孤儿进程去启动服务器
tsvr->Start();
return 0;
}