🌈个人主页:秦jh__https://blog.csdn.net/qinjh_?spm=1010.2135.3001.5343
🔥 系列专栏:https://blog.csdn.net/qinjh_/category_12891150.html
目录
Socket 编程预备
理解源 IP 地址和目的 IP 地址
认识端口号
端口号范围划分
理解 "端口号" 和 "进程 ID"
理解源端口号和目的端口号
理解 socket
传输层的典型代表
TCP 协议
UDP 协议
网络字节序
socket 编程接口
sockaddr 结构
Udp网络编程
echo server-版本1
InetAddr.hpp
LockGuard.hpp
Log.hpp
nocopy.hpp
UdpClientMain.cc
UdpServer.hpp
UdpServerMain.cc
DictServer-版本2
Dict.hpp
dict.txt
UdpServer.hpp
UdpServerMain.cc
简单聊天室-版本3
Route.hpp
Thread.hpp
ThreadPool.hpp
UdpClientMain.cc
UdpServer.hpp
UdpServerMain.cc
地址转换函数
前言
💬 hello! 各位铁子们大家好哇。
今日更新了Linux网络编程的内容
🎉 欢迎大家关注🔍点赞👍收藏⭐️留言📝
Socket 编程预备
理解源 IP 地址和目的 IP 地址
- IP 在网络中,用来标识主机的唯一性
问题:数据传输到主机是目的吗?不是的。因为数据是给人用的。比如:聊天是人在聊天,下载是人在下载,浏览网页是人在浏览?但是人是怎么看到聊天信息的呢?怎么执行下载任务呢?怎么浏览网页信息呢?通过启动的 qq,迅雷,浏览器。 而启动的 qq,迅雷,浏览器都是进程。换句话说,进程是人在系统中的代表,只要把数据给进程,人就相当于就拿到了数据。 所以:数据传输到主机不是目的,而是手段。到达主机内部,在交给主机内的进程,才是目的。 但是系统中,同时会存在非常多的进程,当数据到达目标主机之后,怎么转发给目标进程?
认识端口号
端口号(port)是传输层协议的内容.
- 端口号是一个 2 字节 16 位的整数;
- 端口号用来标识一个进程, 告诉操作系统, 当前的这个数据要交给哪一个进程来处理;
- IP 地址 + 端口号能够标识网络上的某一台主机的某一个进程;
- 一个端口号只能被一个进程占用
网络通信的本质就是进程间通信
端口号范围划分
- 0 - 1023: 知名端口号, HTTP, FTP, SSH 等这些广为使用的应用层协议, 他们的端口号都是固定的.
- 1024 - 65535: 操作系统动态分配的端口号. 客户端程序的端口号, 就是由操作系统从这个范围分配的
理解 "端口号" 和 "进程 ID"
系统编程中, pid 表示唯一一个进程; 此处我们的端口号也是唯一表示一个进程. 那么这两者之间是怎样的关系?
进程 ID 属于系统概念,技术上也具有唯一性,确实可以用来标识唯一的一个进程,但是这样做,会让系统进程管理和网络强耦合,实际设计的时候,并没有选择这样做。
另外, 一个进程可以绑定多个端口号; 但是一个端口号不能被多个进程绑定;
理解源端口号和目的端口号
传输层协议(TCP 和 UDP)的数据段中有两个端口号, 分别叫做源端口号和目的端口号. 就是在描述 "数据是谁发的, 要发给谁"
理解 socket
- 综上,IP 地址用来标识互联网中唯一的一台主机,port 用来标识该主机上唯一的一个网络进程
- IP+Port 就能表示互联网中唯一的一个进程
- 所以,通信的时候,本质是两个互联网进程代表人来进行通信,{srcIp,srcPort,dstIp,dstPort}这样的 4 元组就能标识互联网中唯二的两个进程
- 所以,网络通信的本质,也是进程间通信
- 我们把 ip+port 叫做套接字 socket
传输层的典型代表
传输层是属于内核的,那么我们要通过网络协议栈进行通信,必定调用的是传输层提供的系统调用,来进行的网络通信。
TCP 协议
对 TCP(Transmission Control Protocol 传输控制协议)的一个直观的认识:
- 传输层协议
- 有连接
- 可靠传输
- 面向字节流
UDP 协议
对 UDP(User Datagram Protocol 用户数据报协议)的一个直观的认识:
- 传输层协议
- 无连接
- 不可靠传输
- 面向数据报
网络字节序
内存中的多字节数据相对于内存地址有大端和小端之分, 磁盘文件中的多字节数据相对于文件中的偏移地址也有大端小端之分, 网络数据流同样有大端小端之分.
- 发送主机通常将发送缓冲区中的数据按内存地址从低到高的顺序发出;
- 接收主机把从网络上接到的字节依次保存在接收缓冲区中,也是按内存地址从低到高的顺序保存;
- 因此,网络数据流的地址应这样规定:先发出的数据是低地址,后发出的数据是高地址.
- TCP/IP 协议规定,网络数据流应采用大端字节序,即低地址高字节.
- 不管这台主机是大端机还是小端机, 都会按照这个 TCP/IP 规定的网络字节序来发送/接收数据;
- 如果当前发送主机是小端, 就需要先将数据转成大端; 否则就忽略, 直接发送即可
为使网络程序具有可移植性,使同样的 C 代码在大端和小端计算机上编译后都能正常运行,可以调用以下库函数做网络字节序和主机字节序的转换。
- 这些函数名很好记,h 表示 host,n 表示 network,l 表示 32 位长整数,s 表示16 位短整数。
- 例如 htonl 表示将 32 位的长整数从主机字节序转换为网络字节序,例如将IP地址转换后准备发送。
- 如果主机是小端字节序,这些函数将参数做相应的大小端转换然后返回;
- 如果主机是大端字节序,这些函数不做转换,将参数原封不动地返回。
socket 编程接口
// 创建 socket 文件描述符 (TCP/UDP, 客户端 + 服务器)
int socket(int domain, int type, int protocol);
// 绑定端口号 (TCP/UDP, 服务器)
int bind(int socket, const struct sockaddr *address, socklen_t address_len);
// 开始监听 socket (TCP, 服务器)
int listen(int socket, int backlog);
// 接收请求 (TCP, 服务器)
int accept(int socket, struct sockaddr* address, socklen_t* address_len);
// 建立连接 (TCP, 客户端)
int connect(int sockfd, const struct sockaddr *addr, socklen_t addrlen);
sockaddr 结构
socket API 是一层抽象的网络编程接口,适用于各种底层网络协议,如IPv4、IPv6,以及 UNIX Domain Socket. 然而, 各种网络协议的地址格式并不相同.
- IPv4 和 IPv6 的地址格式定义在 netinet/in.h 中,IPv4 地址用sockaddr_in 结构体表示,包括 16 位地址类型, 16 位端口号和 32 位 IP 地址.
- IPv4、IPv6 地址类型分别定义为常数 AF_INET、AF_INET6. 这样,只要取得某种 sockaddr 结构体的首地址,不需要知道具体是哪种类型的 sockaddr 结构体,就可以根据地址类型字段确定结构体中的内容.
- socket API 可以都用 struct sockaddr *类型表示, 在使用的时候需要强制转化成sockaddr_in; 这样的好处是程序的通用性, 可以接收 IPv4, IPv6, 以及UNIX DomainSocket 各种类型的 sockaddr 结构体指针做为参数;
sockaddr_in用于网络通信,sockaddr_un用于本地通信。
sockaddr结构体根据传入的指针指向的对象是AF_INET还是AF_UNIX,会进行网络通信或者本地通信。这样就可以使用同一套socket API来设计更通用的可以兼顾网络通信和本地通信的一套接口。
Udp网络编程
参数1是域,可以填入上面红框中的选项,AF_UNIX表示这个套接字创建好后在本地网络进行通信。如果填AF_INET表示使用网络协议来实现网络通信。
参数2是套接字类型,SOCK_DGRAM表示用户数据报套接字(无连接,不可靠),也就是udp套接字。
参数3表示协议编号,一般设为0即可。
返回值:成功则返回新的文件描述符,失败则返回-1。
参数1是套接字,参数2的结构体是套接字信息。bind的作用就是把套接字信息跟套接字关联起来。
上面的这个函数可以把字符串风格的ip地址转成4字节,并且转成网络序列的ip。
该函数的作用是收消息。参数1是文件描述符;参数2是缓冲区,读到的数据就放在这里;参数3是缓冲区大小;参数4flags表示阻塞标记位,这里设为0,表示阻塞读取;参数5是输入输出型参数,他把数据放到缓冲区中,并且我们也能由此知道发送方是谁;参数6就是参数5的大小;返回值是实际收到多少个字节。
sendto的作用是发信息。通过sockfd把指定的字符串buf发送给dest_addr。
云服务器上,服务端不能直接(也强烈不建议)bind自己的公网ip。因为云服务器的公网ip是虚拟出来的,云服务器上根本没有对应的公网ip。
云服务器上的服务端ip一般bind为0。这样服务端就bind了任意IP。
如果服务器上有两个ip地址ip1和ip2,上层bind的端口号为8888。如果服务器bind了ip1和8888,未来收到各种发给8888的报文,此时只会收到ip1和8888这组套接字上的信息。如果服务器端bind的ip为0,不管收到哪些报文,只要它发送的目标端口是8888的,全都能收到。
这个接口能直接把4字节地址转字符串。
echo server-版本1
简单的回显服务器和客户端代码
InetAddr.hpp
#pragma once
#include<iostream>
#include<string>
#include<sys/types.h>
#include<sys/socket.h>
#include<arpa/inet.h>
#include<netinet/in.h>
class InetAddr
{
private:
void ToHost(const struct sockaddr_in &addr) //主机转本地地址
{
_port=ntohs(addr.sin_port);
_ip=inet_ntoa(addr.sin_addr);
}
public:
InetAddr(const struct sockaddr_in &addr):_addr(addr)
{
ToHost(addr);
}
std::string Ip()
{
return _ip;
}
uint16_t Port()
{
return _port;
}
~InetAddr()
{}
private:
std::string _ip;
uint16_t _port;
struct sockaddr_in _addr;
};
LockGuard.hpp
#pragma once
#include <pthread.h>
class LockGuard
{
public:
LockGuard(pthread_mutex_t *mutex):_mutex(mutex)
{
pthread_mutex_lock(_mutex);
}
~LockGuard()
{
pthread_mutex_unlock(_mutex);
}
private:
pthread_mutex_t *_mutex;
};
Log.hpp
#pragma once
#include <iostream>
#include <sys/types.h>
#include <unistd.h>
#include <ctime>
#include <cstdarg>
#include <fstream>
#include <cstring>
#include <pthread.h>
#include "LockGuard.hpp"
namespace log_ns
{
enum
{
DEBUG = 1,
INFO,
WARNING,
ERROR,
FATAL
};
std::string LevelToString(int level)
{
switch (level)
{
case DEBUG:
return "DEBUG";
case INFO:
return "INFO";
case WARNING:
return "WARNING";
case ERROR:
return "ERROR";
case FATAL:
return "FATAL";
default:
return "UNKNOWN";
}
}
std::string GetCurrTime()
{
time_t now = time(nullptr);
struct tm *curr_time = localtime(&now);
char buffer[128];
snprintf(buffer, sizeof(buffer), "%d-%02d-%02d %02d:%02d:%02d",
curr_time->tm_year + 1900,
curr_time->tm_mon + 1,
curr_time->tm_mday,
curr_time->tm_hour,
curr_time->tm_min,
curr_time->tm_sec);
return buffer;
}
class logmessage
{
public:
std::string _level;
pid_t _id;
std::string _filename;
int _filenumber;
std::string _curr_time;
std::string _message_info;
};
#define SCREEN_TYPE 1
#define FILE_TYPE 2
const std::string glogfile = "./log.txt";
pthread_mutex_t glock = PTHREAD_MUTEX_INITIALIZER;
// log.logMessage("", 12, INFO, "this is a %d message ,%f, %s hellwrodl", x, , , );
class Log
{
public:
Log(const std::string &logfile = glogfile) : _logfile(logfile), _type(SCREEN_TYPE)
{
}
void Enable(int type)
{
_type = type;
}
void FlushLogToScreen(const logmessage &lg)
{
printf("[%s][%d][%s][%d][%s] %s",
lg._level.c_str(),
lg._id,
lg._filename.c_str(),
lg._filenumber,
lg._curr_time.c_str(),
lg._message_info.c_str());
}
void FlushLogToFile(const logmessage &lg)
{
std::ofstream out(_logfile, std::ios::app);
if (!out.is_open())
return;
char logtxt[2048];
snprintf(logtxt, sizeof(logtxt), "[%s][%d][%s][%d][%s] %s",
lg._level.c_str(),
lg._id,
lg._filename.c_str(),
lg._filenumber,
lg._curr_time.c_str(),
lg._message_info.c_str());
out.write(logtxt, strlen(logtxt));
out.close();
}
void FlushLog(const logmessage &lg)
{
// 加过滤逻辑 --- TODO
LockGuard lockguard(&glock);
switch (_type)
{
case SCREEN_TYPE:
FlushLogToScreen(lg);
break;
case FILE_TYPE:
FlushLogToFile(lg);
break;
}
}
void logMessage(std::string filename, int filenumber, int level, const char *format, ...)
{
logmessage lg;
lg._level = LevelToString(level);
lg._id = getpid();
lg._filename = filename;
lg._filenumber = filenumber;
lg._curr_time = GetCurrTime();
va_list ap;
va_start(ap, format);
char log_info[1024];
vsnprintf(log_info, sizeof(log_info), format, ap);
va_end(ap);
lg._message_info = log_info;
// 打印出来日志
FlushLog(lg);
}
~Log()
{
}
private:
int _type;
std::string _logfile;
};
Log lg;
#define LOG(Level, Format, ...) \
do \
{ \
lg.logMessage(__FILE__, __LINE__, Level, Format, ##__VA_ARGS__); \
} while (0)
#define EnableScreen() \
do \
{ \
lg.Enable(SCREEN_TYPE); \
} while (0)
#define EnableFILE() \
do \
{ \
lg.Enable(FILE_TYPE); \
} while (0)
};
nocopy.hpp
#pragma once
class nocopy
{
public:
nocopy(){}
~nocopy(){}
nocopy(const nocopy&) =delete;
const nocopy& operator=(const nocopy&) =delete;
};
UdpClientMain.cc
#include<iostream>
#include<string>
#include<cstring>
#include<unistd.h>
#include<sys/types.h>
#include<sys/socket.h>
#include<netinet/in.h>
#include<arpa/inet.h>
//客户端在未来一定要知道服务器IP地址和端口号
// ./udp_client server-ip server-port
// ./udp_client 127.0.0.1 8888
int main(int argc,char* argv[])
{
if(argc!=3)
{
std::cerr<<"Usage: "<<argv[0]<<"server-ip server-port"<<std::endl;
exit(0);
}
std::string serverip=argv[1];
uint16_t serverport=std::stoi(argv[2]);
int sockfd=::socket(AF_INET,SOCK_DGRAM,0);
if(sockfd<0)
{
std::cerr<<"create socket error"<<std::endl;
exit(1);
}
//client的端口号,一般不让用户自己设定,而是让client OS随机选择
//client需要bind它自己的IP和端口,但是client不需要 “显示” bind它自己的IP和端口
//client在首次向服务器发送数据的时候,OS会自动给client bind它自己的IP和端口
struct sockaddr_in server;
memset(&server,0,sizeof(server));
server.sin_family=AF_INET;
server.sin_port=htons(serverport);
server.sin_addr.s_addr=inet_addr(serverip.c_str());
while(1)
{
std::string line;
std::cout<<"Please Enter# ";
std::getline(std::cin,line);
int n=sendto(sockfd,line.c_str(),line.size(),0,(struct sockaddr*)&server,sizeof(server));//你要发送消息,你得知道你要发给谁
if(n>0)
{
struct sockaddr_in temp;
socklen_t len=sizeof(temp);
char buffer[1024];
int m=recvfrom(sockfd,buffer,sizeof(buffer)-1,0,(struct sockaddr*)&temp,&len);
if(m>0)
{
buffer[m]=0;
std::cout<<buffer<<std::endl;
}
else
{
std::cout<<"recvfrom error"<<std::endl;
break;
}
}
else
{
std::cout<<"sendto error"<<std::endl;
break;
}
}
::close(sockfd);
return 0;
}
UdpServer.hpp
#pragma once
#include<iostream>
#include<unistd.h>
#include<string>
#include<cstring>
#include<sys/types.h>
#include<sys/socket.h>
#include<netinet/in.h>
#include<arpa/inet.h>
#include"nocopy.hpp"
#include"Log.hpp"
#include"InetAddr.hpp"
using namespace log_ns;
static const int gsockfd=-1;
static const uint16_t glocalport=8888;
enum {
SOCKET_ERROR=1,
BIND_ERROR
};
//UdpServer user("192.168.1.1",8899)
class UdpServer : public nocopy
{
public:
UdpServer(uint16_t localport=glocalport)
:_sockfd(gsockfd),
_localport(localport),
_isrunning(false)
{
}
void InitServer()
{
//1.创建socket文件
_sockfd=::socket(AF_INET,SOCK_DGRAM,0);
if(_sockfd<0)
{
LOG(FATAL,"socket error/n");
exit(SOCKET_ERROR);
}
LOG(DEBUG,"socket create success, _sockfd: %d\n",_sockfd);//3
//2. bind
struct sockaddr_in local;
memset(&local,0,sizeof(local));
local.sin_family=AF_INET;
local.sin_port=htons(_localport);
// local.sin_addr.s_addr=inet_addr(_localip.c_str());//1.需要4字节ip 2.需要网络序列的ip--暂时
local.sin_addr.s_addr=INADDR_ANY;//服务器端,进行任意ip地址绑定
int n=::bind(_sockfd,(struct sockaddr*)&local,sizeof(local));
if(n<0)
{
LOG(FATAL,"bind error\n");
exit(BIND_ERROR);
}
LOG(DEBUG,"socket bind success\n");
}
void Start()
{
_isrunning=true;
char inbuffer[1024];
while(_isrunning)
{
struct sockaddr_in peer;
socklen_t len=sizeof(peer);
ssize_t n=recvfrom(_sockfd,inbuffer,sizeof(inbuffer)-1,0,(struct sockaddr*)&peer,&len);
if(n>0)
{
InetAddr addr(peer);
inbuffer[n]=0;
std::cout<<"["<<addr.Ip()<<":"<<addr.Port()<<"]# "<<inbuffer<<std::endl;
std::string echo_string="[udp_server echo] #";
echo_string+=inbuffer;
sendto(_sockfd,echo_string.c_str(),echo_string.size(),0,(struct sockaddr*)&peer,len);
}
else
{
std::cout<<"recvfrom,error"<<std::endl;
}
}
}
~UdpServer()
{
if(_sockfd>gsockfd) ::close(_sockfd);
}
private:
int _sockfd;
uint16_t _localport;
// std::string _localip;//TODO:后面要专门处理这个ip
bool _isrunning;
};
UdpServerMain.cc
#include"UdpServer.hpp"
#include<memory>
// ./udp_server local-port
// ./udp_server 8888
int main(int argc,char* argv[])
{
if(argc!=2)
{
std::cerr<<"Usage: "<<argv[0]<<"local-port"<<std::endl;
exit(0);
}
uint16_t port=std::stoi(argv[1]);
EnableScreen();
std::unique_ptr<UdpServer> usvr=std::make_unique<UdpServer>(port);
usvr->InitServer();
usvr->Start();
return 0;
}
运行结果图:
DictServer-版本2
实现一个简单的英译汉的网络字典
仅需修改部分版本1和添加部分代码,其余没变化代码不再显示。
Dict.hpp
#pragma once
#include<iostream>
#include<string>
#include<fstream>
#include<unordered_map>
#include<unistd.h>
#include"Log.hpp"
using namespace log_ns;
const static std::string sep=": ";
class Dict
{
private:
void LoadDict(const std::string& path)
{
std::ifstream in(path);
if(!in.is_open())
{
LOG(FATAL,"open %s failed!\n",path.c_str());
exit(0);
}
std::string line;
while(std::getline(in,line))
{
LOG(DEBUG,"load info: %s ,success\n",line.c_str());
if(line.empty()) continue;
auto pos=line.find(sep);
if(pos==std::string::npos) continue;
std::string key=line.substr(0,pos);
if(key.empty()) continue;
std::string value=line.substr(pos+sep.size());
if(value.empty()) continue;
_dict.insert(std::make_pair(key,value));
}
LOG(INFO,"load %s done\n",path.c_str());
in.close();
}
public:
Dict(const std::string& dict_path):_dict_path(dict_path)
{
LoadDict(_dict_path);
}
std::string Translate(std::string word)
{
if(word.empty()) return "None";
auto iter=_dict.find(word);
if(iter==_dict.end()) return "None";
else return iter->second;
}
~Dict()
{}
private:
std::unordered_map<std::string,std::string> _dict;
std::string _dict_path;
};
dict.txt
apple: 苹果
banana: 香蕉
cat: 猫
dog: 狗
book: 书
pen: 笔
happy: 快乐的
sad: 悲伤的
run: 跑
jump: 跳
teacher: 老师
student: 学生
car: 汽车
bus: 公交车
love: 爱
hate: 恨
hello: 你好
goodbye: 再见
summer: 夏天
winter: 冬天
UdpServer.hpp
#pragma once
#include<iostream>
#include<unistd.h>
#include<string>
#include<cstring>
#include<functional>
#include<sys/types.h>
#include<sys/socket.h>
#include<netinet/in.h>
#include<arpa/inet.h>
#include"nocopy.hpp"
#include"Log.hpp"
#include"InetAddr.hpp"
using namespace log_ns;
static const int gsockfd=-1;
static const uint16_t glocalport=8888;
enum {
SOCKET_ERROR=1,
BIND_ERROR
};
using func_t =std::function<std::string(std::string)>;
//UdpServer user("192.168.1.1",8899)
class UdpServer : public nocopy
{
public:
UdpServer(func_t func,uint16_t localport=glocalport)
:_func(func),
_sockfd(gsockfd),
_localport(localport),
_isrunning(false)
{
}
void InitServer()
{
//1.创建socket文件
_sockfd=::socket(AF_INET,SOCK_DGRAM,0);
if(_sockfd<0)
{
LOG(FATAL,"socket error/n");
exit(SOCKET_ERROR);
}
LOG(DEBUG,"socket create success, _sockfd: %d\n",_sockfd);//3
//2. bind
struct sockaddr_in local;
memset(&local,0,sizeof(local));
local.sin_family=AF_INET;
local.sin_port=htons(_localport);
// local.sin_addr.s_addr=inet_addr(_localip.c_str());//1.需要4字节ip 2.需要网络序列的ip--暂时
local.sin_addr.s_addr=INADDR_ANY;//服务器端,进行任意ip地址绑定
int n=::bind(_sockfd,(struct sockaddr*)&local,sizeof(local));
if(n<0)
{
LOG(FATAL,"bind error\n");
exit(BIND_ERROR);
}
LOG(DEBUG,"socket bind success\n");
}
void Start()
{
_isrunning=true;
char inbuffer[1024];
while(_isrunning)
{
struct sockaddr_in peer;
socklen_t len=sizeof(peer);
ssize_t n=recvfrom(_sockfd,inbuffer,sizeof(inbuffer)-1,0,(struct sockaddr*)&peer,&len);
if(n>0)
{
InetAddr addr(peer);
inbuffer[n]=0;
//一个一个的单词
std::cout<<"["<<addr.Ip()<<":"<<addr.Port()<<"]# "<<inbuffer<<std::endl;
std::string result=_func(inbuffer);
sendto(_sockfd,result.c_str(),result.size(),0,(struct sockaddr*)&peer,len);
}
else
{
std::cout<<"recvfrom,error"<<std::endl;
}
}
}
~UdpServer()
{
if(_sockfd>gsockfd) ::close(_sockfd);
}
private:
int _sockfd;
uint16_t _localport;
// std::string _localip;//TODO:后面要专门处理这个ip
bool _isrunning;
func_t _func;
};
UdpServerMain.cc
#include"UdpServer.hpp"
#include"Dict.hpp"
#include<memory>
// ./udp_server local-port
// ./udp_server 8888
int main(int argc,char* argv[])
{
if(argc!=2)
{
std::cerr<<"Usage: "<<argv[0]<<"local-port"<<std::endl;
exit(0);
}
uint16_t port=std::stoi(argv[1]);
EnableScreen();
Dict dict("./dict.txt");
func_t translate=std::bind(&Dict::Translate,&dict,std::placeholders::_1);
std::unique_ptr<UdpServer> usvr=std::make_unique<UdpServer>(translate,port);
usvr->InitServer();
usvr->Start();
return 0;
}
运行结果如下图:
简单聊天室-版本3
Route.hpp
#pragma once
#include<iostream>
#include<string>
#include<vector>
#include<functional>
#include<sys/types.h>
#include<sys/socket.h>
#include<pthread.h>
#include"InetAddr.hpp"
#include"ThreadPool.hpp"
#include"LockGuard.hpp"
using task_t =std::function<void()>;
class Route
{
public:
Route()
{
pthread_mutex_init(&_mutex,nullptr);
}
void CheckOnlineUser(InetAddr& who)
{
LockGuard lockguard(&_mutex);
for(auto& user:_online_user)
{
if(user==who)
{
LOG(DEBUG,"%s is exists\n",who.AddrStr().c_str());
return;
}
}
LOG(DEBUG,"%s is not exists,add it\n",who.AddrStr().c_str());
_online_user.push_back(who);
}
void Offline(InetAddr& who)
{
LockGuard lockguard(&_mutex);
auto iter=_online_user.begin();
for(;iter!=_online_user.end();iter++)
{
if(*iter==who)
{
LOG(DEBUG,"%s is offline\n",who.AddrStr().c_str());
_online_user.erase(iter);
break;
}
}
}
void ForwardHelper(int sockfd,const std::string message,InetAddr who)
{
LockGuard lockguard(&_mutex);
std::string send_message="["+who.AddrStr()+"]#"+message;
for(auto& user:_online_user)
{
struct sockaddr_in peer =user.Addr();
LOG(DEBUG,"Forward message to %s,message is %s\n",user.AddrStr().c_str(),send_message.c_str());
::sendto(sockfd,send_message.c_str(),send_message.size(),0,(struct sockaddr*)&peer,sizeof(peer));
}
}
void Forward(int sockfd,const std::string& message,InetAddr& who)//转发
{
//1.该用户是否在 在线用户列表中?如果在,什么都不做;如果不在,自动添加到_online_user
CheckOnlineUser(who);
//1.1 message == "QUIT" "Q"
if(message=="QUIT" || message== "Q")
{
Offline(who);
}
//2.who 一定在_online_user列表里面
//ForwardHelper(sockfd,message);
task_t t=std::bind(&Route::ForwardHelper,this,sockfd,message,who);
ThreadPool<task_t>::GetInstance()->Equeue(t);
}
~Route()
{
pthread_mutex_destroy(&_mutex);
}
private:
std::vector<InetAddr> _online_user;
pthread_mutex_t _mutex;
};
Thread.hpp
#pragma once
#include <iostream>
#include <string>
#include <functional>
#include <pthread.h>
namespace ThreadMoudle
{
// 线程要执行的方法,后面我们随时调整
// typedef void (*func_t)(ThreadData *td); // 函数指针类型
// typedef std::function<void()> func_t;
using func_t = std::function<void(const std::string&)>;
class Thread
{
public:
void Excute()
{
_isrunning = true;
_func(_name);
_isrunning = false;
}
public:
Thread(const std::string &name, func_t func):_name(name), _func(func)
{
}
static void *ThreadRoutine(void *args) // 新线程都会执行该方法!
{
Thread *self = static_cast<Thread*>(args); // 获得了当前对象
self->Excute();
return nullptr;
}
bool Start()
{
int n = ::pthread_create(&_tid, nullptr, ThreadRoutine, this);
if(n != 0) return false;
return true;
}
std::string Status()
{
if(_isrunning) return "running";
else return "sleep";
}
void Stop()
{
if(_isrunning)
{
::pthread_cancel(_tid);
_isrunning = false;
}
}
void Join()
{
::pthread_join(_tid, nullptr);
}
std::string Name()
{
return _name;
}
~Thread()
{
}
private:
std::string _name;
pthread_t _tid;
bool _isrunning;
func_t _func; // 线程要执行的回调函数
};
} // namespace ThreadModle
ThreadPool.hpp
#pragma once
#include <iostream>
#include <unistd.h>
#include <string>
#include <vector>
#include <queue>
#include <functional>
#include "Thread.hpp"
#include "Log.hpp"
#include "LockGuard.hpp"
using namespace ThreadMoudle;
using namespace log_ns;
static const int gdefaultnum = 5;
void test()
{
while (true)
{
std::cout << "hello world" << std::endl;
sleep(1);
}
}
template <typename T>
class ThreadPool
{
private:
void LockQueue()
{
pthread_mutex_lock(&_mutex);
}
void UnlockQueue()
{
pthread_mutex_unlock(&_mutex);
}
void Wakeup()
{
pthread_cond_signal(&_cond);
}
void WakeupAll()
{
pthread_cond_broadcast(&_cond);
}
void Sleep()
{
pthread_cond_wait(&_cond, &_mutex);
}
bool IsEmpty()
{
return _task_queue.empty();
}
void HandlerTask(const std::string &name) // this
{
while (true)
{
// 取任务
LockQueue();
while (IsEmpty() && _isrunning)
{
_sleep_thread_num++;
LOG(INFO, "%s thread sleep begin!\n", name.c_str());
Sleep();
LOG(INFO, "%s thread wakeup!\n", name.c_str());
_sleep_thread_num--;
}
// 判定一种情况
if (IsEmpty() && !_isrunning)
{
UnlockQueue();
LOG(INFO, "%s thread quit\n", name.c_str());
break;
}
// 有任务
T t = _task_queue.front();
_task_queue.pop();
UnlockQueue();
// 处理任务
t(); // 处理任务,此处不用/不能在临界区中处理
// std::cout << name << ": " << t.result() << std::endl;
//LOG(DEBUG, "hander task done, task is : %s\n", t.result().c_str());
}
}
void Init()
{
func_t func = std::bind(&ThreadPool::HandlerTask, this, std::placeholders::_1);
for (int i = 0; i < _thread_num; i++)
{
std::string threadname = "thread-" + std::to_string(i + 1);
_threads.emplace_back(threadname, func);
LOG(DEBUG, "construct thread %s done, init success\n", threadname.c_str());
}
}
void Start()
{
_isrunning = true;
for (auto &thread : _threads)
{
LOG(DEBUG, "start thread %s done.\n", thread.Name().c_str());
thread.Start();
}
}
ThreadPool(int thread_num = gdefaultnum)
: _thread_num(thread_num), _isrunning(false), _sleep_thread_num(0)
{
pthread_mutex_init(&_mutex, nullptr);
pthread_cond_init(&_cond, nullptr);
}
ThreadPool(const ThreadPool<T> &) = delete;
void operator=(const ThreadPool<T> &) = delete;
public:
void Stop()
{
LockQueue();
_isrunning = false;
WakeupAll();
UnlockQueue();
LOG(INFO, "Thread Pool Stop Success!\n");
}
// 如果是多线程获取单例呢?
static ThreadPool<T> *GetInstance()
{
if (_tp == nullptr)
{
LockGuard lockguard(&_sig_mutex);
if (_tp == nullptr)
{
LOG(INFO, "create threadpool\n");
// thread-1 thread-2 thread-3....
_tp = new ThreadPool<T>();
_tp->Init();
_tp->Start();
}
else
{
LOG(INFO, "get threadpool\n");
}
}
return _tp;
}
void Equeue(const T &in)
{
LockQueue();
if (_isrunning)
{
_task_queue.push(in);
if (_sleep_thread_num > 0)
Wakeup();
}
UnlockQueue();
}
~ThreadPool()
{
pthread_mutex_destroy(&_mutex);
pthread_cond_destroy(&_cond);
}
private:
int _thread_num;
std::vector<Thread> _threads;
std::queue<T> _task_queue;
bool _isrunning;
int _sleep_thread_num;
pthread_mutex_t _mutex;
pthread_cond_t _cond;
// 单例模式
// volatile static ThreadPool<T> *_tp;
static ThreadPool<T> *_tp;
static pthread_mutex_t _sig_mutex;
};
template <typename T>
ThreadPool<T> *ThreadPool<T>::_tp = nullptr;
template <typename T>
pthread_mutex_t ThreadPool<T>::_sig_mutex = PTHREAD_MUTEX_INITIALIZER;
UdpClientMain.cc
#include<iostream>
#include<string>
#include<cstring>
#include<unistd.h>
#include<sys/types.h>
#include<sys/socket.h>
#include<netinet/in.h>
#include<arpa/inet.h>
#include"Thread.hpp"
using namespace ThreadMoudle;
int InitClient()
{
int sockfd=::socket(AF_INET,SOCK_DGRAM,0);
if(sockfd<0)
{
std::cerr<<"create socket error"<<std::endl;
exit(1);
}
return sockfd;
}
void RecvMessage(int sockfd,const std::string &name)
{
while(true)
{
struct sockaddr_in peer;
socklen_t len=sizeof(peer);
char buffer[1024];
int n=recvfrom(sockfd,buffer,sizeof(buffer)-1,0,(struct sockaddr*)&peer,&len);
if(n>0)
{
buffer[n]=0;
std::cerr<<buffer<<std::endl;
}
else
{
std::cerr<<"recvfrom error"<<std::endl;
break;
}
}
}
void SendMessage(int sockfd,std::string serverip,uint16_t serverport,const std::string &name)
{
struct sockaddr_in server;
memset(&server,0,sizeof(server));
server.sin_family=AF_INET;
server.sin_port=htons(serverport);
server.sin_addr.s_addr=inet_addr(serverip.c_str());
std::string cli_profix=name+"#";//sender-thread# 你好
while(true)
{
std::string line;
std::cout<<cli_profix;
std::getline(std::cin,line);
int n=sendto(sockfd,line.c_str(),line.size(),0,(struct sockaddr*)&server,sizeof(server));
if(n<=0)
break;
}
}
int main(int argc,char* argv[])
{
if(argc!=3)
{
std::cerr<<"Usage: "<<argv[0]<<"server-ip server-port"<<std::endl;
exit(0);
}
std::string serverip=argv[1];
uint16_t serverport=std::stoi(argv[2]);
int sockfd=InitClient();
Thread recver("recver-thread",std::bind(&RecvMessage,sockfd,std::placeholders::_1));
Thread sender("sender-thread",std::bind(&SendMessage,sockfd,serverip,serverport,std::placeholders::_1));
recver.Start();
sender.Start();
recver.Join();
sender.Join();
::close(sockfd);
return 0;
}
UdpServer.hpp
#pragma once
#include<iostream>
#include<unistd.h>
#include<string>
#include<cstring>
#include<functional>
#include<sys/types.h>
#include<sys/socket.h>
#include<netinet/in.h>
#include<arpa/inet.h>
#include"nocopy.hpp"
#include"Log.hpp"
#include"InetAddr.hpp"
using namespace log_ns;
static const int gsockfd=-1;
static const uint16_t glocalport=8888;
enum {
SOCKET_ERROR=1,
BIND_ERROR
};
using service_t =std::function<void(int,const std::string& message,InetAddr& who)>;
//UdpServer user("192.168.1.1",8899)
//一般服务器主要是用来进行网络数据读取和写入的,即IO的
//服务器IO逻辑 和 业务逻辑 解耦
class UdpServer : public nocopy
{
public:
UdpServer(service_t func,uint16_t localport=glocalport)
:_func(func),
_sockfd(gsockfd),
_localport(localport),
_isrunning(false)
{
}
void InitServer()
{
//1.创建socket文件
_sockfd=::socket(AF_INET,SOCK_DGRAM,0);
if(_sockfd<0)
{
LOG(FATAL,"socket error/n");
exit(SOCKET_ERROR);
}
LOG(DEBUG,"socket create success, _sockfd: %d\n",_sockfd);//3
//2. bind
struct sockaddr_in local;
memset(&local,0,sizeof(local));
local.sin_family=AF_INET;
local.sin_port=htons(_localport);
// local.sin_addr.s_addr=inet_addr(_localip.c_str());//1.需要4字节ip 2.需要网络序列的ip--暂时
local.sin_addr.s_addr=INADDR_ANY;//服务器端,进行任意ip地址绑定
int n=::bind(_sockfd,(struct sockaddr*)&local,sizeof(local));
if(n<0)
{
LOG(FATAL,"bind error\n");
exit(BIND_ERROR);
}
LOG(DEBUG,"socket bind success\n");
}
void Start()
{
_isrunning=true;
char message[1024];
while(_isrunning)
{
struct sockaddr_in peer;
socklen_t len=sizeof(peer);
ssize_t n=recvfrom(_sockfd,message,sizeof(message)-1,0,(struct sockaddr*)&peer,&len);
if(n>0)
{
InetAddr addr(peer);
message[n]=0;
LOG(DEBUG,"[%s]# %s\n",addr.AddrStr().c_str(),message);
_func(_sockfd,message,addr);
LOG(DEBUG,"return udpserver\n");
}
else
{
std::cout<<"recvfrom,error"<<std::endl;
}
}
}
~UdpServer()
{
if(_sockfd>gsockfd) ::close(_sockfd);
}
private:
int _sockfd;//读写都用同一个sockfd,说明:udp是全双工通信的
uint16_t _localport;
// std::string _localip;//TODO:后面要专门处理这个ip
bool _isrunning;
service_t _func;
};
UdpServerMain.cc
#include"UdpServer.hpp"
#include"Route.hpp"
#include<memory>
// ./udp_server local-port
// ./udp_server 8888
int main(int argc,char* argv[])
{
if(argc!=2)
{
std::cerr<<"Usage: "<<argv[0]<<"local-port"<<std::endl;
exit(0);
}
uint16_t port=std::stoi(argv[1]);
EnableScreen();
Route messageRoute;
service_t message_route=std::bind(&Route::Forward,&messageRoute,std::placeholders::_1,std::placeholders::_2,std::placeholders::_3);
std::unique_ptr<UdpServer> usvr=std::make_unique<UdpServer>(message_route,port);
usvr->InitServer();
usvr->Start();
return 0;
}
地址转换函数
上面代码中,我们用的地址转换函数是inet_ntoa,在InetAddr.hpp中。
inet_ntoa 这个函数返回了一个 char*, 很显然是这个函数自己在内部为我们申请了一块内存来保存 ip 的结果. 那么是否需要调用者手动释放呢?
man 手册上说, inet_ntoa 函数, 是把这个返回结果放到了静态存储区. 这个时候不需要我们手动进行释放.
如果我们调用多次这个函数, 会有什么样的效果呢?
因为 inet_ntoa 把结果放到自己内部的一个静态存储区, 这样第二次调用时的结果会覆盖掉上一次的结果
在多线程环境下, 推荐使用 inet_ntop, 这个函数由调用者提供一个缓冲区保存结果, 可以规避线程安全问题;
src就是4字节的ip地址,dst就是自己传的缓冲区,size就是缓冲区的大小。
使用示范: