观前提示:本篇博文的一些接口需要前几篇博文实现的
- 线程池的实现
Liunx--线程池的实现--0208 09_Gosolo!的博客-CSDN博客
- 线程池的单例模式
Linux--线程安全的单例模式--自旋锁--0211_Gosolo!的博客-CSDN博客
1.TCP编程需要用的接口
创建 socket 文件描述符
int socket(int domain, int type, int protocol);
type 给成 SOCK_STREAM 表示是流式套接
listensock=socket(AF_INET,SOCK_STREAM,0);
开始监听socket
int listen(int socket, int backlog);
TCP是面向连接的,listen其实也是一个套接字,不过他的用途在于建立连接,而不真正提供服务。类似拉人的,提供服务的是服务员。
接收请求
int accept(int socket, struct sockaddr* address,socklen_t* address_len);
相当于拉客的和服务员进行了交接,返回值是真正提供服务的套接字(fd)
建立连接
int connect(int sockfd, const struct sockaddr *addr,socklen_t addrlen);
在正式通信之前,需要先建立连接。客户端需要连接到连接。
2.TCP编程的框架
2.1 服务端 TcpServer的框架 tcp_server.hpp
class TcpServer
{
private:
const static int gbacklog = 20; //后面再说
public:
TcpServer(uint16_t port, std::string ip="0.0.0.0")
:_port(port)
,_ip(ip)
,listensock(-1)
{}
void initServer()
{}
void Start()
{}
~TcpServer()
{}
private:
uint16_t _port;
std::string _ip;
int listensock;//listensock套接字仅用于建立连接
};
头文件在这里一次性给出
#pragma once
#include <iostream>
#include <string>
#include <unordered_map>
#include <cstring>
#include <cerrno>
#include <cassert>
#include <signal.h>
#include <unistd.h>
#include <memory>
#include <sys/types.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <netinet/in.h>
#include <sys/wait.h>
#include <pthread.h>
#include <ctype.h>
//这几个头文件在线程池里面 其中Task.hpp会在用时给出
#include "ThreadPool/log.hpp"
#include "ThreadPool/ThreadPool.hpp"
#include "ThreadPool/Task.hpp"
2.1.2 TcpServer的调用 tcp_server.cc
#include "tcp_server.hpp"
#include <memory>
static void usage(std::string proc)
{
std::cout << "\nUsage: " << proc << " port\n" << std::endl;
}
// ./tcp_server port
int main(int argc, char *argv[])
{
if(argc != 2)
{
usage(argv[0]);
exit(1);
}
uint16_t port = atoi(argv[1]);
std::unique_ptr<TcpServer> svr(new TcpServer(port));
svr->initServer();
svr->Start();
return 0;
}
2.2 客户端 TcpClient的实现 tcp_client.cc
网络间通信首先需要 struct sockaddr_in 结构体,这里定义一个对象 server
结构对象有了需要对他进行初始化(建议先全部置零,避免出现一些bug)
- server 有三个地方需要做初始化,sin_family、sin_port、sin_addr.s_addr
server.sin_family=AF_INET; //跟申请套接字传入的参数一样即可
server.sin_port=htos(serverport); //记得从本地转为网络字节序
server.sin_addr.s_addr=inet_addr(serverip,c_str());
- connect()接口 客户端需要连接到服务上
connect(sock, (struct sockaddr *)&server, sizeof(server) );
- send() 向服务端发送消息
ssize_t s = send(sock, line.c_str(), line.size(), 0);
- recv() 接收服务器发来的消息
recv(sock, buffer, sizeof(buffer) - 1, 0);
- close() 关闭套接字
close(sock);
#include <iostream>
#include <string>
#include <cstdio>
#include <cstring>
#include <unistd.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <arpa/inet.h>
#include <netinet/in.h>
void usage(std::string proc)
{
std::cout << "\nUsage: " << proc << " serverIp serverPort\n"
<< std::endl;
}
// ./tcp_client targetIp targetPort
int main(int argc, char *argv[])
{
if (argc != 3)
{
usage(argv[0]);
exit(1);
}
std::string serverip = argv[1];
uint16_t serverport = atoi(argv[2]);
int sock = 0;
while (true) // TODO
{
sock = socket(AF_INET, SOCK_STREAM, 0);
if (sock < 0)
{
std::cerr << "socket error" << std::endl;
exit(2);
}
// client 要不要bind呢?不需要显示的bind,但是一定是需要port
// 需要让os自动进行port选择
// 连接别人的能力!
struct sockaddr_in server;
memset(&server, 0, sizeof(server));
server.sin_family = AF_INET;
server.sin_port = htons(serverport);
server.sin_addr.s_addr = inet_addr(serverip.c_str());
if (connect(sock, (struct sockaddr *)&server, sizeof(server)) < 0)
{
std::cerr << "connect error" << std::endl;
exit(3); // TODO
}
std::cout << "connect success" << std::endl;
std::cout << "请输入# ";
std::string line;
std::getline(std::cin, line);
if (line == "quit")
{
close(sock);
break;
}
ssize_t s = send(sock, line.c_str(), line.size(), 0);
if (s > 0)
{
char buffer[1024];
ssize_t s = recv(sock, buffer, sizeof(buffer) - 1, 0);
if (s > 0)
{
buffer[s] = 0;
std::cout << "server 回显# " << buffer << std::endl;
close(sock);
}
else if (s == 0)
{
close(sock);
}
}
else
{
close(sock);
}
}
return 0;
}
3. 服务端的实现
3.1 TcpServer::initServer()
- 初始化,首先需要建立一个监听套接字listensock,这个监听套接字的作用是建立连接。
listensock=socket(AF_INET,SOCK_STREAM,0);
我们是网络之间进行通信,所以需要借助 struct sockaddr_in 类型的对象,local,创建出来之后最好先清空一下,保证不会出现一些奇怪的问题。
- local 有三个地方需要做初始化,sin_family、sin_port、sin_addr.s_addr
local.sin_family=AF_INET; //跟申请套接字传入的参数一样即可
local.sin_port=htos(_port); //记得从本地转为网络字节序
local.sin_addr.s_addr=_ip=="0.0.0.0"?INADDR_ANY:inet_addr(_ip.c_str());
- 接着和udp一样,进行绑定操作
//注意 虽然使用struct sockaddr_in 结构体类型 但是接口中的参数依然是 struct sockaddr*
bind(listensock, (struct sockaddr *)&local, sizeof(local);
- (tcp新增)建立连接
listen(listensock, gbacklog);//这个gbacklog以后再谈
3.1.1 完整代码
class TcpServer
{
private:
const static int gbacklog = 20; //后面再说
public:
TcpServer(uint16_t port, std::string ip="0.0.0.0")
:_port(port)
,_ip(ip)
,listensock(-1)
{}
void initServer()
{
listensock=socket(AF_INET,SOCK_STREAM,0);
if(listensock<0)
{
logMessage(FATAL, "create socket error, %d:%s", errno, strerror(errno));
exit(2);
}
logMessage(NORMAL, "create socket success, listensock: %d", listensock);
struct sockaddr_in local;
memset(&local,0,sizeof(local));
local.sin_family=AF_INET;
local.sin_port=htons(_port);
//local.sin_addr.s_addr=_ip.empty()?INADDR_ANY:inet_addr(_ip.c_str());
local.sin_addr.s_addr=_ip=="0.0.0.0"?INADDR_ANY:inet_addr(_ip.c_str());
//inet_pton(AF_INET, _ip.c_str(), &local.sin_addr);
if (bind(listensock, (struct sockaddr *)&local, sizeof(local)) < 0)
{
logMessage(FATAL, "bind error, %d:%s", errno, strerror(errno));
exit(3);
}
if(listen(listensock, gbacklog)<0)
{
logMessage(FATAL, "listen error, %d:%s", errno, strerror(errno));
exit(4);
}
logMessage(NORMAL, "init server success");
}
}
3.2 TcpServer::Start()
要想服务端程序开始运行,首先需要有人来提供服务,记得listensock的作用仅是建立连接吗,那谁来做具体的服务呢?
使用accept接口的返回值!
首先,网络通信需要struct sockaddr_in结构体,先创建一个,struct sockaddr_in src;,len就是这个结构体的长度。
int servicesock=accept(listensock,(struct sockaddr*)&src,&len);
连接成功时候,就可以从我们创建的src中获取端口号、ip信息了。
uint16_t client_port=ntohs(src.sin_port);
std::string client_ip=inet_ntoa(src.sin_addr);
接下来就可以开始进行服务了,自定义让做什么。这里让服务端去做这个任务
service(servicesock, client_ip, client_port);
static void service(int sock, const std::string &clientip,
const uint16_t &clientport)
{
//echo server
char buffer[1024];
while(true)
{
// read && write 可以直接被使用!
ssize_t s = read(sock, buffer, sizeof(buffer)-1);
if(s > 0)
{
buffer[s] = 0; //将发过来的数据当做字符串
std::cout << clientip << ":" << clientport << "# " << buffer
<< std::endl;
}
else if(s == 0) //对端关闭连接
{
logMessage(NORMAL, "%s:%d shutdown, me too!", clientip.c_str(),
clientport);
break;
}
else{ //
logMessage(ERROR, "read socket error, %d:%s", errno, strerror(errno));
break;
}
write(sock, buffer, strlen(buffer));
}
close(sock);
}
3.2.1 完整代码——单进程阻塞循环版
void Start()
{
while(true)
{
struct sockaddr_in src;
socklen_t len=sizeof(src);
//建立连接
int servicesock=accept(listensock,(struct sockaddr*)&src,&len);
if(servicesock < 0)
{
logMessage(ERROR, "accept error, %d:%s", errno, strerror(errno));
continue;
}
uint16_t client_port=ntohs(src.sin_port);
std::string client_ip=inet_ntoa(src.sin_addr);
logMessage(NORMAL, "link success, servicesock: %d | %s : %d |\n",
servicesock, client_ip.c_str(), client_port);
//版本1————单进程阻塞循环版
service(servicesock, client_ip, client_port);
}
问题在于,一次只能够处理一个进程。因为我们调用的函数service是一个死循环函数,如果一个客户端没有终止访问,其他客户端都不能正常来使用。
3.2.2 完整代码——多进程带信号屏蔽版
那我使用多进程来解决这个问题,可是使用多进程也有问题,我创建了子进程,那我是不是要等待子进程结束啊?如果我使用阻塞等待那和上面有什么本质区别呢?
注:使用非阻塞等待成本很大,可以但不建议。
所以我们还需要用到信号,当子进程结束后,会给父进程发SIGCHLD信号!
如果我们主动忽略SIGCHLD信号,子进程退出的时候,会自动释放自己的僵尸状态。
void Start()
{
signal(SIGCHLD, SIG_IGN);
while(true)
{
struct sockaddr_in src;
socklen_t len=sizeof(src);
//建立连接
int servicesock=accept(listensock,(struct sockaddr*)&src,&len);
if(servicesock < 0)
{
logMessage(ERROR, "accept error, %d:%s", errno, strerror(errno));
continue;
}
uint16_t client_port=ntohs(src.sin_port);
std::string client_ip=inet_ntoa(src.sin_addr);
logMessage(NORMAL, "link success, servicesock: %d | %s : %d |\n",
servicesock, client_ip.c_str(), client_port);
pid_t id = fork();
assert(id != -1);
if(id == 0)
{
close(listensock);//关闭不需要的文件描述符
service(servicesock, client_ip, client_port);
exit(0);
}
//关闭父进程不需要的文件描述符 不关闭会导致父进程可用的文件描述符越来越少
close(servicesock);
}
}
3.3.3 完整代码——多进程版
能不能不屏蔽SIGCHLD信号呢?那我们就需要有人等待子进程,让谁等呢?
让bash领养,让bash等!
void Start()
{
while(true)
{
//...跟上面一样
//版本3————多进程孤儿进程版
// 利用孤儿进程被系统回收
pid_t id=fork();
if(id==0)
{
close(listensock);
if(fork()>0)
{
//子进程本身
exit(0);
}
//子进程的子进程
service(servicesock, client_ip, client_port);
exit(0);
}
waitpid(id,nullptr,0);//由于子进程创建子进程后立即退出 所以父进程不会阻塞
close(servicesock);
}
}
3.3.4 完整代码——多线程版
相较于使用多进程,多线程的开销明显小。
class ThreadData
{
public:
int _sock;
std::string _ip;
uint16_t _port;
};
class TcpServer
{
private:
const static int gbacklog = 20; //后面再说
//设置的回调函数 必须是static的 不然会多一个this指针
static void *threadRoutine(void *args)
{
pthread_detach(pthread_self());
ThreadData *td = static_cast<ThreadData *>(args);
service(td->_sock, td->_ip, td->_port);
delete td;
return nullptr;
}
void Start()
{
while(true)
{
//...
//版本4————多线程版本
//因为创建一个进程的代价还是比较大的,创建一个线程相对简便
//不使用在栈上创建是为了保证线程安全 不会被覆盖 发生拷贝
ThreadData *td=new ThreadData();
td->_sock=servicesock;
td->_ip=client_ip;
td->_port=client_port;
pthread_t tid ;
//如果不join 一定会造成内存泄漏 可以在threadRoutine中设置等待
pthread_create(&tid,nullptr,threadRoutine,td);
}
}
};
3.3.5 完整代码——线程池版本
线程的创建也是一笔开销,能省就省
线程池版本的服务函数
//线程池版本的服务函数
static void service(int sock, const std::string &clientip,
const uint16_t &clientport, const std::string &thread_name)
{
// echo server
// 同时在线10人
// 所以,我们一般服务器进程业务处理,如果是从连上,到断开,要一直保持这个链接, 长连接
char buffer[1024];
while (true)
{
// read && write 可以直接被使用!
ssize_t s = read(sock, buffer, sizeof(buffer) - 1);
if (s > 0)
{
buffer[s] = 0; // 将发过来的数据当做字符串
std::cout << thread_name << "|" << clientip << ":" << clientport << "# " << buffer << std::endl;
}
else if (s == 0) // 对端关闭连接
{
logMessage(NORMAL, "%s:%d shutdown, me too!", clientip.c_str(), clientport);
break;
}
else
{ //
logMessage(ERROR, "read socket error, %d:%s", errno, strerror(errno));
break;
}
write(sock, buffer, strlen(buffer));
}
close(sock);
}
class TcpServer
{
public:
TcpServer(uint16_t port, std::string ip="0.0.0.0")
:_port(port)
,_ip(ip)
,listensock(-1)
, _threadpool_ptr(ThreadPool<Task>::getThreadPool())
{}
void Start()
{
//引入线程池
_threadpool_ptr->run();
//signal(SIGCHLD, SIG_IGN); // 对SIGCHLD,主动忽略SIGCHLD信号,子进程退出的时候,会自动释放自己的僵尸状态
while(true)
{
//...略
//线程池版本
Task t(servicesock, client_ip, client_port, service);
//Task t(servicesock, client_ip, client_port, dictOnline);
_threadpool_ptr->pushTask(t);
}
}
~TcpServer()
{}
private:
uint16_t _port;
std::string _ip;
int listensock;//仅用于建立连接
//定义一个线程池先
std::unique_ptr<ThreadPool<Task>> _threadpool_ptr;
};
pushTask
#pragma once
#include <iostream>
#include <string>
#include <functional>
#include "log.hpp"
//typedef std::function<void (int , const std::string &, const uint16_t &)> func_t;
using func_t = std::function<void (int , const std::string &, const uint16_t &, const std::string &)>;
class Task
{
public:
Task(){}
Task(int sock, const std::string ip, uint16_t port, func_t func)
: _sock(sock), _ip(ip), _port(port), _func(func)
{}
void operator ()(const std::string &name)
{
_func(_sock, _ip, _port, name);
}
public:
int _sock;
std::string _ip;
uint16_t _port;
// int type;
func_t _func;
};