📢博客主页:https://blog.csdn.net/2301_779549673
📢博客仓库:https://gitee.com/JohnKingW/linux_test/tree/master/lesson
📢欢迎点赞 👍 收藏 ⭐留言 📝 如有错误敬请指正!
📢本文由 JohnKi 原创,首发于 CSDN🙉
📢未来很长,值得我们全力奔赴更美好的生活✨
文章目录
- 🏳️🌈一、UdpServer.hpp 更新
- 1.1 基本结构
- 1.2 构造函数
- 1.3 开始方法 - start
- 1.4 注册服务聊天室
- 🏳️🌈二、用户类 User
- 🏳️🌈三、路由类 Route
- 3.1 新增用户 Adduser
- 3.2 删除用户 DelAddr
- 3.3 路由发送功能 Router
- 3.4 整体代码
- 🏳️🌈四、UdpServer.cpp 更新
- 🏳️🌈五、UdpClient.cpp 更新
- 5.1 发送消息
- 5.2 退出方法
- 5.3 接收消息
- 🏳️🌈六、测试代码
- 🏳️🌈七、整体代码
- 7.1 UserServer.hpp
- 7.2 UserServer.cpp
- 7.3 UserClient.hpp
- 7.4 UserClient.cpp
- 7.5 User.hpp
- 7.6 ThreadPool.hpp
- 7.7 Thread.hpp
- 7.9 Mutex.hpp
- 7.10 Log.hpp
- 7.11 Cond.hpp
- 7.12 Common.hpp
- 7.13 InetAddr.hpp
- 7.14 Makefile
- 👥总结
🏳️🌈一、UdpServer.hpp 更新
我们这次的整体思路是,利用回调函数,实现聊天室的用户增加、用户删除和消息路由
1.1 基本结构
- 就上上次字典类一样, func_t 我们需要三个回调函数,分别用来用户增加、用户删除和消息路由,不仅如此,我们可以引入线程池的概念,将消息路由作为一个个线程任务,放入线程池中
using adduser_t = std::function<void(InetAddr& id)>;
using deluser_t = std::function<void(InetAddr& id)>;
using task_t = std::function<void()>;
using route_t = std::function<void(int sockfd, const std::string& msg)>;
class UdpServer : public nocopy{
public:
UdpServer(uint16_t localport = gdefaultport)
: _sockfd(gsockfd),
_localport(localport),
_isrunning(false)
{}
void InitServer(){}
// 注册聊天室服务
void RegisterService(adduser_t adduser, deluser_t deluser, route_t route){}
void Start(){}
~UdpServer(){}
private:
int _sockfd; // 文件描述符
uint16_t _localport; // 端口号
std::string _localip; // 本地IP地址
bool _isrunning; // 运行状态
adduser_t _adduser; // 添加用户回调函数
deluser_t _deluser; // 删除用户回调函数
route_t _route; // 路由回调函数
task_t _task; // 任务回调函数
};
1.2 构造函数
- 因为我们均用回调函数,来实现方法,所以可以在函数类外面,利用lambda函数放到RegisterService中初始化各个方法,降低耦合
UdpServer(uint16_t localport = gdefaultport)
: _sockfd(gsockfd), _localport(localport), _isrunning(false) {}
1.3 开始方法 - start
- 接收客户端消息等方法不变,我们根据客户端的消息判断当前用户是要增还是删,分别调用不同的方法,然后将对应的消息路由给当前聊天室中的各个用户
void Start() {
_isrunning = true;
while (true) {
char inbuffer[1024]; // 接收缓冲区
struct sockaddr_in peer; // 接收客户端地址
socklen_t peerlen = sizeof(peer); // 计算接收的客户端地址长度
// 接收数据报
// recvfrom(int sockfd, void* buf, size_t len, int flags, struct
// sockaddr* src_addr, socklen_t* addrlen)
// 从套接字接收数据,并存入buf指向的缓冲区中,返回实际接收的字节数
// 参数sockfd:套接字文件描述符
// 参数buf:指向接收缓冲区的指针,c_str()函数可以将字符串转换为char*,以便存入缓冲区
// 参数len:接收缓冲区的长度
// 参数flags:接收标志,一般设为0
// 参数src_addr:指向客户端地址的指针,若不为NULL,函数返回时,该指针指向客户端的地址,是网络字节序
// 参数addrlen:客户端地址长度的指针,若不为NULL,函数返回时,该指针指向实际的客户端地址长度
ssize_t n = ::recvfrom(_sockfd, inbuffer, sizeof(inbuffer) - 1, 0,
CONV(&peer), &peerlen);
if (n > 0) {
InetAddr cli(peer);
inbuffer[n] = 0;
std::string message;
if (strcmp(inbuffer, "QUIT") == 0) {
// 删除用户
_deluser(cli);
message = cli.AddrStr() + "# " + "退出聊天室";
} else {
// 新增用户
_adduser(cli);
message = cli.AddrStr() + "# " + inbuffer;
}
// 转发消息
task_t task = std::bind(UdpServer::_route, _sockfd, message);
ThreadPool<task_t>::getInstance()->Equeue(task);
}
}
}
1.4 注册服务聊天室
- 这个部分只需要获取对应的方法,然后给类对象的回调函数就行了
// 注册聊天室服务
void RegisterService(adduser_t adduser, deluser_t deluser, route_t route) {
_adduser = adduser;
_deluser = deluser;
_route = route;
}
🏳️🌈二、用户类 User
- 既然是聊天室,那除了聊天功能,最重要的就是用户了,所以我们需要为用户创建一个类对象,所需功能不需要很多,除了标准的构造、析构函数,只需要添加发送、判断、及获取地址的功能就行了。
所以,我们可以先定义一个父类,构建虚函数,然后再子类中实现。
class UserInterface{
public:
virtual ~UserInterface() = default;
virtual void SendTo(int sockfd, const std::string& message) = 0;
virtual bool operator==(const InetAddr& user) = 0;
virtual std::string Id() = 0;
};
class User :public UserInterface{
public:
User(const InetAddr& id) : _id(id) {};
void SendTo(int sockfd, const std::string& message) override{
LOG(LogLevel::DEBUG) << "send message to " << _id.AddrStr() << " info: " << message;
int n = ::sendto(sockfd, message.c_str(), message.size(), 0, _id.NetAddr(), _id.NetAddrLen());
(void)n;
}
bool operator==(const InetAddr& user) override{
return _id == user;
}
std::string Id() override{
return _id.AddrStr();
}
~User(){}
private:
InetAddr _id;
};
为什么选择使用这种父类纯虚函数,子类实现的方法?
这是因为
多态性
允许不同类的对象通过同一接口表现出不同行为
- 如果未来需要支持其他类型的用户(如 AdminUser、GuestUser),只需继承 UserInterface 并实现接口,无需修改 User 的代码。
🏳️🌈三、路由类 Route
- 这个类负责 对用户的管理,增删用户,同时执行将每个人发出的消息转发给在线的所有人的功能
class Route{
public:
Route(){}
void AddUser(InetAddr& id){}
void DelUser(InetAddr& id){}
void Router(int sockfd, const std::string& message){}
~Route(){}
private:
std::list<std::shared_ptr<UserInterface>> _online_user;
Mutex _mutex;
};
为什么这里我们选择使用 链表
对所有用户进行管理,而不选择其他容器?
链表适合频繁增删, 因为聊天室人员变动是很随机的,很有可能会在中间随机删掉一个用户,用 vector 等顺序链表,虽然占空间会小一些,但是删除的时间复杂度是 O(n),而链表只是 O(1)。
3.1 新增用户 Adduser
主要逻辑就是,先判断该用户是否存在,如果存在就提示“已经存在”,不然地话,就增加到管理上线用户的链表中
void AddUser(InetAddr& id) {
LockGuard lockguard(_mutex);
for (auto& user : _online_user) {
if (*user == id) {
LOG(LogLevel::INFO) << id.AddrStr() << " 已经在线";
return;
}
}
// 到这里说明 在线用户中 不存在 新增用户
LOG(LogLevel::INFO) << " 新增该用户: " << id.AddrStr();
_online_user.push_back(std::make_shared<User>(id));
}
为何在尾插时不能直接使用 UserInterface?
UserInterface 定义了纯虚函数(如 SendTo、operator==),因此 无法直接实例化。必须通过其子类(如 User)实现这些接口。
3.2 删除用户 DelAddr
这里我们采用
remove_if + erase
的方法,将 不满足删除条件 的元素移动到容器的前端,覆盖掉需要删除的元素,返回一个迭代器 pos,指向 保留元素的新逻辑结尾(即第一个需要删除的元素的位置),删除从 pos 到容器末尾的所有元素,调整容器大小。
void DelUser(InetAddr& id) {
LockGuard lockguard(_mutex);
// 遍历容器,将 不满足删除条件
// 的元素移动到容器的前端,覆盖掉需要删除的元素 返回一个迭代器 pos,指向
// 保留元素的新逻辑结尾(即第一个需要删除的元素的位置)
auto pos =
std::remove_if(_online_user.begin(), _online_user.end(),
[&id](const std::shared_ptr<UserInterface>& user) {
return *user == id;
});
// 删除从 pos 到容器末尾的所有元素,调整容器大小。
_online_user.erase(pos, _online_user.end());
}
3.3 路由发送功能 Router
也就是整体遍历一边在线用户,然后逐个发送就行了
void Router(int sockfd, const std::string& message) {
LockGuard lockguard(_mutex);
for (auto& user : _online_user) {
user->SendTo(sockfd, message);
}
}
3.4 整体代码
为了一会便于观察用户是否上线等,我们可以添加一个
PrintUser
的方法。
class Route{
public:
Route(){}
void AddUser(InetAddr& id){
LockGuard lockguard(_mutex);
for(auto& user : _online_user){
if(*user == id){
LOG(LogLevel::INFO) << id.AddrStr() << " 已经在线";
return;
}
}
// 到这里说明 在线用户中 不存在 新增用户
LOG(LogLevel::INFO) << " 新增该用户: " << id.AddrStr();
_online_user.push_back(std::make_shared<User>(id));
PrintUsers();
}
void DelUser(InetAddr& id){
LockGuard lockguard(_mutex);
// 遍历容器,将 不满足删除条件 的元素移动到容器的前端,覆盖掉需要删除的元素
// 返回一个迭代器 pos,指向 保留元素的新逻辑结尾(即第一个需要删除的元素的位置)
auto pos = std::remove_if(_online_user.begin(), _online_user.end(),
[&id](const std::shared_ptr<UserInterface>& user){
return *user == id;
});
// 删除从 pos 到容器末尾的所有元素,调整容器大小。
_online_user.erase(pos, _online_user.end());
PrintUsers();
}
void Router(int sockfd, const std::string& message){
LockGuard lockguard(_mutex);
for(auto& user : _online_user){
user->SendTo(sockfd, message);
}
}
void PrintUsers(){
for(auto& user : _online_user){
LOG(LogLevel::DEBUG) << "online user: " << user->Id();
}
}
~Route(){}
private:
std::list<std::shared_ptr<UserInterface>> _online_user;
Mutex _mutex;
};
🏳️🌈四、UdpServer.cpp 更新
因为我们在服务端头文件中设置了线程池,需要将相应的处理函数,绑定到 RegisterService 方法中,所以我们可以先用智能指针创建路由方法对象,然后将对应的方法绑定到 RegisterService 中
#include "UdpServer.hpp"
#include "User.hpp"
int main(int argc, char *argv[])
{
if(argc != 2){
std::cerr << "Usage: " << argv[0] << " localport" << std::endl;
Die(1);
}
uint16_t port = std::stoi(argv[1]);
ENABLE_CONSOLE_LOG(); // 日期类方法,使日志在控制台输出
std::unique_ptr<UdpServer> usvr = std::make_unique<UdpServer>(port);
std::shared_ptr<Route> route = std::make_shared<Route>();
usvr->RegisterService([&route](InetAddr& id){route->AddUser(id);},
[&route](InetAddr& id){route->DelUser(id);},
[&route](int sockfd, const std::string& msg){route->Router(sockfd, msg);}
);
usvr->InitServer(); // 初始化服务端
usvr->Start(); // 启动服务端
return 0;
}
🏳️🌈五、UdpClient.cpp 更新
5.1 发送消息
这部分完全不变,就是先创建套接字,然后利用sendto方法将消息发给服务端,至于后面是增、删还是消息路由就不用管了
5.2 退出方法
- 我们前面说过我们在遇到 QUIT 的时候会退出,我们可以将 crtl + c 即 2信号与退出方法连接起来,当输入 crtl + c 命令后,会退出。
int main(int argc, char* argv[]){
// ...
// 注册信号退出函数
// 将信号 2 注册到 ClientQuit 函数
// 信号2 是 SIGINT,表示 Ctrl-C
signal(2, ClientQuit);
// ...
return 0;
}
5.3 接收消息
- 因为我们这里是聊天室,所以客户端除了需要实现能够不断地发送消息,还需要做到接收消息,但不阻塞发送消息地方法。用户希望随时输入消息并立即看到其他人的回复,若接收操作阻塞主线程,需等待接收完成才能输入。
因此,我们可以利用接收线程持续监听服务端消息,主线程处理用户输入,两者并行不相互阻塞。
int sockfd = -1;
struct sockaddr_in server;
void ClientQuit(int signo){
(void)signo;
const std::string msg = "QUIT";
int n = ::sendto(sockfd, msg.c_str(), msg.size(), 0, CONV(&server), sizeof(server));
}
int main(int argc, char* argv[]){
if(argc != 3){
std::cerr << argv[0] << " serverip server" << std::endl;
Die(USAGE_ERR);
}
// 注册信号退出函数
// 将信号 2 注册到 ClientQuit 函数
// 信号2 是 SIGINT,表示 Ctrl-C
signal(2, ClientQuit);
std::string serverip = argv[1];
uint16_t serverport = std::stoi(argv[2]);
// 1. 创建套接字
sockfd = socket(AF_INET, SOCK_DGRAM, 0);
if(sockfd < 0){
std::cerr << "create socket error" << std::endl;
Die(SOCKET_ERR);
}
// 1.1 填充 server 信息
memset(&server, 0, sizeof(server));
server.sin_family = AF_INET;
server.sin_port = ::htons(serverport);
server.sin_addr.s_addr = ::inet_addr(serverip.c_str());
// 1.2 创建线程
pthread_t tid;
pthread_create(&tid, nullptr, Recver, nullptr);
// 1.3 启动的时候,给服务器推送消息
const std::string msg = "... 来了";
int n = ::sendto(sockfd, msg.c_str(), msg.size(), 0, CONV(&server), sizeof(server));
(void)n;
// 2. 发送数据
while(true){
std::cout << "Please Enter# ";
std::string msg;
std::getline(std::cin, msg);
// client 必须自己的ip和端口。但是客户端,不需要显示调用bind
// 客户端首次 sendto 消息的时候,由OS自动bind
// 1. 如何理解 client 自动随机bind端口号? 一个端口号,只能读一个进程bind
// 2. 如何理解 server 要显示地bind? 必须稳定!必须是众所周知且不能轻易改变的
int n = ::sendto(sockfd, msg.c_str(), msg.size(), 0, CONV(&server), sizeof(server));
(void)n;
}
return 0;
}
🏳️🌈六、测试代码
- 我们运行端口号为 8080 的服务端
- 在root账号下运行第一个客户,显示 48627来了
- 再在我个人的账户上运行第二个客户,显示 42772来了
- 我们在root下输入
111
,成功路由到我的个人账户上
- 当我们将个人账户给退出后,服务端会显示 退出聊天室,然后回显在线用户
- 再尝试在 root 账户下发送 111,只有root自己会收到消息
🏳️🌈七、整体代码
7.1 UserServer.hpp
#include <iostream>
#include <string>
#include <memory>
#include <cstring>
#include <functional>
#include <cerrno> // 这个头文件包含了errno定义,用于存放系统调用的返回值
#include <strings.h> // 属于POSIX扩展(非标准C/C++),常见于Unix/Linux系统,提供额外字符串函数(如 bcopy, bzero)
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include "InetAddr.hpp"
#include "Log.hpp"
#include "Common.hpp"
#include "ThreadPool.hpp"
using namespace LogModule;
using namespace ThreadPoolModule;
const static int gsockfd = -1;
const static std::string gdefaultip = "127.0.0.1"; // 表示本地主机
const static uint16_t gdefaultport = 8080;
using adduser_t = std::function<void(InetAddr& id)>;
using deluser_t = std::function<void(InetAddr& id)>;
using task_t = std::function<void()>;
using route_t = std::function<void(int sockfd, const std::string& msg)>;
class nocopy{
public:
nocopy(){}
~nocopy(){}
nocopy(const nocopy&) = delete; // 禁止拷贝构造函数
const nocopy& operator=(const nocopy&) = delete; // 禁止拷贝赋值运算符
};
class UdpServer : public nocopy{
public:
UdpServer(uint16_t localport = gdefaultport)
: _sockfd(gsockfd),
_localport(localport),
_isrunning(false)
{}
void InitServer(){
// 1. 创建套接字
// socket(int domain, int type, int protocol)
// 返回一个新的套接字文件描述符,或者在出错时返回-1
// 参数domain:协议族,AF_INET,表示IPv4协议族
// 参数type:套接字类型,SOCK_DGRAM,表示UDP套接字
// 参数protocol:协议,0,表示默认协议
_sockfd = ::socket(AF_INET, SOCK_DGRAM, 0);
if(_sockfd < 0){
LOG(LogLevel::FATAL) << "socket: " << strerror(errno);
// exit(SOCKET_ERR) 表示程序运行失败,并返回指定的错误码
exit(SOCKET_ERR);
}
LOG(LogLevel::DEBUG) << "socket success, sockfd is: " << _sockfd;
// 2. bind
// sockaddr_in
struct sockaddr_in local;
// 将local全部置零,以便后面设置
memset(&local, 0, sizeof(local));
local.sin_family = AF_INET; // IPv4协议族
local.sin_port = htons(_localport); // 端口号,网络字节序
local.sin_addr.s_addr = htonl(INADDR_ANY); // 本地IP地址,网络字节序
// 将套接字绑定到本地地址
// bind(int sockfd, const struct sockaddr* addr, socklen_t addrlen)
// 绑定一个套接字到一个地址,使得套接字可以接收来自该地址的数据报
// 参数sockfd:套接字文件描述符
// 参数addr:指向sockaddr_in结构体的指针,表示要绑定的地址
// 参数addrlen:地址长度,即sizeof(sockaddr_in)
// 返回0表示成功,-1表示出错
int n = ::bind(_sockfd, (struct sockaddr* )&local, sizeof(local));
if(n < 0){
LOG(LogLevel::FATAL) << "bind: " << strerror(errno);
exit(BIND_ERR);
}
LOG(LogLevel::DEBUG) << "bind success";
}
// 注册聊天室服务
void RegisterService(adduser_t adduser, deluser_t deluser, route_t route){
_adduser = adduser;
_deluser = deluser;
_route = route;
}
void Start(){
_isrunning = true;
while(true){
char inbuffer[1024]; // 接收缓冲区
struct sockaddr_in peer; // 接收客户端地址
socklen_t peerlen = sizeof(peer); // 计算接收的客户端地址长度
// 接收数据报
// recvfrom(int sockfd, void* buf, size_t len, int flags, struct sockaddr* src_addr, socklen_t* addrlen)
// 从套接字接收数据,并存入buf指向的缓冲区中,返回实际接收的字节数
// 参数sockfd:套接字文件描述符
// 参数buf:指向接收缓冲区的指针,c_str()函数可以将字符串转换为char*,以便存入缓冲区
// 参数len:接收缓冲区的长度
// 参数flags:接收标志,一般设为0
// 参数src_addr:指向客户端地址的指针,若不为NULL,函数返回时,该指针指向客户端的地址,是网络字节序
// 参数addrlen:客户端地址长度的指针,若不为NULL,函数返回时,该指针指向实际的客户端地址长度
ssize_t n = ::recvfrom(_sockfd, inbuffer, sizeof(inbuffer) - 1, 0, CONV(&peer), &peerlen);
if(n > 0){
InetAddr cli(peer);
inbuffer[n] = 0;
std::string message;
if(strcmp(inbuffer, "QUIT") == 0){
// 删除用户
_deluser(cli);
message = cli.AddrStr() + "# " + "退出聊天室";
}
else{
// 新增用户
_adduser(cli);
message = cli.AddrStr() + "# " + inbuffer;
}
// 转发消息
task_t task = std::bind(UdpServer::_route, _sockfd, message);
ThreadPool<task_t>::getInstance()->Equeue(task);
}
}
}
~UdpServer(){
// 判断 _sockfd 是否是一个有效的套接字文件描述符
// 有效的文件描述符(如套接字、打开的文件等)是非负整数(>= 0)
if(_sockfd > -1) ::close(_sockfd);
}
private:
int _sockfd; // 文件描述符
uint16_t _localport; // 端口号
std::string _localip; // 本地IP地址
bool _isrunning; // 运行状态
adduser_t _adduser; // 添加用户回调函数
deluser_t _deluser; // 删除用户回调函数
route_t _route; // 路由回调函数
task_t _task; // 任务回调函数
};
7.2 UserServer.cpp
#include "UdpServer.hpp"
#include "User.hpp"
int main(int argc, char *argv[])
{
if(argc != 2){
std::cerr << "Usage: " << argv[0] << " localport" << std::endl;
Die(1);
}
uint16_t port = std::stoi(argv[1]);
ENABLE_CONSOLE_LOG(); // 日期类方法,使日志在控制台输出
std::unique_ptr<UdpServer> usvr = std::make_unique<UdpServer>(port);
std::shared_ptr<Route> route = std::make_shared<Route>();
usvr->RegisterService([&route](InetAddr& id){route->AddUser(id);},
[&route](InetAddr& id){route->DelUser(id);},
[&route](int sockfd, const std::string& msg){route->Router(sockfd, msg);}
);
usvr->InitServer(); // 初始化服务端
usvr->Start(); // 启动服务端
return 0;
}
7.3 UserClient.hpp
#pragma once
#include "Common.hpp"
#include <iostream>
#include <cstring>
#include <string>
#include <cstdlib>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <signal.h>
7.4 UserClient.cpp
#include "UdpClient.hpp"
int sockfd = -1;
struct sockaddr_in server;
void ClientQuit(int signo){
(void)signo;
const std::string msg = "QUIT";
int n = ::sendto(sockfd, msg.c_str(), msg.size(), 0, CONV(&server), sizeof(server));
exit(0);
}
// int pthread_create(pthread_t *thread, const pthread_attr_t *attr, void *(*start_routine)(void *), void *arg);
// POSIX线程库的设计规范要求线程函数必须遵循特定的函数签名
// void *(*start_routine)(void *)
// 线程入口函数必须满足 void *(*)(void *) 的签名,即:
// 接受一个 void* 参数。
// 返回一个 void* 值。
void* Recver(void* args){
while(true){
(void)args; // 如果没有使用这个参数,会报错
struct sockaddr_in server;
socklen_t len = sizeof(server);
char buffer[1024];
int n = ::recvfrom(sockfd, buffer,sizeof(buffer) - 1, 0, CONV(&server), &len);
if(n > 0){
buffer[n] = 0;
std::cerr << buffer << std::endl;
}
}
}
int main(int argc, char* argv[]){
if(argc != 3){
std::cerr << argv[0] << " serverip server" << std::endl;
Die(USAGE_ERR);
}
// 注册信号退出函数
// 将信号 2 注册到 ClientQuit 函数
// 信号2 是 SIGINT,表示 Ctrl-C
signal(2, ClientQuit);
std::string serverip = argv[1];
uint16_t serverport = std::stoi(argv[2]);
// 1. 创建套接字
sockfd = socket(AF_INET, SOCK_DGRAM, 0);
if(sockfd < 0){
std::cerr << "create socket error" << std::endl;
Die(SOCKET_ERR);
}
// 1.1 填充 server 信息
memset(&server, 0, sizeof(server));
server.sin_family = AF_INET;
server.sin_port = ::htons(serverport);
server.sin_addr.s_addr = ::inet_addr(serverip.c_str());
// 1.2 创建线程
pthread_t tid;
pthread_create(&tid, nullptr, Recver, nullptr);
pthread_detach(tid);
// 1.3 启动的时候,给服务器推送消息
const std::string msg = "... 来了";
int n = ::sendto(sockfd, msg.c_str(), msg.size(), 0, CONV(&server), sizeof(server));
(void)n;
// 2. 发送数据
while(true){
std::cout << "Please Enter# ";
std::string msg;
std::getline(std::cin, msg);
// client 必须自己的ip和端口。但是客户端,不需要显示调用bind
// 客户端首次 sendto 消息的时候,由OS自动bind
// 1. 如何理解 client 自动随机bind端口号? 一个端口号,只能读一个进程bind
// 2. 如何理解 server 要显示地bind? 必须稳定!必须是众所周知且不能轻易改变的
int n = ::sendto(sockfd, msg.c_str(), msg.size(), 0, CONV(&server), sizeof(server));
(void)n;
}
return 0;
}
7.5 User.hpp
#pragma once
#include <iostream>
#include <string>
#include <list>
#include <memory>
#include <algorithm>
#include <sys/types.h>
#include <sys/socket.h>
#include "InetAddr.hpp"
#include "Log.hpp"
#include "Mutex.hpp"
// using namespace LockModule;
using namespace LogModule;
using task_t = std::function<void()>;
class UserInterface{
public:
virtual ~UserInterface() = default;
virtual void SendTo(int sockfd, const std::string& message) = 0;
virtual bool operator==(const InetAddr& user) = 0;
virtual std::string Id() = 0;
};
class User :public UserInterface{
public:
User(const InetAddr& id) : _id(id) {};
void SendTo(int sockfd, const std::string& message) override{
LOG(LogLevel::DEBUG) << "send message to " << _id.AddrStr() << " info: " << message;
int n = ::sendto(sockfd, message.c_str(), message.size(), 0, _id.NetAddr(), _id.NetAddrLen());
(void)n;
}
bool operator==(const InetAddr& user) override{
return _id == user;
}
std::string Id() override{
return _id.AddrStr();
}
~User(){}
private:
InetAddr _id;
};
class Route{
public:
Route(){}
void AddUser(InetAddr& id){
LockModule::LockGuard lockguard(_mutex);
for(auto& user : _online_user){
if(*user == id){
LOG(LogLevel::INFO) << id.AddrStr() << " 已经在线";
return;
}
}
// 到这里说明 在线用户中 不存在 新增用户
LOG(LogLevel::INFO) << " 新增该用户: " << id.AddrStr();
_online_user.push_back(std::make_shared<User>(id));
PrintUsers();
}
void DelUser(InetAddr& id){
LockModule::LockGuard lockguard(_mutex);
// 遍历容器,将 不满足删除条件 的元素移动到容器的前端,覆盖掉需要删除的元素
// 返回一个迭代器 pos,指向 保留元素的新逻辑结尾(即第一个需要删除的元素的位置)
auto pos = std::remove_if(_online_user.begin(), _online_user.end(),
[&id](const std::shared_ptr<UserInterface>& user){
return *user == id;
});
// 删除从 pos 到容器末尾的所有元素,调整容器大小。
_online_user.erase(pos, _online_user.end());
PrintUsers();
}
void Router(int sockfd, const std::string& message){
LockModule::LockGuard lockguard(_mutex);
for(auto& user : _online_user){
user->SendTo(sockfd, message);
}
}
void PrintUsers(){
for(auto& user : _online_user){
LOG(LogLevel::DEBUG) << "在线用户: " << user->Id();
}
}
~Route(){}
private:
std::list<std::shared_ptr<UserInterface>> _online_user;
LockModule::Mutex _mutex;
};
7.6 ThreadPool.hpp
#pragma once
#include <iostream>
#include <string>
#include <queue>
#include <vector>
#include <memory>
#include "Log.hpp"
#include "Mutex.hpp"
#include "Cond.hpp"
#include "Thread.hpp"
namespace ThreadPoolModule
{
using namespace LogModule;
using namespace ThreadModule;
using namespace LockModule;
using namespace CondModule;
// 用来做测试的线程方法
void DefaultTest()
{
while (true)
{
LOG(LogLevel::DEBUG) << "我是一个测试方法";
sleep(1);
}
}
using thread_t = std::shared_ptr<Thread>;
const static int defaultnum = 5;
template <typename T>
class ThreadPool
{
private:
bool IsEmpty() { return _taskq.empty(); }
void HandlerTask(std::string name)
{
LOG(LogLevel::INFO) << "线程: " << name << ", 进入HandlerTask的逻辑";
while (true)
{
// 1. 拿任务
T t;
{
LockGuard lockguard(_lock);
while (IsEmpty() && _isrunning)
{
_wait_num++;
_cond.Wait(_lock);
_wait_num--;
}
// 2. 任务队列为空 && 线程池退出了
if (IsEmpty() && !_isrunning)
break;
t = _taskq.front();
_taskq.pop();
}
// 2. 处理任务
t(); // 规定,未来所有的任务处理,全部都是必须提供()方法!
}
LOG(LogLevel::INFO) << "线程: " << name << " 退出";
}
ThreadPool(const ThreadPool<T> &) = delete;
ThreadPool<T> &operator=(const ThreadPool<T> &) = delete;
ThreadPool(int num = defaultnum) : _num(num), _wait_num(0), _isrunning(false)
{
for (int i = 0; i < _num; i++)
{
_threads.push_back(std::make_shared<Thread>(std::bind(&ThreadPool::HandlerTask, this, std::placeholders::_1)));
LOG(LogLevel::INFO) << "构建线程" << _threads.back()->Name() << "对象 ... 成功";
}
}
public:
static ThreadPool<T> *getInstance()
{
if (instance == NULL)
{
LockGuard lockguard(mutex);
if (instance == NULL)
{
LOG(LogLevel::INFO) << "单例首次被执行,需要加载对象...";
instance = new ThreadPool<T>();
instance->Start();
}
}
return instance;
}
void Equeue(T &in)
{
LockGuard lockguard(_lock);
if (!_isrunning)
return;
// _taskq.push(std::move(in));
_taskq.push(in);
if (_wait_num > 0)
_cond.Notify();
}
void Start()
{
if (_isrunning)
return;
_isrunning = true; // bug fix??
for (auto &thread_ptr : _threads)
{
LOG(LogLevel::INFO) << "启动线程" << thread_ptr->Name() << " ... 成功";
thread_ptr->Start();
}
}
void Wait()
{
for (auto &thread_ptr : _threads)
{
thread_ptr->Join();
LOG(LogLevel::INFO) << "回收线程" << thread_ptr->Name() << " ... 成功";
}
}
void Stop()
{
LockGuard lockguard(_lock);
if (_isrunning)
{
// 3. 不能在入任务了
_isrunning = false; // 不工作
// 1. 让线程自己退出(要唤醒) && // 2. 历史的任务被处理完了
if (_wait_num > 0)
_cond.NotifyAll();
}
}
~ThreadPool()
{
}
private:
std::vector<thread_t> _threads;
int _num;
int _wait_num;
std::queue<T> _taskq; // 临界资源
Mutex _lock;
Cond _cond;
bool _isrunning;
static ThreadPool<T> *instance;
static Mutex mutex; // 只用来保护单例
};
template <typename T>
ThreadPool<T> *ThreadPool<T>::instance = NULL;
template <typename T>
Mutex ThreadPool<T>::mutex; // 只用来保护单例
}
7.7 Thread.hpp
#ifndef _THREAD_HPP__
#define _THREAD_HPP__
#include <iostream>
#include <string>
#include <pthread.h>
#include <functional>
#include <sys/types.h>
#include <unistd.h>
// v1
namespace ThreadModule
{
using func_t = std::function<void(std::string name)>;
static int number = 1;
enum class TSTATUS
{
NEW,
RUNNING,
STOP
};
class Thread
{
private:
// 成员方法!
static void *Routine(void *args)
{
Thread *t = static_cast<Thread *>(args);
t->_status = TSTATUS::RUNNING;
t->_func(t->Name());
return nullptr;
}
void EnableDetach() { _joinable = false; }
public:
Thread(func_t func) : _func(func), _status(TSTATUS::NEW), _joinable(true)
{
_name = "Thread-" + std::to_string(number++);
_pid = getpid();
}
bool Start()
{
if (_status != TSTATUS::RUNNING)
{
int n = ::pthread_create(&_tid, nullptr, Routine, this); // TODO
if (n != 0)
return false;
return true;
}
return false;
}
bool Stop()
{
if (_status == TSTATUS::RUNNING)
{
int n = ::pthread_cancel(_tid);
if (n != 0)
return false;
_status = TSTATUS::STOP;
return true;
}
return false;
}
bool Join()
{
if (_joinable)
{
int n = ::pthread_join(_tid, nullptr);
if (n != 0)
return false;
_status = TSTATUS::STOP;
return true;
}
return false;
}
void Detach()
{
EnableDetach();
pthread_detach(_tid);
}
bool IsJoinable() { return _joinable; }
std::string Name() {return _name;}
~Thread()
{
}
private:
std::string _name;
pthread_t _tid;
pid_t _pid;
bool _joinable; // 是否是分离的,默认不是
func_t _func;
TSTATUS _status;
};
}
// v2
// namespace ThreadModule
// {
// static int number = 1;
// enum class TSTATUS
// {
// NEW,
// RUNNING,
// STOP
// };
// template <typename T>
// class Thread
// {
// using func_t = std::function<void(T)>;
// private:
// // 成员方法!
// static void *Routine(void *args)
// {
// Thread<T> *t = static_cast<Thread<T> *>(args);
// t->_status = TSTATUS::RUNNING;
// t->_func(t->_data);
// return nullptr;
// }
// void EnableDetach() { _joinable = false; }
// public:
// Thread(func_t func, T data) : _func(func), _data(data), _status(TSTATUS::NEW), _joinable(true)
// {
// _name = "Thread-" + std::to_string(number++);
// _pid = getpid();
// }
// bool Start()
// {
// if (_status != TSTATUS::RUNNING)
// {
// int n = ::pthread_create(&_tid, nullptr, Routine, this); // TODO
// if (n != 0)
// return false;
// return true;
// }
// return false;
// }
// bool Stop()
// {
// if (_status == TSTATUS::RUNNING)
// {
// int n = ::pthread_cancel(_tid);
// if (n != 0)
// return false;
// _status = TSTATUS::STOP;
// return true;
// }
// return false;
// }
// bool Join()
// {
// if (_joinable)
// {
// int n = ::pthread_join(_tid, nullptr);
// if (n != 0)
// return false;
// _status = TSTATUS::STOP;
// return true;
// }
// return false;
// }
// void Detach()
// {
// EnableDetach();
// pthread_detach(_tid);
// }
// bool IsJoinable() { return _joinable; }
// std::string Name() { return _name; }
// ~Thread()
// {
// }
// private:
// std::string _name;
// pthread_t _tid;
// pid_t _pid;
// bool _joinable; // 是否是分离的,默认不是
// func_t _func;
// TSTATUS _status;
// T _data;
// };
// }
#endif
7.9 Mutex.hpp
#pragma once
#include <iostream>
#include <pthread.h>
namespace LockModule
{
class Mutex
{
public:
Mutex(const Mutex&) = delete;
const Mutex& operator = (const Mutex&) = delete;
Mutex()
{
int n = ::pthread_mutex_init(&_lock, nullptr);
(void)n;
}
~Mutex()
{
int n = ::pthread_mutex_destroy(&_lock);
(void)n;
}
void Lock()
{
int n = ::pthread_mutex_lock(&_lock);
(void)n;
}
pthread_mutex_t *LockPtr()
{
return &_lock;
}
void Unlock()
{
int n = ::pthread_mutex_unlock(&_lock);
(void)n;
}
private:
pthread_mutex_t _lock;
};
class LockGuard
{
public:
LockGuard(Mutex &mtx):_mtx(mtx)
{
_mtx.Lock();
}
~LockGuard()
{
_mtx.Unlock();
}
private:
Mutex &_mtx;
};
}
7.10 Log.hpp
#pragma once
#include <iostream>
#include <cstdio>
#include <string>
#include <fstream>
#include <sstream>
#include <memory>
#include <filesystem> //C++17
#include <unistd.h>
#include <time.h>
#include "Mutex.hpp"
namespace LogModule
{
using namespace LockModule;
// 获取一下当前系统的时间
std::string CurrentTime()
{
time_t time_stamp = ::time(nullptr);
struct tm curr;
localtime_r(&time_stamp, &curr); // 时间戳,获取可读性较强的时间信息5
char buffer[1024];
// bug
snprintf(buffer, sizeof(buffer), "%4d-%02d-%02d %02d:%02d:%02d",
curr.tm_year + 1900,
curr.tm_mon + 1,
curr.tm_mday,
curr.tm_hour,
curr.tm_min,
curr.tm_sec);
return buffer;
}
// 构成: 1. 构建日志字符串 2. 刷新落盘(screen, file)
// 1. 日志文件的默认路径和文件名
const std::string defaultlogpath = "./log/";
const std::string defaultlogname = "log.txt";
// 2. 日志等级
enum class LogLevel
{
DEBUG = 1,
INFO,
WARNING,
ERROR,
FATAL
};
std::string Level2String(LogLevel level)
{
switch (level)
{
case LogLevel::DEBUG:
return "DEBUG";
case LogLevel::INFO:
return "INFO";
case LogLevel::WARNING:
return "WARNING";
case LogLevel::ERROR:
return "ERROR";
case LogLevel::FATAL:
return "FATAL";
default:
return "None";
}
}
// 3. 刷新策略.
class LogStrategy
{
public:
virtual ~LogStrategy() = default;
virtual void SyncLog(const std::string &message) = 0;
};
// 3.1 控制台策略
class ConsoleLogStrategy : public LogStrategy
{
public:
ConsoleLogStrategy()
{
}
~ConsoleLogStrategy()
{
}
void SyncLog(const std::string &message)
{
LockGuard lockguard(_lock);
std::cout << message << std::endl;
}
private:
Mutex _lock;
};
// 3.2 文件级(磁盘)策略
class FileLogStrategy : public LogStrategy
{
public:
FileLogStrategy(const std::string &logpath = defaultlogpath, const std::string &logname = defaultlogname)
: _logpath(logpath),
_logname(logname)
{
// 确认_logpath是存在的.
LockGuard lockguard(_lock);
if (std::filesystem::exists(_logpath))
{
return;
}
try
{
std::filesystem::create_directories(_logpath);
}
catch (std::filesystem::filesystem_error &e)
{
std::cerr << e.what() << "\n";
}
}
~FileLogStrategy()
{
}
void SyncLog(const std::string &message)
{
LockGuard lockguard(_lock);
std::string log = _logpath + _logname; // ./log/log.txt
std::ofstream out(log, std::ios::app); // 日志写入,一定是追加
if (!out.is_open())
{
return;
}
out << message << "\n";
out.close();
}
private:
std::string _logpath;
std::string _logname;
// 锁
Mutex _lock;
};
// 日志类: 构建日志字符串, 根据策略,进行刷新
class Logger
{
public:
Logger()
{
// 默认采用ConsoleLogStrategy策略
_strategy = std::make_shared<ConsoleLogStrategy>();
}
void EnableConsoleLog()
{
_strategy = std::make_shared<ConsoleLogStrategy>();
}
void EnableFileLog()
{
_strategy = std::make_shared<FileLogStrategy>();
}
~Logger() {}
// 一条完整的信息: [2024-08-04 12:27:03] [DEBUG] [202938] [main.cc] [16] + 日志的可变部分(<< "hello world" << 3.14 << a << b;)
class LogMessage
{
public:
LogMessage(LogLevel level, const std::string &filename, int line, Logger &logger)
: _currtime(CurrentTime()),
_level(level),
_pid(::getpid()),
_filename(filename),
_line(line),
_logger(logger)
{
std::stringstream ssbuffer;
ssbuffer << "[" << _currtime << "] "
<< "[" << Level2String(_level) << "] "
<< "[" << _pid << "] "
<< "[" << _filename << "] "
<< "[" << _line << "] - ";
_loginfo = ssbuffer.str();
}
template <typename T>
LogMessage &operator<<(const T &info)
{
std::stringstream ss;
ss << info;
_loginfo += ss.str();
return *this;
}
~LogMessage()
{
if (_logger._strategy)
{
_logger._strategy->SyncLog(_loginfo);
}
}
private:
std::string _currtime; // 当前日志的时间
LogLevel _level; // 日志等级
pid_t _pid; // 进程pid
std::string _filename; // 源文件名称
int _line; // 日志所在的行号
Logger &_logger; // 负责根据不同的策略进行刷新
std::string _loginfo; // 一条完整的日志记录
};
// 就是要拷贝,故意的拷贝
LogMessage operator()(LogLevel level, const std::string &filename, int line)
{
return LogMessage(level, filename, line, *this);
}
private:
std::shared_ptr<LogStrategy> _strategy; // 日志刷新的策略方案
};
Logger logger;
#define LOG(Level) logger(Level, __FILE__, __LINE__)
#define ENABLE_CONSOLE_LOG() logger.EnableConsoleLog()
#define ENABLE_FILE_LOG() logger.EnableFileLog()
}
7.11 Cond.hpp
#pragma once
#include <iostream>
#include <pthread.h>
#include "Mutex.hpp"
namespace CondModule
{
using namespace LockModule;
class Cond
{
public:
Cond()
{
int n = ::pthread_cond_init(&_cond, nullptr);
(void)n;
}
void Wait(Mutex &lock) // 让我们的线程释放曾经持有的锁!
{
int n = ::pthread_cond_wait(&_cond, lock.LockPtr());
}
void Notify()
{
int n = ::pthread_cond_signal(&_cond);
(void)n;
}
void NotifyAll()
{
int n = ::pthread_cond_broadcast(&_cond);
(void)n;
}
~Cond()
{
int n = ::pthread_cond_destroy(&_cond);
}
private:
pthread_cond_t _cond;
};
}
7.12 Common.hpp
#pragma once
#include <iostream>
#define Die(code) \
do \
{ \
exit(code); \
} while (0)
#define CONV(v) (struct sockaddr *)(v)
enum
{
USAGE_ERR = 1,
SOCKET_ERR,
BIND_ERR
};
7.13 InetAddr.hpp
#pragma once
#include <iostream>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include "Common.hpp"
class InetAddr
{
private:
void PortNet2Host()
{
_port = ::ntohs(_net_addr.sin_port);
}
void IpNet2Host()
{
char ipbuffer[64];
const char *ip = ::inet_ntop(AF_INET, &_net_addr.sin_addr, ipbuffer, sizeof(ipbuffer));
(void)ip;
_ip = ipbuffer;
}
public:
InetAddr() {}
// 通过网络字节序地址构造主机字节序地址
InetAddr(const struct sockaddr_in &addr) : _net_addr(addr){
PortNet2Host();
IpNet2Host();
}
bool operator==(const InetAddr& addr){
return _ip == addr._ip && _port == addr._port;
}
// 创建一个绑定到指定端口(主机字节序)的 IPv4 地址对象,默认监听所有本地网络接口
InetAddr(uint16_t port) : _port(port), _ip(""){
_net_addr.sin_family = AF_INET;
_net_addr.sin_port = htons(_port);
_net_addr.sin_addr.s_addr = INADDR_ANY;
}
// 主机字节序转网络字节序
struct sockaddr *NetAddr() { return CONV(&_net_addr); }
// 网络字节序地址长度
socklen_t NetAddrLen() { return sizeof(_net_addr); }
// 主机字节序 ip 地址
std::string Ip() { return _ip; }
// 主机字节序端口号
uint16_t Port() { return _port; }
// 字符串形式的主机字节序地址 IP + 端口号
std::string AddrStr() { return _ip + ":" + std::to_string(_port); }
// 析构
~InetAddr(){}
private:
struct sockaddr_in _net_addr;
std::string _ip;
uint16_t _port;
};
7.14 Makefile
.PHONY: all
all:server_udp client_udp
server_udp:UdpServer.cpp
g++ -o $@ $^ -std=c++17 -lpthread
client_udp:UdpClient.cpp
g++ -o $@ $^ -std=c++17 -lpthread
.PHONY: clean
clean:
rm -f server_udp client_udp
👥总结
本篇博文对 【Linux网络】构建基于UDP的简单聊天室系统 做了一个较为详细的介绍,不知道对你有没有帮助呢
觉得博主写得还不错的三连支持下吧!会继续努力的~