文章目录
- 前言
- 一、简单TCP网络程序
- 二、TCP socket API 详解
- socket()
- bind()
- listen()
- accept()
- connect()
- 三、TCP网络编程
- 头文件
- 日志文件
- 客户端
- 服务器
- 单进程版本
- 多进程版本
- 多线程版本
- 四、线程池版的TCP
- 更改提供的服务
- 总结
前言
接着上节课我们讲了udp套接字网络编程,这节课我来给大家讲解基于Tcp套接字网络编程.
正文开始!
一、简单TCP网络程序
和上一章的UDP类似,实现一个简单的大小写转化的功能.
二、TCP socket API 详解
下面介绍程序中用到的socket API,这些函数都在sys/socket.h中.
socket()
- socket()打开一个网络通讯端口,如果成功的话,就像open()一样返回一个文件描述符;
- 应用程序可以像读写文件一样用read/write在网络上收发数据;
- 如果socket()调用出错返回-1;
- 对于IPv4,family参数指定为AF_INET;
- 对于TCP协议,type参数指定为SOCK_STREAM,表示面向流的传输协议;
- prorocol指定为0即可.
bind()
- 服务器程序所监听的网络地址和端口号通常是固定不变的,客户端程序得知服务器程序的地址和端口号后就可以向服务器发起连接;服务器需要调用bind绑定一个固定的网络地址和端口号;
- bind()成功返回0,失败返回-1;
- bind()的作用是将参数sockfd和myaddr绑定在一起,使sockfd这个用于网络通讯的文件描述符监听myaddr所描述的地址和端口号;
- 上一章我给大家讲过,struct sockaddr*是一个通用指针类型,myaddr参数实际上可以接受多种协议的sockaddr结构体,而它们的长度各不相同,所以需要第三个参数addrlen指定结构体的长度.
在程序中对myaddr参数是这样初始化的:
bzero(&myaddr,sizeof(myaddr));
myaddr.sin_family = AF_INET;
myaddr.sin_addr.s_addr = htonl(INADDR_ANY);
myaddr.sin_port = htons(PORT);
- 将整个结构体清零;
- 设置地址类型为AF_INET;
- 网络地址为INADDR_ANY,这个宏表示本地的任意IP地址,因为服务器可能有多个网卡,每个网卡也可能绑定多个IP地址,这样设置可以在所有的IP地址上监听,直到与某个客户端建立连接时才确定下来到底用哪个IP地址;
- 端口号为PORT,默认定义为8080;
listen()
- listen()声明sockdf处于监听状态,并且最多允许有backlog个客户端处于连接等待状态,如果接收到更多的连接请求就忽略,这里设置不会太大(一般是5),
- listen()成功返回0,失败返回-1;
accept()
- 三次握手完成后,服务器调用accept()接受连接;
- 如果服务器调用accept()时还没有客户端的连接请求,就阻塞等待直到有客户端连接上来;
- addr是一个传出参数,accept()返回时传出客户端的地址和端口号;
- 如果给addr 参数串NULL,表示不关心客户端的地址;
- addrlen参数是一个传入传出参数(value-result argument),传入的是调用者提供的,缓冲区addr的长度以避免缓冲区溢出问题,传出的是客户端地址结构体的实际长度(有可能没有占满调用者提供的缓冲区);
- 返回值是一个socket套接字,传入的第一个参数也是一个socket套接字,这些都是文件描述符.
- 传入的套接字参数的核心工作是:获取新的连接(一定是有客户来连接了!)—>监听套接字
- 返回值的套接字的核心工作:主要是为用户提供网络服务的socket,主要是进行IO服务;
connect()
- 客户端需要调用connect()连接服务器;
- connect()和bind的参数形式一致,区别在于bind的参数是自己的地址,而connect的参数是对方的地址;
- connect()成功返回0,出错返回-1;
三、TCP网络编程
头文件
util.hpp
#include <iostream>
#include <string>
#include<cstring>
#include<cstdlib>
#include <ctype.h>
#include<signal.h>
#include<unistd.h>
#include<sys/wait.h>
#include<sys/types.h>
#include<sys/socket.h>
#include<netinet/in.h>
#include<arpa/inet.h>
#include<pthread.h>
#define SOCKET_ERR 1
#define BIND_ERR 2
#define LISTEN_ERR 3
#define USAGE_ERR 4
#define CONN_ERR 5
#define BUFFER_SIZE 1024
日志文件
log.hpp
#pragma once
#include<cstdio>
#include<ctime>
#include<cstdarg>
#include<cassert>
#include<cstdlib>
#include<cstring>
#include<cerrno>
#define DEBUG 0
#define NOTICE 1
#define WARINING 2
#define FATAL 3
const char* log_level[]={"DEBUG","NOTICE","WARINING","FATAL"};
//logMessage(DEBUG,"%d",10);
void logMessage(int level,const char* format,...)
{
assert(level>=DEBUG);
assert(level<=FATAL);
char logInfo[1024];
char* name=getenv("USER");
va_list ap; //ap--->char*
va_start(ap,format);
vsnprintf(logInfo,sizeof(logInfo)-1,format,ap);
va_end(ap); //ap=NULL
FILE* out=(level==FATAL)?stderr:stdout;
fprintf(out,"%s | %u | %s | %s\n",\
log_level[level],(unsigned int)time(nullptr),\
name==nullptr?"unknow":name,logInfo);
}
客户端
ClientTcp.cc
#include "util.hpp"
#include "log.hpp"
using namespace std;
volatile bool quit =false;
static void Usage(string proc)
{
cerr << "Usage\n\t" << proc << " ip port" << endl;
cerr << "Example\n\t" << proc << " 127.0.0.1 8080\n"
<< endl;
}
// 2.需要bind吗??--->需要,但是不需要显式的bind!
// 3.需要listen吗?不需要的!
// 4.需要accept吗?不需要的!
// ./clientTcp serverIp serverPort
int main(int argc, char *argv[])
{
if (argc != 2 && argc != 3)
{
Usage(argv[0]);
exit(USAGE_ERR);
}
string serverIp = argv[1];
uint16_t serverPort = stoi(argv[2]);
// 1.创建套接字
int sock = socket(AF_INET, SOCK_STREAM, 0);
if (sock < 0)
{
cerr << "socket: " << strerror(errno) << endl;
exit(SOCKET_ERR);
}
// 2.connect,发起连接请求,你想谁发起请求呢?当然是想服务器发起请求喽
// 2.1 先填充需要连接的远端主机的基本信息
struct sockaddr_in server;
memset(&server, 0, sizeof(server));
server.sin_family = AF_INET;
inet_aton(serverIp.c_str(), &server.sin_addr);
server.sin_port = ntohs(serverPort);
// 2.2发送请求,connect会自动帮我们进行bind!
if (connect(sock, (const sockaddr *)&server, sizeof(server)) != 0)
{
cerr << "connect: " << strerror(errno) << endl;
exit(CONN_ERR);
}
cout << "info connect success: " << sock << endl;
string message;
while(!quit)
{
cout<<"请输入你的消息>> ";
getline(cin,message);
if(strcasecmp(message.c_str(),"quit")==0)
quit=true;
ssize_t s=write(sock,message.c_str(),message.size());
if(s>0)
{
char outbuffer[BUFFER_SIZE];
ssize_t s=read(sock,outbuffer,BUFFER_SIZE);
if(s>0)
{
message[s]='\0';
cout<<"Server Echo>> "<<message<<endl;
}
}
else if(s<=0)
{
break;
}
message.clear();
}
close(sock);
return 0;
}
由于客户端不需要固定的端口号,因此不必调用bind(),客户端的端口号由内核自动分配.
注意:
- 客户端不是不允许bind(),只是没有必要调用bind()固定一个端口号.否则如果在同一台机器上启动多个客户端,就会出现端口号被占用导致不能正确建立连接.
- 服务器也不是必须调用bind(),但如果服务器不调用bind(),内核会自动给服务器分配监听端口,每次启动服务器端口号都不一样,客户端要连接服务器就会遇到麻烦;
服务器
在这里服务器分为三个版本
单进程版本
#include "util.hpp"
#include "log.hpp"
using namespace std;
class ServerTcp
{
public:
ServerTcp(uint16_t port, string ip = "")
: _listenSock(-1), _port(port), _ip(ip)
{
}
~ServerTcp()
{
}
public:
void init()
{
// 1.创建socket
_listenSock = socket(AF_INET, SOCK_STREAM, 0);
if (_listenSock < 0)
{
logMessage(FATAL, "socket:%s", strerror(errno));
exit(SOCKET_ERR);
}
logMessage(DEBUG, "socket:&s,%d", strerror(errno), _listenSock);
// 2.bind绑定
// 2.1填充服务器
struct sockaddr_in local; //用户栈
memset(&local, 0, sizeof local);
local.sin_family = AF_INET;
local.sin_port = htons(_port);
_ip.empty() ? (local.sin_addr.s_addr = INADDR_ANY) : (inet_aton(_ip.c_str(), &local.sin_addr));
// 2.2本地socket信息,写入_sock对应的内核区域
if (bind(_listenSock, (const sockaddr *)&local, sizeof local) < 0)
{
logMessage(FATAL, "bind: %s", strerror(errno));
exit(BIND_ERR);
}
logMessage(DEBUG, "bind: %s", strerror(errno));
// 3.监听socket,为何要监听呢?tcp是面向连接的!
if (listen(_listenSock, 5 /*后面再说*/) < 0)
{
logMessage(FATAL, "listen: %s", strerror(errno));
exit(LISTEN_ERR);
}
logMessage(DEBUG, "listen: %s", strerror(errno));
//允许别人来连接你了
}
void loop()
{
while (true)
{
struct sockaddr_in peer;
socklen_t len = sizeof(peer);
// 4.获取连接,accept的返回值是一个新的socket fd??
int serviceSock = accept(_listenSock, (struct sockaddr *)&peer, &len);
if (serviceSock < 0)
{
//获取连接失败
logMessage(WARINING, "accept: &s[%d]", strerror(errno), serviceSock);
continue;
}
// 4.1获取客户端基本信息
uint16_t peerPort = ntohs(peer.sin_port);
string peerIp = inet_ntoa(peer.sin_addr);
logMessage(DEBUG, "accept: %s | %s[%d],socker fd: %d",
strerror(errno), peerIp.c_str(), peerPort, serviceSock);
// 5.提供服务,小写转大写
// // 5.0 v0版本----单进程--一旦进行transService,主执行流就无法进行向后执行,只能提供完毕服务后才能进行accept
transService(serviceSock, peerIp, peerPort);
// logMessage(DEBUG,"server provide service start ...");
// sleep(1);
}
}
// TCP && UDP支持全双工
void transService(int sock, string &clientIp, uint16_t clientPort)
{
assert(sock > 0);
assert(!clientIp.empty());
assert(clientPort > 1024);
char inbuffer[BUFFER_SIZE];
while (true)
{
ssize_t s = read(sock, inbuffer, sizeof(inbuffer) - 1); //我们认为读到的都是字符串
if (s > 0)
{
// read success
inbuffer[s] = '\0';
if (strcasecmp(inbuffer, "quit") == 0)
{
logMessage(DEBUG, "client quit -- %s[%d]", clientIp.c_str(), clientPort);
break;
}
logMessage(DEBUG, "trans before: %s[%d]>> %s", clientIp.c_str(), clientPort, inbuffer);
//可以进行大小写转化了
for (int i = 0; i < s; i++)
{
if (isalpha(inbuffer[i]) && islower(inbuffer[i]))
inbuffer[i] = toupper(inbuffer[i]);
}
write(sock, inbuffer, sizeof(inbuffer));
logMessage(DEBUG, "trans after: %s[%d]>> %s", clientIp.c_str(), clientPort, inbuffer);
}
else if (s == 0)
{
// pipe:读端一直在读,写端不写了,并且关闭了写端,读端会如何?--->s==0,代表对端关闭
// s==0,代表对方关闭,Client退出
logMessage(DEBUG, "client quit -- %s[%d]", clientIp.c_str(), clientPort);
break;
}
else
{
logMessage(DEBUG, "%s[%d] -- read: %s", clientIp.c_str(), clientPort, strerror(errno));
break;
}
}
//只要走到这里,一定是client退出了,服务到此结束
close(sock); // 如果一个进程对应的文件fd,打开了没有被归还,文件描述符泄露!
logMessage(DEBUG, "server close %d done", sock);
}
private:
int _listenSock;
uint16_t _port;
string _ip;
};
static void Usage(string proc)
{
cerr << "Usage\n\t" <<proc<<" port ip"<<endl;
cerr << "Example\n\t" <<proc<<" 8080 127.0.0.1\n"<<endl;
}
// ./serverTcp local_port [local_ip]
int main(int argc, char * argv[])
{
if (argc != 2 && argc != 3)
{
Usage(argv[0]);
exit(USAGE_ERR);
}
uint16_t port=stoi(argv[1]);
string ip;
if(argc==3)
{
ip=argv[2];
}
ServerTcp svr(port,ip);
svr.init();
svr.loop();
return 0;
}
测试多个连接的情况
在启动一个客户端,尝试连接服务器,发现第二个客户端,不能正确的和服务器进行通信.
分析原因,是因为我们accept了一个请求之后,就一直while循环尝试read,没有继续低啊用到的accept,导致不能接受新的请求.
也就是说一旦进行transService,主执行流就无法进行向后执行,只能提供完毕服务后才能进行accept!
我们当前这个TCP,只能处理一个连接,这是不科学的.
多进程版本
这种多进程版本只在Linux下有效!因为子进程执行完代码以后就进入僵尸状态,需要等父进程来回收资源.
这种方法让父进程忽略SIGCHLD的信号,就是子进程执行完自己的代码直接退出即可,不让父进程回收自己的资源.
void loop()
{
signal(SIGCHLD,SIG_IGN);//只在Linux下有效
while (true)
{
struct sockaddr_in peer;
socklen_t len = sizeof(peer);
// 4.获取连接,accept的返回值是一个新的socket fd??
int serviceSock = accept(_listenSock, (struct sockaddr *)&peer, &len);
if (serviceSock < 0)
{
//获取连接失败
logMessage(WARINING, "accept: &s[%d]", strerror(errno), serviceSock);
continue;
}
// 4.1获取客户端基本信息
uint16_t peerPort = ntohs(peer.sin_port);
string peerIp = inet_ntoa(peer.sin_addr);
logMessage(DEBUG, "accept: %s | %s[%d],socker fd: %d",
strerror(errno), peerIp.c_str(), peerPort, serviceSock);
// 5.提供服务,小写转大写
//5.1 V1---多进程版本---父进程打开的文件会被子进程继承!
pid_t id=fork();
assert(id!=-1);
if(id==0)
{
close(_listenSock);
//子进程
transService(serviceSock, peerIp, peerPort);
exit(0);
}
//父进程
close(serviceSock);//这一步是一定要做的!
// logMessage(DEBUG,"server provide service start ...");
// sleep(1);
}
}
创建爸爸进程之后,爸爸进程又创建了孙子进程,然后直接让爸爸进程退出,然后爷爷进程直接回收爸爸进程的资源,然后让孙子进程去执行代码,又因为孙子进程没有父进程,就是一个孤儿进程,就是被系统领养了,然后孤儿进程的资源回收就交给了系统进程.
void loop()
{
while (true)
{
struct sockaddr_in peer;
socklen_t len = sizeof(peer);
// 4.获取连接,accept的返回值是一个新的socket fd??
int serviceSock = accept(_listenSock, (struct sockaddr *)&peer, &len);
if (serviceSock < 0)
{
//获取连接失败
logMessage(WARINING, "accept: &s[%d]", strerror(errno), serviceSock);
continue;
}
// 4.1获取客户端基本信息
uint16_t peerPort = ntohs(peer.sin_port);
string peerIp = inet_ntoa(peer.sin_addr);
logMessage(DEBUG, "accept: %s | %s[%d],socker fd: %d",
strerror(errno), peerIp.c_str(), peerPort, serviceSock);
// 5.提供服务,小写转大写
// //5.1 V1.1---多进程版本
//爷爷进程
pid_t id=fork();
if(id==0)
{
//爸爸进程
close(_listenSock);
//又进行了一次fork
if(fork()>0) exit(0);
//孙子进程--就没有爸爸进程了--孤儿进程--被系统领养了--回收问题就交给了系统来回收
transService(serviceSock, peerIp, peerPort);
exit(0);
}
close(serviceSock);
//爸爸进程直接终止,立马得到退出码,释放僵尸进程状态
pid_t ret=waitpid(id,nullptr,0);//就用阻塞式等待
(void)ret;
// logMessage(DEBUG,"server provide service start ...");
// sleep(1);
}
}
多线程版本
class ServerTcp;
struct ThreadData
{
ThreadData(int sock, string clientIp, uint16_t clientPort,ServerTcp* ts)
:_sock(sock)
,_clientIp(clientIp)
,_clientPort(clientPort)
,_this(ts)
{
}
int _sock;
string _clientIp;
uint16_t _clientPort;
ServerTcp* _this;
};
static void* startRountine(void* args)
{
pthread_detach(pthread_self());
ThreadData* td=static_cast<ThreadData*>(args);
td->_this->transService(td->_sock,td->_clientIp,td->_clientPort);
delete(td);
}
void loop()
{
while (true)
{
struct sockaddr_in peer;
socklen_t len = sizeof(peer);
// 4.获取连接,accept的返回值是一个新的socket fd??
int serviceSock = accept(_listenSock, (struct sockaddr *)&peer, &len);
if (serviceSock < 0)
{
//获取连接失败
logMessage(WARINING, "accept: &s[%d]", strerror(errno), serviceSock);
continue;
}
// 4.1获取客户端基本信息
uint16_t peerPort = ntohs(peer.sin_port);
string peerIp = inet_ntoa(peer.sin_addr);
logMessage(DEBUG, "accept: %s | %s[%d],socker fd: %d",
strerror(errno), peerIp.c_str(), peerPort, serviceSock);
// 5.提供服务,小写转大写
//5.2 v2版本---多线程
//这里不需要关闭文件描述符了!
//多线程是会共享文件描述符表的!
ThreadData* td=new ThreadData(serviceSock,peerIp,peerPort,this);
pthread_t tid;
pthread_create(&tid,nullptr,startRountine,&td);
// logMessage(DEBUG,"server provide service start ...");
// sleep(1);
}
}
这样我们就可以进行正常的大小写转化服务啦!!!
四、线程池版的TCP
lock.hpp
#pragma once
#include <iostream>
#include <pthread.h>
class Mutex
{
public:
Mutex()
{
pthread_mutex_init(&_lock, nullptr);
}
void lock()
{
pthread_mutex_lock(&_lock);
}
void unlock()
{
pthread_mutex_unlock(&_lock);
}
~Mutex()
{
pthread_mutex_destroy(&_lock);
}
private:
pthread_mutex_t _lock;
};
class LockGuard
{
public:
LockGuard(Mutex* mutex)
: _mutex(mutex)
{
_mutex->lock();
std::cout<<"加锁成功..."<<std::endl;
}
~LockGuard()
{
_mutex->unlock();
std::cout<<"解锁成功..."<<std::endl;
}
private:
Mutex* _mutex;
};
Task.hpp
#pragma once
#include <iostream>
#include <string>
#include<functional>
#include"log.hpp"
#include<pthread.h>
class Task
{
public:
//等价于
// typedef std::function<void(int,std::string,uint16_t)> callback_t;
using callback_t=std::function<void (int, std::string, uint16_t)>;
private:
int _sock;//给用户提供任务IO服务的sock
std::string _ip;//client ip
uint16_t _port;//client port
callback_t _func;//回调方法
public:
Task():_sock(-1),_port(-1)
{}
Task(int sock,std::string ip,uint16_t port,callback_t func)
:_sock(sock),_ip(ip),_port(port),_func(func)
{}
void operator()()
{
logMessage(DEBUG,"线程ID[%p]->处理%s:%d的请求 开始啦....",\
pthread_self(),_ip.c_str(),_port);
_func(_sock,_ip,_port);
logMessage(DEBUG,"线程ID[%p]->处理%s:%d的请求 结束啦....",\
pthread_self(),_ip.c_str(),_port);
}
~Task()
{}
};
ThreadPool.hpp
#pragma once
#include <iostream>
#include <queue>
#include <cassert>
#include <cstdlib>
#include <unistd.h>
#include <pthread.h>
#include<sys/prctl.h>
#include "Task.hpp"
#include "lock.hpp"
using namespace std;
const int gThreadNum = 15;
//设计为懒汉模式
template <class T>
class ThreadPool
{
private:
ThreadPool(int threadNum = gThreadNum)
: _threadNum(threadNum), _isStart(false)
{
assert(_threadNum > 0);
pthread_mutex_init(&_mutex, nullptr);
pthread_cond_init(&_cond, nullptr);
}
ThreadPool(const ThreadPool<T> &) = delete;
void operator=(const ThreadPool<T> &) = delete;
public:
static ThreadPool<T> *getInstance()
{
static Mutex mutex;
if (nullptr == instance)//仅仅是过滤重复的判断
{
LockGuard lockguard(&mutex);//进入代码块,加锁,退出代码块,自动解锁
if (nullptr == instance)
{
instance = new ThreadPool<T>();
}
}
return instance;
}
~ThreadPool()
{
pthread_mutex_destroy(&_mutex);
pthread_cond_destroy(&_cond);
}
int ThreadNum()
{
return _threadNum;
}
//类内成员,成员函数都有默认参数this
static void *threadRoutine(void *args)
{
pthread_detach(pthread_self());
ThreadPool<T> *tp = static_cast<ThreadPool<T> *>(args);
while (true)
{
tp->lockQueue();
while (!tp->haveTask())
{
tp->waitForTask();
}
T t = tp->pop();
tp->unlockQueue();
t();//让指定的线程处理这个任务
}
}
void start()
{
assert(!_isStart);
for (int i = 0; i < _threadNum; i++)
{
pthread_t tmp;
pthread_create(&tmp, nullptr, threadRoutine, this);
}
_isStart = true;
}
void push(const T &in)
{
lockQueue();
_taskQueue.push(in);
choiceThreadForHandler();
unlockQueue();
}
private:
void lockQueue() { pthread_mutex_lock(&_mutex); }
void unlockQueue() { pthread_mutex_unlock(&_mutex); }
bool haveTask() { return !_taskQueue.empty(); }
void waitForTask() { pthread_cond_wait(&_cond, &_mutex); }
void choiceThreadForHandler() { pthread_cond_signal(&_cond); }
T pop()
{
T tmp = _taskQueue.front();
_taskQueue.pop();
return tmp;
}
private:
bool _isStart;
int _threadNum;
queue<T> _taskQueue;
pthread_mutex_t _mutex;
pthread_cond_t _cond;
static ThreadPool<T> *instance;
};
template <class T>
ThreadPool<T> *ThreadPool<T>::instance = nullptr;
serverTcp.cc
#include "util.hpp"
#include "log.hpp"
#include "ThreadPool.hpp"
#include "Task.hpp"
#include"daemonize.hpp"
using namespace std;
class ServerTcp;
struct ThreadData
{
ThreadData(int sock, string clientIp, uint16_t clientPort, ServerTcp *ts)
: _sock(sock), _clientIp(clientIp), _clientPort(clientPort), _this(ts)
{
}
int _sock;
string _clientIp;
uint16_t _clientPort;
ServerTcp *_this;
};
//大小写转化
// TCP && UDP支持全双工
void transService(int sock, const string &clientIp, uint16_t clientPort)
{
assert(sock > 0);
assert(!clientIp.empty());
assert(clientPort > 1024);
char inbuffer[BUFFER_SIZE];
while (true)
{
ssize_t s = read(sock, inbuffer, sizeof(inbuffer) - 1); //我们认为读到的都是字符串
if (s > 0)
{
// read success
inbuffer[s] = '\0';
if (strcasecmp(inbuffer, "quit") == 0)
{
logMessage(DEBUG, "client quit -- %s[%d]", clientIp.c_str(), clientPort);
break;
}
logMessage(DEBUG, "trans before: %s[%d]>> %s", clientIp.c_str(), clientPort, inbuffer);
//可以进行大小写转化了
for (int i = 0; i < s; i++)
{
if (isalpha(inbuffer[i]) && islower(inbuffer[i]))
inbuffer[i] = toupper(inbuffer[i]);
}
write(sock, inbuffer, sizeof(inbuffer));
logMessage(DEBUG, "trans after: %s[%d]>> %s", clientIp.c_str(), clientPort, inbuffer);
}
else if (s == 0)
{
// pipe:读端一直在读,写端不写了,并且关闭了写端,读端会如何?--->s==0,代表对端关闭
// s==0,代表对方关闭,Client退出
logMessage(DEBUG, "client quit -- %s[%d]", clientIp.c_str(), clientPort);
break;
}
else
{
logMessage(DEBUG, "%s[%d] -- read: %s", clientIp.c_str(), clientPort, strerror(errno));
break;
}
}
//只要走到这里,一定是client退出了,服务到此结束
close(sock); // 如果一个进程对应的文件fd,打开了没有被归还,文件描述符泄露!
logMessage(DEBUG, "server close %d done", sock);
}
class ServerTcp
{
public:
ServerTcp(uint16_t port, string ip = "")
: _listenSock(-1), _port(port), _ip(ip), _tp(nullptr)
{
}
~ServerTcp()
{
}
public:
void init()
{
// 1.创建socket
_listenSock = socket(AF_INET, SOCK_STREAM, 0);
if (_listenSock < 0)
{
logMessage(FATAL, "socket:%s", strerror(errno));
exit(SOCKET_ERR);
}
logMessage(DEBUG, "socket:&s,%d", strerror(errno), _listenSock);
// 2.bind绑定
// 2.1填充服务器
struct sockaddr_in local; //用户栈
memset(&local, 0, sizeof local);
local.sin_family = AF_INET;
local.sin_port = htons(_port);
_ip.empty() ? (local.sin_addr.s_addr = INADDR_ANY) : (inet_aton(_ip.c_str(), &local.sin_addr));
// 2.2本地socket信息,写入_sock对应的内核区域
if (bind(_listenSock, (const sockaddr *)&local, sizeof local) < 0)
{
logMessage(FATAL, "bind: %s", strerror(errno));
exit(BIND_ERR);
}
logMessage(DEBUG, "bind: %s", strerror(errno));
// 3.监听socket,为何要监听呢?tcp是面向连接的!
if (listen(_listenSock, 5 /*后面再说*/) < 0)
{
logMessage(FATAL, "listen: %s", strerror(errno));
exit(LISTEN_ERR);
}
logMessage(DEBUG, "listen: %s", strerror(errno));
//允许别人来连接你了
// 4.加载线程池
_tp = ThreadPool<Task>::getInstance();
}
void loop()
{
// signal(SIGCHLD,SIG_IGN);//只在Linux下有效
_tp->start();
logMessage(DEBUG, "thread pool start success,thread num: %d", _tp->ThreadNum());
while (true)
{
struct sockaddr_in peer;
socklen_t len = sizeof(peer);
// 4.获取连接,accept的返回值是一个新的socket fd??
// 4.1 _listenScok:监听&&获取新的连接--->sock
// 4.2 serviceSock:给用户提供新的socket服务
int serviceSock = accept(_listenSock, (struct sockaddr *)&peer, &len);
if (serviceSock < 0)
{
//获取连接失败
logMessage(WARINING, "accept: &s[%d]", strerror(errno), serviceSock);
continue;
}
// 4.1获取客户端基本信息
uint16_t peerPort = ntohs(peer.sin_port);
string peerIp = inet_ntoa(peer.sin_addr);
logMessage(DEBUG, "accept: %s | %s[%d],socker fd: %d",
strerror(errno), peerIp.c_str(), peerPort, serviceSock);
// 5.提供服务,小写转大写
Task t(serviceSock, peerIp, peerPort, transService);
_tp->push(t);
}
}
private:
int _listenSock;
uint16_t _port;
string _ip;
//引入线程池
ThreadPool<Task> *_tp;
};
static void Usage(string proc)
{
cerr << "Usage\n\t" << proc << " port ip" << endl;
cerr << "Example\n\t" << proc << " 8080 127.0.0.1\n"
<< endl;
}
// ./serverTcp local_port [local_ip]
int main(int argc, char *argv[])
{
if (argc != 2 && argc != 3)
{
Usage(argv[0]);
exit(USAGE_ERR);
}
uint16_t port = stoi(argv[1]);
string ip;
if (argc == 3)
{
ip = argv[2];
}
ServerTcp svr(port, ip);
svr.init();
svr.loop();
return 0;
}
我把打印出来的日志消息全部放入一个日志文件中,在日志文件中可以查看到!
我们设置了对应的任务是死循环,那么线程池提供服务,就显得不太合适.因为初始化的线程数量只有五个,就代表只能有五个人连接服务器.
一般我们给线程池跑入的任务都是短任务.
更改提供的服务
这里我们需要用到一个函数popen();
popen()可以执行shell命令,并读取此命令的返回值;
popen()函数通过创建一个管道,调用fork()产生一个子进程,执行一个shell以运行命令来开启一个进程.可以通过这个管道执行标准输入输出操作.这个管道必须由pclose()函数关闭!!!而不是fclose()函数(若使用fclose()函数关闭则会产生僵尸进程.)
tyoe参数只能是读或者写中的一种,得到的返回值(标准I/O流)也具有和type相应的只读或者只写类型.如果type是"r"则文件指针连接到command的标准输出;如果type是"w"则文件指针连接到command的标准输入.
command蚕食是一个只想以NULL结束的shell命令字符串的指针.这行命令将被传到bin/sh并使用-c标志,shell将执行这个命令!
serverTcp.cc
#include "util.hpp"
#include "log.hpp"
#include "ThreadPool.hpp"
#include "Task.hpp"
#include"daemonize.hpp"
using namespace std;
class ServerTcp;
struct ThreadData
{
ThreadData(int sock, string clientIp, uint16_t clientPort, ServerTcp *ts)
: _sock(sock), _clientIp(clientIp), _clientPort(clientPort), _this(ts)
{
}
int _sock;
string _clientIp;
uint16_t _clientPort;
ServerTcp *_this;
};
void execCommand(int sock, const string &clientIp, uint16_t clientPort)
{
assert(sock > 0);
assert(!clientIp.empty());
assert(clientPort > 1024);
char command[BUFFER_SIZE];
while (true)
{
ssize_t s = read(sock, command, sizeof(command) - 1); //我们认为读到的都是字符串
if (s > 0)
{
command[s]='\0';
logMessage(DEBUG,"[%s:%d] exec [%s]",clientIp.c_str(),clientPort,command);
std::string safe;
if((safe.find("rm")!=std::string::npos)||(safe.find("unlink")!=std::string::npos))
{
break;
}
// 我们是以r方式打开的文件,没有写入
//所以我们无法通过dup2的方式得到对应的结果
FILE* fp=popen(command,"r");
if(fp==nullptr)
{
logMessage(WARINING,"exec %s failed, because: %s",command,strerror(errno));
break;
}
char line[1024];
while(fgets(line,sizeof(line)-1,fp)!=nullptr)
{
write(sock,line,strlen(line));
}
// dup2(sock,fp->_fileno);
// fflush(fp);
pclose(fp);
logMessage(DEBUG,"[%s]:%d exec [%s]... done",clientIp.c_str(),clientPort,command);
}
else if (s == 0)
{
// pipe:读端一直在读,写端不写了,并且关闭了写端,读端会如何?--->s==0,代表对端关闭
// s==0,代表对方关闭,Client退出
logMessage(DEBUG, "client quit -- %s[%d]", clientIp.c_str(), clientPort);
break;
}
else
{
logMessage(DEBUG, "%s[%d] -- read: %s", clientIp.c_str(), clientPort, strerror(errno));
break;
}
}
//只要走到这里,一定是client退出了,服务到此结束
close(sock); // 如果一个进程对应的文件fd,打开了没有被归还,文件描述符泄露!
logMessage(DEBUG, "server close %d done", sock);
}
class ServerTcp
{
public:
ServerTcp(uint16_t port, string ip = "")
: _listenSock(-1), _port(port), _ip(ip), _tp(nullptr)
{
}
~ServerTcp()
{
}
public:
void init()
{
// 1.创建socket
_listenSock = socket(AF_INET, SOCK_STREAM, 0);
if (_listenSock < 0)
{
logMessage(FATAL, "socket:%s", strerror(errno));
exit(SOCKET_ERR);
}
logMessage(DEBUG, "socket:&s,%d", strerror(errno), _listenSock);
// 2.bind绑定
// 2.1填充服务器
struct sockaddr_in local; //用户栈
memset(&local, 0, sizeof local);
local.sin_family = AF_INET;
local.sin_port = htons(_port);
_ip.empty() ? (local.sin_addr.s_addr = INADDR_ANY) : (inet_aton(_ip.c_str(), &local.sin_addr));
// 2.2本地socket信息,写入_sock对应的内核区域
if (bind(_listenSock, (const sockaddr *)&local, sizeof local) < 0)
{
logMessage(FATAL, "bind: %s", strerror(errno));
exit(BIND_ERR);
}
logMessage(DEBUG, "bind: %s", strerror(errno));
// 3.监听socket,为何要监听呢?tcp是面向连接的!
if (listen(_listenSock, 5 /*后面再说*/) < 0)
{
logMessage(FATAL, "listen: %s", strerror(errno));
exit(LISTEN_ERR);
}
logMessage(DEBUG, "listen: %s", strerror(errno));
//允许别人来连接你了
// 4.加载线程池
_tp = ThreadPool<Task>::getInstance();
}
void loop()
{
// signal(SIGCHLD,SIG_IGN);//只在Linux下有效
_tp->start();
logMessage(DEBUG, "thread pool start success,thread num: %d", _tp->ThreadNum());
while (true)
{
struct sockaddr_in peer;
socklen_t len = sizeof(peer);
// 4.获取连接,accept的返回值是一个新的socket fd??
// 4.1 _listenScok:监听&&获取新的连接--->sock
// 4.2 serviceSock:给用户提供新的socket服务
int serviceSock = accept(_listenSock, (struct sockaddr *)&peer, &len);
if (serviceSock < 0)
{
//获取连接失败
logMessage(WARINING, "accept: &s[%d]", strerror(errno), serviceSock);
continue;
}
// 4.1获取客户端基本信息
uint16_t peerPort = ntohs(peer.sin_port);
string peerIp = inet_ntoa(peer.sin_addr);
logMessage(DEBUG, "accept: %s | %s[%d],socker fd: %d",
strerror(errno), peerIp.c_str(), peerPort, serviceSock);
Task t(serviceSock, peerIp, peerPort, execCommand);
_tp->push(t);
}
}
private:
int _listenSock;
uint16_t _port;
string _ip;
//引入线程池
ThreadPool<Task> *_tp;
};
static void Usage(string proc)
{
cerr << "Usage\n\t" << proc << " port ip" << endl;
cerr << "Example\n\t" << proc << " 8080 127.0.0.1\n"
<< endl;
}
// ./serverTcp local_port [local_ip]
int main(int argc, char *argv[])
{
if (argc != 2 && argc != 3)
{
Usage(argv[0]);
exit(USAGE_ERR);
}
uint16_t port = stoi(argv[1]);
string ip;
if (argc == 3)
{
ip = argv[2];
}
ServerTcp svr(port, ip);
svr.init();
svr.loop();
return 0;
}
总结
(本章完!)