目录
一、预备知识
1、网络通信理解
2、源IP地址和目的IP地址
3、端口号
二、网络字节序
三、socket编程接口
1、socket常见API
2、sockaddr结构
3、sockaddr结构体
3.1、sockaddr结构体
3.2、sockaddr_in结构体
四、简单的UDP网络程序
1、创建套接字接口
2、绑定端口号
3、服务器运行
4、创建客户端
5、数据传输
5.1、本地数据传输
5.2、网络数据传输
6、业务处理
7、多线程业务处理
五、简单的TCP网络程序
1、创建套接字与绑定
2、监听
3、获取连接
4、服务器运行
4.1、创建子进程的子进程
4.2、设置信号回收
4.3、多线程服务器
4.4、线程池服务器
5、创建客户端
六、 TCP协议通讯流程
一、预备知识
1、网络通信理解
主机之间进行网络通信,都是用户打开了某个应用的客户端,创建出了一个进程,并把相关数据发送到了另一个主机应用层之上的服务器上,服务器软件启动后本身也是一个进程。
因此网络通信,本质上是进程间通信。
网络传输的过程:
- 先将数据通过OS,发送到目标主机(手段),由TCP/IP完成,IP可以标识互联网上唯一的一台主机。
- 再在本主机上将收到的数据推送给自己上层的指定进程,由端口号标识自己主机上网络进程的唯一性。
- IP地址 + 端口号,可以表示整个互联网中唯一的一个进程。
总结:
网络通信是通过 IP + PORT 构建进程唯一性,来进行的基于网络的进程间通信,简称socket通信。
2、源IP地址和目的IP地址
在IP数据包头部中,有两个IP地址,分别叫做源IP地址和目的IP地址。源IP地址可以理解为记录最终节点的地址,目的IP地址为记录下一条主机的地址。
3、端口号
端口号(port)是传输层协议的内容。
- 端口号是一个2字节16位的整数。
- 端口号用来标识一个进程,告诉操作系统,当前的这个数据要交给哪一个进程来处理。
- IP地址 + 端口号能够标识网络上的某一台主机的某一个进程。
- 一个端口号只能被一个进程占用。
"端口号" 和 "进程ID":
我们之前在学习系统编程的时候,学习了 pid 表示唯一一个进程。此处我们的端口号也是唯一表示一个进程。那么为什么不能直接使用 pid ,而是再创造一个端口号呢?原因有两点:
- 并不是所有的进程都是网络进程,直接使用pid不能很好的区分哪些是需要进行网络通信的进程。
- 都使用pid会增加网络通信与OS进程管理的耦合度,网络通信需要包含进程管理的相关字段。
一个进程可以绑定多个端口号,但是一个端口号不能被多个进程绑定。端口号与pid是通过一个哈希表联系起来的。
我们之前说过,网络相关是被划分在文件操作之下的,想要使用网络,就需要打开网络文件,获得文件描述符,找到对应的缓冲区,把网络中的数据拷贝到缓冲区,就相当于通过文件的方式读取到了网络中的数据。
传输层协议(TCP和UDP)的数据段中有两个端口号,分别叫做源端口号和目的端口号。分别描述"数据是谁发的,要发给谁"。
二、网络字节序
内存中的多字节数据相对于内存地址有大端和小端之分,磁盘文件中的多字节数据相对于文件中的偏移地址也有大端小端之分,网络数据流同样有大端小端之分。
- 发送主机通常将发送缓冲区中的数据按内存地址从低到高的顺序发出。
- 接收主机把从网络上接到的字节依次保存在接收缓冲区中,也是按内存地址从低到高的顺序保存。
- 因此,网络数据流的地址应这样规定:先发出的数据是低地址,后发出的数据是高地址。
- TCP/IP协议规定,网络数据流应采用大端字节序,即低地址高字节。
- 不管这台主机是大端机还是小端机,都会按照这个TCP/IP规定的网络字节序来发送/接收数据。
- 如果当前发送主机是小端,就需要先将数据转成大端。否则就忽略,直接发送即可。
为使网络程序具有可移植性,使同样的C代码在大端和小端计算机上编译后都能正常运行,可以调用以下库函数做网络字节序和主机字节序的转换。
- 这些函数名很好记,h表示host,n表示network,l表示32位长整数,s表示16位短整数。
- 例如htonl表示将32位的长整数从主机字节序转换为网络字节序,例如将IP地址转换后准备发送。
- 如果主机是小端字节序,这些函数将参数做相应的大小端转换然后返回。
- 如果主机是大端字节序,这些函数不做转换,将参数原封不动地返回。
三、socket编程接口
1、socket常见API
// 创建 socket
// 文件描述符 (TCP/UDP, 客户端 + 服务器)
int socket(int domain, int type, int protocol);
// 绑定端口号 (TCP/UDP, 服务器)
int bind(int socket, const struct sockaddr *address,
socklen_t address_len);
// 开始监听socket (TCP, 服务器)
int listen(int socket, int backlog);
// 接收请求 (TCP, 服务器)
int accept(int socket, struct sockaddr* address,
socklen_t* address_len);
// 建立连接 (TCP, 客户端)
int connect(int sockfd, const struct sockaddr *addr,
socklen_t addrlen);
2、sockaddr结构
socket API 是一层抽象的网络编程接口,适用于各种底层网络协议,如IPv4、 IPv6,以及后面要讲的UNIX Domain Socket。然而,各种网络协议的地址格式并不相同:
- IPv4和IPv6的地址格式定义在netinet/in.h中,IPv4地址用sockaddr_in结构体表示,包括16位地址类型,16位端口号和32位IP地址。
- IPv4、 IPv6地址类型分别定义为常数AF_INET、 AF_INET6。这样,只要取得某种sockaddr结构体的首地址,不需要知道具体是哪种类型的sockaddr结构体,就可以根据地址类型字段确定结构体中的内容。
- socket API可以都用struct sockaddr* 类型表示,在使用的时候需要强制转化成sockaddr_in。这样的好处是程序的通用性,可以接收IPv4,IPv6,以及UNIX Domain Socket各种类型的sockaddr结构体指针做为参数。
- socket API之所以不用 void* 类型表示,仅仅是因为当时没有 void*,后来有了void* 之后,也就没有必要专门再改了。
3、sockaddr结构体
3.1、sockaddr结构体
3.2、sockaddr_in结构体
虽然socket api的接口是sockaddr,但是我们真正在基于IPv4编程时,使用的数据结构是sockaddr_in。这个结构里主要有三部分信息:地址类型(__SOCKADDR_COMMON (sin_)),端口号(sin_port),IP地址(sin_addr)。
__SOCKADDR_COMMON是一个宏:
## 可以把它两边的符号合成一个符号。所以最终整个 __SOCKADDR_COMMON (sin_) 语句会被替换为 sa_family_t sin_family 。sa_family_t是类型,sin_family 是变量。
in_addr 类型用来表示一个IPv4的IP地址,其实就是一个32位的整数。
下面的 sin_zero 是填充字段。
地址转换函数:
只介绍基于IPv4的socket网络编程,sockaddr_in中的成员struct in_addr sin_addr表示32位的IP地址。但是我们通常用点分十进制的字符串表示IP地址,以下函数可以在字符串表示和in_addr表示之间转换。
因为我们常见的IP地址都是字符串风格的,而 in_addr_t 是四字节整数类型,所以需要进行类型转化。字符串转in_addr的函数:
in_addr_t inet_addr(const char *cp);
inet_addr 函数在把字符串风格数据转换成四字节整数的同时,也把主机序列转换成了网络序列。
in_addr转字符串的函数:
四、简单的UDP网络程序
1、创建套接字接口
int socket(int domain, int type, int protocol);
socket 函数中, domain 是 16 位地址类型。 type 是套接字服务类型可设为 SOCK_STREAM (流式套接)、SOCK_DGRAM(用户数据报)。 protocol 表明使用哪种协议,指明了 type 后, protocol 设置为 0 ,系统自动判定是 TCP 还是 UDP。函数运行成功,返回值是文件描述符。失败则返回 -1,并且错误码会被设置。
void InitServer()
{
_sock = socket(AF_INET, SOCK_DGRAM, 0);
if(_sock < 0)
{
cerr << "create socket error: " << strerror(errno) << endl;
exit(1);
}
cout << "create socket success: " << _sock << endl;
}
编译运行:
2、绑定端口号
int bind(int sockfd, const struct sockaddr *addr,
socklen_t addrlen);
bind 函数中, sockfd 是 socket 函数的返回值。 addr 是 sockaddr 结构体指针。 addrlen 是结构体大小。 bind 函数执行成功,返回 0,失败返回 -1,并且错误码会被设置。
void InitServer()
{
//1.创建socket接口,打开网络文件
_sock = socket(AF_INET, SOCK_DGRAM, 0);
if (_sock < 0)
{
cerr << "create socket error: " << strerror(errno) << endl;
exit(SOCKET_ERR);
}
cout << "create socket success: " << _sock << endl;
//2.给服务器指明IP地址和端口号
struct sockaddr_in local; // 这个local是一个局部变量,被定义在用户空间的特定函数的栈帧上,不
//在内核中。
bzero(&local, sizeof(local));//清零
local.sin_family = AF_INET;
local.sin_port = htons(_port); //把本地主机构建的port序列转成网络序列
//1.字符串风格的IP地址,转换成4字节int类型
//2.将主机序列转化称为网络序列
local.sin_addr.s_addr = inet_addr(_ip.c_str());
//int n = bind(_sock, (struct sockaddr*)&local, sizeof(local));
if (bind(_sock, (struct sockaddr*)&local, sizeof(local)) < 0) //把local绑定到了内核中
{
cerr << "bind socket error: " << strerror(errno) << endl;
exit(BIND_ERR);
}
cout << "bind socket success: " << _sock << endl;
}
编译运行:
云服务器不需要bind IP地址,需要让服务器自己指定IP地址。
因为一个服务器中可能有多张网卡,如果指定了其中一个IP,那么服务器就只能接收该IP递交的数据报,会导致服务器处理的数据量变少。为了使服务器能够接收所有发送到这台机器上的数据,只需要指定端口号就可以了。
3、服务器运行
从套接字中获取数据报:
ssize_t recvfrom(int sockfd, void *buf, size_t len, int flags,
struct sockaddr *src_addr, socklen_t *addrlen);
recvfrom 函数的参数列表中, sockfd 是服务端绑定的套接字。 buf 是我们自己定义的用于接收数据的缓冲区。 len 是缓冲区长度。 flags 表示读取方式,默认以阻塞方式读取。 src_addr 是客户端的IP地址+端口号,目的是为了找到是谁发来的数据,这是一个输入输出型参数,在输入时,就应该加上输入接收缓冲区,输出时,包含客户端的IP和端口号。 addrlen 是实际输出时,结构体的大小。返回值是实际读取到的字节数。
编写服务端接收数据的代码:
void Start()
{
char buffer[1024];
while (1)
{
//接收
struct sockaddr_in peer;
socklen_t len = sizeof(peer);
int n = recvfrom(_sock, buffer, sizeof(buffer) - 1, 0, (struct sockaddr*)&peer, &len);
if (n > 0)
buffer[n] = '\0';
else continue;
//提取client信息
string clientip = inet_ntoa(peer.sin_addr);
uint16_t clientport = ntohs(peer.sin_port);
cout << clientip << " - " << clientport << " # " << buffer << endl;
}
}
发送数据函数接口:
ssize_t sendto(int sockfd, const void *buf, size_t len, int flags,
const struct sockaddr *dest_addr, socklen_t addrlen);
sendto 函数的参数列表基本与 recvfrom 函数一一对应。
sendto(_sock, buffer, strlen(buffer), 0, (struct sockaddr*)&peer, sizeof(peer));
在发送时,不需要手动把本地序列转换成网络序列,因为sendto函数会自动进行转换。
4、创建客户端
因为一台设备上可以运行很多客户端,即运行着很多进程,这些进程都是独立的,这就意味着每一个客户端的端口号一定不能是固定的,以防止新启动客户端的端口号已经被别人占用。
所以客户端绑定端口号,不能自己绑定,而是需要OS来分配,防止客户端出现启动冲突。
至于为什么client的端口号不能自己绑定,但是server的端口号就可以自己绑定呢?
这是因为server的端口号必须要众所周知,且不能随意改变。另外,同一家公司的端口号需要统一规范化,如果发生冲突,可以很快的内部解决。
在我们首次使用系统调用发送数据的时候,OS会在底层随机选择客户端端口号与IP,进行绑定操作,并构建发送的数据报文。
创建代码:
static void usage(string proc)
{
cout << "Usage: \n\t" << proc << " serverip serverport\n" << endl;
}
// udp_client IP 端口号 命令+命令选项一共三个
int main(int argc, char* argv[])
{
if (argc != 3)
{
usage(argv[0]);
exit(USAGE_ERR);
}
string serverip = argv[1];
uint16_t serverport = atoi(argv[2]);
int sock = socket(AF_INET, SOCK_DGRAM, 0);
if (sock < 0)
{
cerr << "create socket error" << endl;
exit(SOCKET_ERR);
}
//client不需要自己绑定,由OS自动进行绑定
//明确server是谁
struct sockaddr_in server;
memset(&server, 0, sizeof(server));
server.sin_family = AF_INET;
server.sin_port = htons(serverport);
server.sin_addr.s_addr = inet_addr(serverip.c_str());
while (1)
{
//用户输入
string message;
cout << "please Enter# ";
cin >> message;
//发送数据
sendto(sock, message.c_str(), message.size(), 0, (struct sockaddr*)&server, sizeof(server));
//接收数据
char buffer[1024];
struct sockaddr_in temp;
socklen_t len = sizeof(temp);
int n = recvfrom(sock, buffer, sizeof(buffer) - 1, 0, (struct sockaddr*)&temp, &len);
if (n > 0)
{
buffer[n] = 0;
cout << "server echo# " << buffer << endl;
}
}
return 0;
}
5、数据传输
5.1、本地数据传输
127.0.0.1 :本地环回,表示当前主机,通常用来进行本地通信或测试。
运行程序观察结果:
client发送的数据,server可以接收到。
5.2、网络数据传输
6、业务处理
我们编写的网络服务器刚刚解决的是网络IO的问题,要进行业务处理才是我们真正的目的。
现在实现一个小写转大写的服务,只需要在server发送数据之前对数据进行相应处理就可以了:
//server.cc
//上层的业务处理,不需要关心网络发送,只负责信息处理即可
string transactionString(string request)
{
string result;
char c;
for(auto& r : request)
{
if(islower(r))
c = toupper(r);
result.push_back(c);
}
return result;
}
int main(int argc, char* argv[])
{
if(argc != 2)
{
usage(argv[0]);
exit(USAGE_ERR);
}
uint16_t port = atoi(argv[1]);
unique_ptr<UdpServer> usvr(new UdpServer(transactionString,port));
usvr->InitServer();
usvr->Start();
return 0;
}
//server.hpp
//...
using func_t = function<string(string)>;
class UdpServer
{
public:
UdpServer(func_t cb, uint16_t port = default_port)
:_service(cb)
, _port(port)
{
cout << "server addr: " << _port << endl;
}
//....
void Start()
{
char buffer[1024];
while (1)
{
//接收
struct sockaddr_in peer;
socklen_t len = sizeof(peer);
int n = recvfrom(_sock, buffer, sizeof(buffer) - 1, 0, (struct sockaddr*)&peer, &len);
if (n > 0)
buffer[n] = '\0';
else continue;
//提取client信息
string clientip = inet_ntoa(peer.sin_addr);
uint16_t clientport = ntohs(peer.sin_port);
cout << clientip << " - " << clientport << " # " << buffer << endl;
//做业务处理
string response = _service(buffer);
//发送
sendto(_sock, response.c_str(), response.size(), 0, (struct sockaddr*)&peer, sizeof(peer));
}
}
~UdpServer()
{}
private:
int _sock;
uint16_t _port;
func_t _service;
//string _ip;
};
运行观察结果:
7、多线程业务处理
现在实现一个消息收发服务器,在服务器与客户端中分别创建两个线程,一个只用来收消息,另一个只用来发消息。
完整实现代码:
//Thread.hpp
#pragma once
#include <iostream>
#include <pthread.h>
#include <functional>
#include <string>
#include <cstdlib>
using namespace std;
class Thread
{
public:
typedef enum
{
NEW = 0,
RUNNING,
EXITED
}ThreadStatus;
// typedef void (*func_t)(void*);
using func_t = function<void ()>;
public:
Thread(int num, func_t func)
:_tid(0)
,_status(NEW)
,_func(func)
{
char name[128];
snprintf(name, sizeof(name), "thread-%d", num);
_name = name;
}
int status()
{
return _status;
}
string threadname()
{
return _name;
}
pthread_t threadid()
{
return _tid;
}
//因为类的成员函数有this指针,占用了回调函数的void*类型的参数,所以要定义成static类型
static void* runHelper(void* args)
{
Thread* ts = (Thread*)args;
//_func(ts->_args );
(*ts)(); //使用仿函数的形式调用func函数
return nullptr;
}
void operator()()
{
_func();
}
void run()
{
int n = pthread_create(&_tid, nullptr, runHelper, this); //这里传参传的是 this 指针,为了static类型的回调函数可以访问类属性和其他成员函数
if(n != 0)
exit(1);
_status = RUNNING;
}
void join()
{
int n = pthread_join(_tid, nullptr);
if(n != 0)
{
cerr << "main thread join thread " << _name << " error" << endl;
return;
}
_status = EXITED;
}
~Thread()
{}
private:
pthread_t _tid;
string _name;
func_t _func; //线程未来要执行的回调
ThreadStatus _status;
};
//RingQueue.hpp
#pragma once
#include <iostream>
#include <vector>
#include <mutex>
#include <semaphore.h>
using namespace std;
static const int N = 5;
template<class T>
class RingQueue
{
public:
RingQueue(int num = N)
:_ring(num)
,_cap(num)
,_c_step(0)
,_p_step(0)
{
sem_init(&_data_sem, 0, 0);
sem_init(&_space_sem, 0, num);
pthread_mutex_init(&_c_mtx, nullptr);
pthread_mutex_init(&_p_mtx, nullptr);
}
//生产
void push(const T& in)
{
P(_space_sem);
Lock(_p_mtx);
//一定拿到了对应的空间资源!不用再做判断
//确定拿到了哪一个资源
_ring[_p_step++] = in;
_p_step %= _cap;
Unlock(_p_mtx);
V(_data_sem);
}
//消费
void pop(T* out)
{
P(_data_sem);
Lock(_c_mtx);
*out = _ring[_c_step++];
_c_step %= _cap;
Unlock(_c_mtx);
V(_space_sem);
}
~RingQueue()
{
sem_destroy(&_data_sem);
sem_destroy(&_space_sem);
pthread_mutex_destroy(&_c_mtx);
pthread_mutex_destroy(&_p_mtx);
}
private:
void P(sem_t& s)
{
sem_wait(&s);
}
void V(sem_t& s)
{
sem_post(&s);
}
void Lock(pthread_mutex_t& m)
{
pthread_mutex_lock(&m);
}
void Unlock(pthread_mutex_t& m)
{
pthread_mutex_unlock(&m);
}
private:
std::vector<T> _ring;
int _cap; //环形队列的容量
sem_t _data_sem; //只有消费者关心
sem_t _space_sem; //只有生产者关心
int _c_step; //消费位置
int _p_step; //生产位置
pthread_mutex_t _c_mtx; //消费者的锁
pthread_mutex_t _p_mtx; //生产者的锁
};
//LockGuard.hpp
#pragma once
#include <iostream>
#include <pthread.h>
using namespace std;
class Mutex //自己不维护锁,由外部传入
{
public:
Mutex(pthread_mutex_t* mutex)
:_pmutex(mutex)
{}
void lock()
{
pthread_mutex_lock(_pmutex);
}
void unlock()
{
pthread_mutex_unlock(_pmutex);
}
~Mutex()
{}
private:
pthread_mutex_t* _pmutex;
};
class LockGuard
{
public:
LockGuard(pthread_mutex_t* mutex)
:_mutex(mutex)
{
_mutex.lock();
}
~LockGuard()
{
_mutex.unlock();
}
private:
Mutex _mutex;
};
//error.hpp
#pragma once
enum
{
USAGE_ERR = 1,
SOCKET_ERR,
BIND_ERR
};
//udp_server.hpp
#pragma once
#include <iostream>
#include <strings.h>
#include <cerrno>
#include <cstring>
#include <cstdlib>
#include <functional>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <pthread.h>
#include <unordered_map>
#include "error.hpp"
#include "RingQueue.hpp"
#include "lockGuard.hpp"
#include "Thread.hpp"
using namespace std;
namespace ns_server
{
const static uint16_t default_port = 8080;
using func_t = function<string(string)>;
class UdpServer
{
public:
UdpServer(uint16_t port = default_port)
: _port(port)
{
cout << "server addr: " << _port << endl;
pthread_mutex_init(&_lock, nullptr);
p = new Thread(1, bind(&UdpServer::Recv, this));
c = new Thread(1, bind(&UdpServer::Broadcast, this));
}
void start()
{
// 1.创建socket接口,打开网络文件
_sock = socket(AF_INET, SOCK_DGRAM, 0);
if (_sock < 0)
{
cerr << "create socket error: " << strerror(errno) << endl;
exit(SOCKET_ERR);
}
cout << "create socket success: " << _sock << endl;
// 2.给服务器指明IP地址和端口号
struct sockaddr_in local; // 这个local是一个局部变量,被定义在用户空间的特定函数的栈帧上,不在内核中。
bzero(&local, sizeof(local)); // 清零
local.sin_family = AF_INET;
local.sin_port = htons(_port); // 把本地主机构建的port序列转成网络序列
// 1.字符串风格的IP地址,转换成4字节int类型
// 2.将主机序列转化称为网络序列
// local.sin_addr.s_addr = inet_addr(_ip.c_str());
// 3.云服务器,一般不要指明某一个确定的IP
local.sin_addr.s_addr = INADDR_ANY; // 让服务器在启动时,bind本主机上的任意IP
// int n = bind(_sock, (struct sockaddr*)&local, sizeof(local));
if (bind(_sock, (struct sockaddr *)&local, sizeof(local)) < 0) // 把local绑定到了内核中
{
cerr << "bind socket error: " << strerror(errno) << endl;
exit(BIND_ERR);
}
cout << "bind socket success: " << _sock << endl;
p->run();
c->run();
}
void addUser(const string &name, const struct sockaddr_in &peer)
{
LockGuard lockguard(&_lock);
auto iter = onlineuser.find(name);
if (iter != onlineuser.end())
return;
onlineuser.insert(pair<const string, const struct sockaddr_in>(name, peer));
}
void Recv()
{
char buffer[1024];
while (1)
{
// 接收
struct sockaddr_in peer;
socklen_t len = sizeof(peer);
int n = recvfrom(_sock, buffer, sizeof(buffer) - 1, 0, (struct sockaddr *)&peer, &len);
if (n > 0)
buffer[n] = '\0';
else
continue;
// 提取client信息
string clientip = inet_ntoa(peer.sin_addr);
uint16_t clientport = ntohs(peer.sin_port);
cout << clientip << " - " << clientport << " # " << buffer << endl;
// 构建一个用户,并进行检查
string name = clientip;
name += "-";
name += to_string(clientport);
// 如果不存在就插入,存在就什么都不做。
addUser(name, peer);
string message = name + ">> " + buffer;
rq.push(message);
// //做业务处理
// string response = _service(buffer);
// //发送
// sendto(_sock, response.c_str(), response.size(), 0, (struct sockaddr*)&peer, sizeof(peer));
}
}
void Broadcast()
{
while (1)
{
string sendstring;
rq.pop(&sendstring);
vector<struct sockaddr_in> v;
{
LockGuard lockguard(&_lock);
for (auto user : onlineuser)
{
v.push_back(user.second);
}
}
for (auto &user : v)
{
// cout << "Broadcast message to " << user.first << " : " << sendstring << endl;
sendto(_sock, sendstring.c_str(), sendstring.size(), 0, (struct sockaddr *)&(user), sizeof(user));
}
}
}
~UdpServer()
{
pthread_mutex_destroy(&_lock);
c->join();
p->join();
delete c;
delete p;
}
private:
int _sock;
uint16_t _port;
// func_t _service;
unordered_map<string, struct sockaddr_in> onlineuser;
pthread_mutex_t _lock;
RingQueue<string> rq;
Thread *c;
Thread *p;
// string _ip;
};
}
//udp_server.cc
namespace ns_server
{
const static uint16_t default_port = 8080;
using func_t = function<string(string)>;
class UdpServer
{
public:
UdpServer(uint16_t port = default_port)
: _port(port)
{
cout << "server addr: " << _port << endl;
pthread_mutex_init(&_lock, nullptr);
p = new Thread(1, bind(&UdpServer::Recv, this));
c = new Thread(1, bind(&UdpServer::Broadcast, this));
}
void start()
{
// 1.创建socket接口,打开网络文件
_sock = socket(AF_INET, SOCK_DGRAM, 0);
if (_sock < 0)
{
cerr << "create socket error: " << strerror(errno) << endl;
exit(SOCKET_ERR);
}
cout << "create socket success: " << _sock << endl;
// 2.给服务器指明IP地址和端口号
struct sockaddr_in local; // 这个local是一个局部变量,被定义在用户空间的特定函数的栈帧上,不在内核中。
bzero(&local, sizeof(local)); // 清零
local.sin_family = AF_INET;
local.sin_port = htons(_port); // 把本地主机构建的port序列转成网络序列
// 1.字符串风格的IP地址,转换成4字节int类型
// 2.将主机序列转化称为网络序列
// local.sin_addr.s_addr = inet_addr(_ip.c_str());
// 3.云服务器,一般不要指明某一个确定的IP
local.sin_addr.s_addr = INADDR_ANY; // 让服务器在启动时,bind本主机上的任意IP
// int n = bind(_sock, (struct sockaddr*)&local, sizeof(local));
if (bind(_sock, (struct sockaddr *)&local, sizeof(local)) < 0) // 把local绑定到了内核中
{
cerr << "bind socket error: " << strerror(errno) << endl;
exit(BIND_ERR);
}
cout << "bind socket success: " << _sock << endl;
p->run();
c->run();
}
void addUser(const string &name, const struct sockaddr_in &peer)
{
LockGuard lockguard(&_lock);
auto iter = onlineuser.find(name);
if (iter != onlineuser.end())
return;
onlineuser.insert(pair<const string, const struct sockaddr_in>(name, peer));
}
void Recv()
{
char buffer[1024];
while (1)
{
// 接收
struct sockaddr_in peer;
socklen_t len = sizeof(peer);
int n = recvfrom(_sock, buffer, sizeof(buffer) - 1, 0, (struct sockaddr *)&peer, &len);
if (n > 0)
buffer[n] = '\0';
else
continue;
// 提取client信息
string clientip = inet_ntoa(peer.sin_addr);
uint16_t clientport = ntohs(peer.sin_port);
cout << clientip << " - " << clientport << " # " << buffer << endl;
// 构建一个用户,并进行检查
string name = clientip;
name += "-";
name += to_string(clientport);
// 如果不存在就插入,存在就什么都不做。
addUser(name, peer);
string message = name + ">> " + buffer;
rq.push(message);
// //做业务处理
// string response = _service(buffer);
// //发送
// sendto(_sock, response.c_str(), response.size(), 0, (struct sockaddr*)&peer, sizeof(peer));
}
}
void Broadcast()
{
while (1)
{
string sendstring;
rq.pop(&sendstring);
vector<struct sockaddr_in> v;
{
LockGuard lockguard(&_lock);
for (auto user : onlineuser)
{
v.push_back(user.second);
}
}
for (auto &user : v)
{
// cout << "Broadcast message to " << user.first << " : " << sendstring << endl;
sendto(_sock, sendstring.c_str(), sendstring.size(), 0, (struct sockaddr *)&(user), sizeof(user));
}
}
}
~UdpServer()
{
pthread_mutex_destroy(&_lock);
c->join();
p->join();
delete c;
delete p;
}
private:
int _sock;
uint16_t _port;
// func_t _service;
unordered_map<string, struct sockaddr_in> onlineuser;
pthread_mutex_t _lock;
RingQueue<string> rq;
Thread *c;
Thread *p;
// string _ip;
};
}
//udp_client.cc
#include "udp_client.hpp"
#include "error.hpp"
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <cstring>
#include <pthread.h>
static void usage(string proc)
{
cout << "Usage: \n\t" << proc << " serverip serverport\n" << endl;
}
void* recver(void* args)
{
int sock = *(static_cast<int*>(args));
while(1)
{
//接收数据
char buffer[1024];
struct sockaddr_in temp;
socklen_t len = sizeof(temp);
int n = recvfrom(sock, buffer, sizeof(buffer) - 1, 0, (struct sockaddr*)&temp, &len);
if(n > 0)
{
buffer[n] = 0;
cout << "server echo# " << buffer << endl;
}
}
}
int main(int argc, char* argv[])
{
if(argc != 3)
{
usage(argv[0]);
exit(USAGE_ERR);
}
string serverip = argv[1];
uint16_t serverport = atoi(argv[2]);
int sock = socket(AF_INET, SOCK_DGRAM, 0);
if(sock < 0)
{
cerr << "create socket error" << endl;
exit(SOCKET_ERR);
}
//client不需要自己绑定,由OS自动进行绑定
//明确server是谁
struct sockaddr_in server;
memset(&server, 0, sizeof(server));
server.sin_family = AF_INET;
server.sin_port = htons(serverport);
server.sin_addr.s_addr = inet_addr(serverip.c_str());
pthread_t tid;
pthread_create(&tid, nullptr, recver, &sock);
while(1)
{
//用户输入
string message;
cout << "please Enter# ";
cin >> message;
//发送数据
sendto(sock, message.c_str(), message.size(), 0, (struct sockaddr*)&server, sizeof(server));
}
return 0;
}
五、简单的TCP网络程序
1、创建套接字与绑定
这部分代码与UDP相同:
void initServer()
{
//1.创建socket文件
_sock = socket(AF_INET, SOCK_STREAM, 0);
if (_sock < 0)
{
cerr << " create socket error " << endl;
exit(SOCKET_ERR);
}
//2.绑定
struct sockaddr_in local;
memset(&local, 0, sizeof(local));
local.sin_family = AF_INET;
local.sin_port = htons(_port);
local.sin_addr.s_addr = INADDR_ANY;
if (bind(_sock, (struct sockaddr*)&local, sizeof(local)) < 0)
{
cerr << "bind socket error" << endl;
exit(BIND_ERR);
}
//3.监听
}
但是与UDP不同的是,TCP需要增加一段监听代码,用于和用户端进行连接。
2、监听
int listen(int sockfd, int backlog);
listen 函数的参数列表中, sockfd 是刚刚创建的套接字。 backlog 是。函数调用成功返回 0 ,失败返回 -1,并且错误码被设置。
static const int backlog = 32;
void initServer()
{
//1.创建socket文件
_sock = socket(AF_INET, SOCK_STREAM, 0);
if (_sock < 0)
{
cerr << " create socket error " << endl;
exit(SOCKET_ERR);
}
//2.绑定
struct sockaddr_in local;
memset(&local, 0, sizeof(local));
local.sin_family = AF_INET;
local.sin_port = htons(_port);
local.sin_addr.s_addr = INADDR_ANY;
if (bind(_sock, (struct sockaddr*)&local, sizeof(local)) < 0)
{
cerr << "bind socket error" << endl;
exit(BIND_ERR);
}
//3.监听
if (listen(_sock, backlog) < 0)
{
cerr << "listen socket error" << endl;
exit(LISTEN_ERR);
}
}
3、获取连接
int accept(int sockfd, struct sockaddr *addr, socklen_t *addrlen);
accept 函数的参数列表中, sockfd 是套接字。 addr 用来记录连接该服务器的客户端。 addrlen 是 addr 的长度。
accept 函数的返回值:调用成功返回给客户端提供服务的套接字。调用失败返回 -1,并且错误码被设置。
4、服务器运行
void start()
{
_quit = false;
while (!_quit)
{
struct sockaddr_in client;
socklen_t len = sizeof(client);
// 4.获取连接,accept
int sock = accept(_listensock, (struct sockaddr*)&client, &len);
if (sock < 0)
{
cerr << "accept error" << endl;
continue;
}
//提取client信息
string clientip = inet_ntoa(client.sin_addr);
uint16_t clientport = ntohs(client.sin_port);
//5.获取新连接成功,开始进行业务处理
cout << "获取新连接成功: " << sock << " from " << _listensock << " , "
<< clientip << "-" << clientport << endl;
service(sock, clientip, clientport);
}
}
void service(int sock, const string& clientip, const uint16_t& clientport)
{
string who = clientip + "-" + to_string(clientport);
char buffer[1024];
while (1)
{
ssize_t s = read(sock, buffer, sizeof(buffer) - 1);
if (s > 0)
{
buffer[s] = 0;
string res = _func(buffer); //进行回调
cout << who << ">>" << res << endl;
write(sock, res.c_str(), res.size());
}
else if (s == 0)
{
//对方将连接关闭了
close(sock);
cout << who << " quit" << endl;
break;
}
else
{
close(sock);
cout << "read error " << strerror(errno) << endl;
break;
}
}
}
这种写法有一个问题,那就是程序获取连接后,会直接执行service函数,而没办法继续获取连接。导致一个服务器只能连接到一个客户端。
解决办法是使用多进程来运行程序,但是创建子进程之后,因为父进程又要等待子进程,会造成阻塞等待。就算设置参数WNOHANG,来进行非阻塞等待,也会因为最后没有要连接的客户端,而阻塞在accept函数处,无法执行wait函数。
有多种方法解决这个问题,如下是其中两种:
4.1、创建子进程的子进程
void start()
{
_quit = false;
while (!_quit)
{
struct sockaddr_in client;
socklen_t len = sizeof(client);
// 4.获取连接,accept
int sock = accept(_listensock, (struct sockaddr*)&client, &len);
if (sock < 0)
{
cerr << "accept error" << endl;
continue;
}
//提取client信息
string clientip = inet_ntoa(client.sin_addr);
uint16_t clientport = ntohs(client.sin_port);
//5.获取新连接成功,开始进行业务处理
cout << "获取新连接成功: " << sock << " from " << _listensock << " , "
<< clientip << "-" << clientport << endl;
// v2:多进程版本
pid_t id = fork();
if (id < 0)
{
close(sock);
continue;
}
else if (id == 0) //子进程。子进程继承父进程的fd table
{
//建议关闭掉不需要的fd
close(_listensock);
if (fork() > 0) break;
service(sock, clientip, clientport);
exit(0);
}
//父进程,一定要关闭不需要的fd,否则父进程的描述符越来越多,造成文件描述符泄漏,导致浪费
close(sock);
pid_t ret = waitpid(id, nullptr, 0);
if (ret == id)
{
cout << "wait child " << id << "success" << endl;
}
}
}
创建了一个孙子进程,子进程自己退出,并被父进程回收。孙子进程被OS领养,由OS负责,自然不用我们自己回收了。
4.2、设置信号回收
void start()
{
signal(SIGCHLD, SIG_IGN);
_quit = false;
while (!_quit)
{
struct sockaddr_in client;
socklen_t len = sizeof(client);
// 4.获取连接,accept
int sock = accept(_listensock, (struct sockaddr*)&client, &len);
if (sock < 0)
{
cerr << "accept error" << endl;
continue;
}
//提取client信息
string clientip = inet_ntoa(client.sin_addr);
uint16_t clientport = ntohs(client.sin_port);
//5.获取新连接成功,开始进行业务处理
cout << "获取新连接成功: " << sock << " from " << _listensock << " , "
<< clientip << "-" << clientport << endl;
// v2:多进程版本
pid_t id = fork();
if (id < 0)
{
close(sock);
continue;
}
else if (id == 0) //子进程。子进程继承父进程的fd table
{
//建议关闭掉不需要的fd
close(_listensock);
// if(fork() > 0) break;
service(sock, clientip, clientport);
exit(0);
}
//父进程,一定要关闭不需要的fd,否则父进程的描述符越来越多,造成文件描述符泄漏,导致浪费
close(sock);
}
}
通过信号的方式,使子进程直接被OS回收。
以上是使用多进程实现一个服务器对应多个客户端的办法。但是多进程终究成本较高,因此,我们还可以实现多线程对应的方法:
4.3、多线程服务器
需要注意的是,在实现多线程时,由于回调函数要设置成static类型,所以内部没有this指针。此处是通过定义一个类的方式封装this指针。
//封装this指针
//class TcpServer;
//class ThreadData
//{
//public:
// ThreadData(int fd, const string& ip, const uint16_t& port, TcpServer* ts)
// :sock(fd)
// , clientip{ ip }
// , clientport(port)
// , current(ts)
// {}
//public:
// int sock;
// string clientip;
// uint16_t clientport;
// TcpServer* current;
//};
static void* threadRoutine(void* args)
{
pthread_detach(pthread_self());
ThreadData* td = static_cast<ThreadData*>(args);
td->current->service(td->sock, td->clientip, td->clientport);
delete td;
return nullptr;
}
void start()
{
signal(SIGCHLD, SIG_IGN);
_quit = false;
while (!_quit)
{
struct sockaddr_in client;
socklen_t len = sizeof(client);
// 4.获取连接,accept
int sock = accept(_listensock, (struct sockaddr*)&client, &len);
if (sock < 0)
{
cerr << "accept error" << endl;
continue;
}
//提取client信息
string clientip = inet_ntoa(client.sin_addr);
uint16_t clientport = ntohs(client.sin_port);
//5.获取新连接成功,开始进行业务处理
cout << "获取新连接成功: " << sock << " from " << _listensock << " , "
<< clientip << "-" << clientport << endl;
// v3:多线程版本
pthread_t tid;
ThreadData* td = new ThreadData(sock, clientip, clientport, this);
pthread_create(&tid, nullptr, threadRoutine, td);
}
}
这里实现的多线程服务器,是在客户端访问时,才临时创建线程的。如果以后客户端非常多的话,这样临时创建线程的方式效率就会非常低了。
所以我们再来实现一个线程池服务器的版本。
4.4、线程池服务器
//ThreadPool.hpp
#pragma once
#include <iostream>
#include <vector>
#include <queue>
#include <string>
#include <unistd.h>
#include <pthread.h>
#include "Thread.hpp"
#include "lockGuard.hpp"
using namespace std;
const static int N = 5;
template <class T>
class ThreadPool
{
private:
ThreadPool(int num = N)
: _num(num)
{
pthread_mutex_init(&_lock, nullptr);
pthread_cond_init(&_cond, nullptr);
}
ThreadPool(const ThreadPool<T>& tp) = delete;
void operator=(const ThreadPool<T>& tp) = delete;
public:
static ThreadPool<T>* getinstance()
{
if (instance == nullptr)
{
LockGuard lockguard(&instance_lock);
if (instance == nullptr)
{
cout << "线程池单例形成" << endl;
instance = new ThreadPool<T>();
instance->init();
instance->start();
}
}
return instance;
}
pthread_mutex_t* getlock()
{
return &_lock;
}
void threadWait()
{
pthread_cond_wait(&_cond, &_lock);
}
void threadWakeup()
{
pthread_cond_signal(&_cond);
}
bool isEmpty()
{
return _tasks.empty();
}
T popTask()
{
T t = _tasks.front();
_tasks.pop();
return t;
}
static void threadRoutine(void* args)
{
ThreadPool<T>* tp = static_cast<ThreadPool<T> *>(args);
while (1)
{
// 1、检测有没有任务
// 2、有:处理
// 3、无:等待
// 必须要加锁
T t;
{
LockGuard lockguard(tp->getlock());
while (tp->isEmpty())
{
// 等待,cond
tp->threadWait();
}
t = tp->popTask(); // 从公共区域拿到私有区域
}
// t.run(); //处理任务不应该在临界区中进行
t();
}
}
void init()
{
for (int i = 0; i < _num; i++)
{
_threads.push_back(Thread(i, threadRoutine, this));
}
}
void start()
{
for (auto& t : _threads)
{
t.run();
}
}
void check()
{
for (auto& t : _threads)
{
cout << t.threadname() << " runing... " << endl;
}
}
void pushTask(const T& t)
{
LockGuard lockguard(&_lock);
_tasks.push(t);
threadWakeup();
}
~ThreadPool()
{
for (auto& t : _threads)
{
t.join();
}
pthread_mutex_destroy(&_lock);
pthread_cond_destroy(&_cond);
}
private:
vector<Thread> _threads;
int _num;
queue<T> _tasks; // 使用stl的自动扩容特性
pthread_mutex_t _lock;
pthread_cond_t _cond;
static ThreadPool<T>* instance;
static pthread_mutex_t instance_lock;
};
template <class T>
ThreadPool<T>* ThreadPool<T>::instance = nullptr;
template<class T>
pthread_mutex_t ThreadPool<T>::instance_lock = PTHREAD_MUTEX_INITIALIZER;
//Thread.hpp
#pragma once
#include <iostream>
#include <pthread.h>
#include <functional>
#include <string>
#include <cstdlib>
using namespace std;
class Thread
{
public:
typedef enum
{
NEW = 0,
RUNNING,
EXITED
}ThreadStatus;
// typedef void (*func_t)(void*);
using func_t = function<void()>;
public:
Thread(int num, func_t func)
:_tid(0)
, _status(NEW)
, _func(func)
{
char name[128];
snprintf(name, sizeof(name), "thread-%d", num);
_name = name;
}
int status()
{
return _status;
}
string threadname()
{
return _name;
}
pthread_t threadid()
{
return _tid;
}
//因为类的成员函数有this指针,占用了回调函数的void*类型的参数,所以要定义成static类型
static void* runHelper(void* args)
{
Thread* ts = (Thread*)args;
//_func(ts->_args );
(*ts)(); //使用仿函数的形式调用func函数
return nullptr;
}
void operator()()
{
_func();
}
void run()
{
int n = pthread_create(&_tid, nullptr, runHelper, this); //这里传参传的是 this 指针,为了static类型的回调函数可以访问类属性和其他成员函数
if (n != 0)
exit(1);
_status = RUNNING;
}
void join()
{
int n = pthread_join(_tid, nullptr);
if (n != 0)
{
cerr << "main thread join thread " << _name << " error" << endl;
return;
}
_status = EXITED;
}
~Thread()
{}
private:
pthread_t _tid;
string _name;
func_t _func; //线程未来要执行的回调
ThreadStatus _status;
};
//task.hpp
#pragma once
#include <iostream>
#include <string>
#include <unistd.h>
#include <functional>
using namespace std;
using cb_t = function<void(int, const string&, const uint16_t&)>;
class Task
{
public:
Task()
{
}
Task(int sock, const string& ip, const uint16_t& port, cb_t cb)
:_sock(sock)
, _ip(ip)
, _port(port)
, _cb(cb)
{}
void operator()()
{
_cb(_sock, _ip, _port);
}
~Task()
{
}
private:
int _sock;
string _ip;
uint16_t _port;
cb_t _cb;
};
//tcp_server.hpp部分代码
void start()
{
signal(SIGCHLD, SIG_IGN);
_quit = false;
while (!_quit)
{
struct sockaddr_in client;
socklen_t len = sizeof(client);
// 4.获取连接,accept
int sock = accept(_listensock, (struct sockaddr*)&client, &len);
if (sock < 0)
{
cerr << "accept error" << endl;
continue;
}
//提取client信息
string clientip = inet_ntoa(client.sin_addr);
uint16_t clientport = ntohs(client.sin_port);
//5.获取新连接成功,开始进行业务处理
cout << "获取新连接成功: " << sock << " from " << _listensock << " , "
<< clientip << "-" << clientport << endl;
//v4:线程池
//使用线程池的时候,一定是有限的线程个数,一定要处理短任务
//这里让线程执行的service函数是死循环,实际上这样是不合理的。
Task t(sock, clientip, clientport, std::bind(&TcpServer::service, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));
ThreadPool<Task>::getinstance()->pushTask(t);
}
}
void service(int sock, const string& clientip, const uint16_t& clientport)
{
string who = clientip + "-" + to_string(clientport);
char buffer[1024];
while (1)
{
ssize_t s = read(sock, buffer, sizeof(buffer) - 1);
if (s > 0)
{
buffer[s] = 0;
string res = _func(buffer); //进行回调
cout << who << ">>" << res << endl;
write(sock, res.c_str(), res.size());
}
else if (s == 0)
{
//对方将连接关闭了
close(sock);
cout << who << " quit" << endl;
break;
}
else
{
close(sock);
cout << "read error " << strerror(errno) << endl;
break;
}
}
}
5、创建客户端
客户端需要对服务器获取的连接进行申请:
int connect(int sockfd, const struct sockaddr *addr,
socklen_t addrlen);
客户端首次向服务端发起连接请求时,会自动进行绑定端口。函数成功返回 0,失败返回 -1,并且错误码被设置。
static void userage(string proc)
{
cout << "Usage:\n\t" << proc << " port\n" << endl;
}
//./tcp_client serverip serverport
int main(int argc, char* argv[])
{
if(argc != 3)
{
userage(argv[0]);
exit(USAGE_ERR);
}
string serverip = argv[1];
uint16_t serverport = atoi(argv[2]);
//1.创建套接字
int sock = socket(AF_INET, SOCK_STREAM, 0);
if(sock < 0)
{
cerr << "socket error : " << strerror(errno) << endl;
exit(SOCKET_ERR);
}
//client 不需要自己绑定端口号,由OS自动生成
//client 不需要获取连接(accept),获取连接是server要做的事
//2.连接
struct sockaddr_in server;
memset(&server, 0, sizeof(server));
server.sin_family = AF_INET;
server.sin_port = htons(serverport);
server.sin_addr.s_addr = inet_addr(serverip.c_str());
int cnt = 5;
while(connect(sock, (struct sockaddr*)&server, sizeof(server)) != 0)
{
cout << "正在重新连接,剩余重连次数: " << cnt-- << endl;
sleep(1);
if(cnt <= 0)
break;
}
if(cnt <= 0)
{
cerr << "连接失败" << endl;
exit(CONNECT_ERR);
}
//3.连接成功
char buffer[1024];
while(1)
{
string line;
cout << "Enter>> " << endl;
getline(cin, line);
write(sock, line.c_str(), line.size());
ssize_t s = read(sock, buffer, sizeof(buffer) - 1);
if(s > 0)
{
buffer[s] = 0;
cout << "server echo >> " << buffer << endl;
}
else if(s == 0)
{
cerr << " server quit" << endl;
}
else
{
cerr << "read error: " << strerror(errno) << endl;
break;
}
}
close(sock);
return 0;
}
运行观察结果:
六、 TCP协议通讯流程
下图是基于TCP协议的客户端/服务器程序的一般流程:
服务器初始化:
- 调用socket,创建文件描述符。
- 调用bind,将当前的文件描述符和ip/port绑定在一起。如果这个端口已经被其他进程占用了,就会bind失败。
- 调用listen,声明当前这个文件描述符作为一个服务器的文件描述符,为后面的accept做好准备。
- 调用accecpt,并阻塞,等待客户端连接过来。
建立连接的过程:
- 调用socket, 创建文件描述符;
- 调用connect, 向服务器发起连接请求;
- connect会发出SYN段并阻塞等待服务器应答; (第一次)
- 服务器收到客户端的SYN, 会应答一个SYN-ACK段表示"同意建立连接"; (第二次)
- 客户端收到SYN-ACK后会从connect()返回, 同时应答一个ACK段; (第三次)
这个建立连接的过程, 通常称为 三次握手。
数据传输的过程:
- 建立连接后,TCP协议提供全双工的通信服务。所谓全双工的意思是,在同一条连接中,同一时刻,通信双方可以同时写数据。相对的概念叫做半双工,同一条连接在同一时刻,只能由一方来写数据。
- 服务器从accept()返回后立刻调 用read(),读socket就像读管道一样,如果没有数据到达就阻塞等待。
- 这时客户端调用write()发送请求给服务器,服务器收到后从read()返回,对客户端的请求进行处理,在此期间客户端调用read()阻塞等待服务器的应答。
- 服务器调用write()将处理结果发回给客户端,再次调用read()阻塞等待下一条请求。
- 客户端收到后从read()返回,发送下一条请求,如此循环下去。
断开连接的过程:
- 如果客户端没有更多的请求了,就调用close()关闭连接,客户端会向服务器发送FIN段(第一次)。
- 此时服务器收到FIN后,会回应一个ACK,同时read会返回0(第二次)。
- read返回之后,服务器就知道客户端关闭了连接,也调用close关闭连接,这个时候服务器会向客户端发送一个FIN(第三次)。
- 客户端收到FIN,再返回一个ACK给服务器(第四次)。
这个断开连接的过程, 通常称为 四次挥手。
在学习socket API时要注意应用程序和TCP协议层是如何交互的:
- 应用程序调用某个socket函数时TCP协议层完成什么动作,比如调用connect()会发出SYN段。
- 应用程序如何知道TCP协议层的状态变化,比如从某个阻塞的socket函数返回就表明TCP协议收到了某些段,再比如read()返回0就表明收到了FIN段。