文章目录
- Muduo
- Muduo库是什么
- Moduo 库的原理
- Muduo 库常见接口
- TcpServer类
- EventLoop类
- TcpConnection类
- TcpClient类
- Buffer类
- Muduo库实现一个简单英译汉服务器和客⼾端
Muduo
Muduo库是什么
Muduo由陈硕⼤佬开发,是⼀个基于⾮阻塞IO和事件驱动的C++⾼并发TCP⽹络编程库。它是⼀款基于主从Reactor模型的⽹络库,其使⽤的线程模型是one loop per thread,所谓one loop per thread指的是:
- ⼀个线程只能有⼀个事件循环(EventLoop),⽤于响应计时器和IO事件
- ⼀个⽂件描述符只能由⼀个线程进⾏读写,换句话说就是⼀个TCP连接必须归属于某个EventLoop管理
Moduo 库的原理
多路转接模型(如select或poll)在连接数量过多时,可能会导致性能下降,特别是当所有连接都需要频繁轮询时。
由于所有连接都在一个事件循环中处理,处理时间较长的连接可能会影响到其他连接的响应时间,从而导致一些连接得不到及时响应。
为了应对这种情况,可以采用更高效的模型,
例如使用主从(父子)Reactor模式。
主Reactor负责监听所有连接请求,并将这些请求分发给多个子Reactor。每个子Reactor处理一部分连接,从而将负载分散,提升整体响应效率。这种方式可以避免单个Reactor处理过多连接带来的性能瓶颈。
这种架构通常用于高并发场景下,以提高系统的伸缩性和响应速度。
Muduo 库常见接口
TcpServer类
TcpServer 类负责管理网络服务端的行为,包括启动服务器、设置网络连接的回调函数以及消息处理的回调函数。这样可以灵活地定义在不同网络事件发生时执行的具体操作,例如接受新连接、连接断开或数据接收等。
下面来认识一下 TcpServer 的主要接口:
typedef std::shared_ptr<TcpConnection> TcpConnectionPtr;
typedef std::function<void (const TcpConnectionPtr&)> ConnectionCallback;
typedef std::function<void (const TcpConnectionPtr&,
Buffer*,
Timestamp)> MessageCallback;
class InetAddress : public muduo::copyable
{
public:
InetAddress(StringArg ip, uint16_t port, bool ipv6 = false);
};
class TcpServer : noncopyable
{
public:
enum Option
{
kNoReusePort,
kReusePort,
};
TcpServer(EventLoop* loop,
const InetAddress& listenAddr,
const string& nameArg,
Option option = kNoReusePort);
void setThreadNum(int numThreads);
void start();
/// 当⼀个新连接建⽴成功的时候被调⽤
void setConnectionCallback(const ConnectionCallback& cb)
{ connectionCallback_ = cb; }
/// 消息的业务处理回调函数---这是收到新连接消息的时候被调⽤的函数
void setMessageCallback(const MessageCallback& cb)
{ messageCallback_ = cb; }
};
EventLoop类
EventLoop 类是事件循环的核心,负责启动和管理事件监控循环。通过这个循环,服务器可以非阻塞地监听和响应网络事件(如新的连接请求、数据到达等)。它还支持通过定时任务来调度将来某一时刻执行的任务,这对于需要定时检查或更新状态的应用非常有用。
class EventLoop
{
void loop();// 开始事件监控循环
void quit();// 停止循环
Timerld runAfter(delay, cb);// 定时任务
};
TcpConnection类
TcpConnection 类管理着每个网络连接的状态,提供发送数据、检查连接状态和关闭连接的功能。它的设计使得每个连接可以独立地处理其生命周期内的各种事件。
class TcpConnection
{
TcpClient(EventLoop* loop, const InetAddress& serverAddr, const string& nameArg);
void send(std::string &msg);// 发送数据
bool connected();// 当前连接是否连接正常
void shutdown();// 关闭连接
};
TcpClient类
TcpClient 是 Muduo 库中用于处理 TCP 客户端连接的类,它通过 EventLoop 来监控和处理IO事件。用户可以使用 connect() 和 disconnect() 控制连接,并通过 setConnectionCallback() 和 setMessageCallback() 设置回调函数,处理连接状态的变更和消息的接收。
class TcpClient : noncopyable
{
public:
// TcpClient(EventLoop* loop);
// TcpClient(EventLoop* loop, const string& host, uint16_t port);
TcpClient(EventLoop* loop,
const InetAddress& serverAddr,
const string& nameArg);
~TcpClient(); // force out-line dtor, for std::unique_ptr members.
void connect();//连接服务器
void disconnect();//关闭连接
void stop();
//获取客⼾端对应的通信连接Connection对象的接⼝,发起connect后,有可能还没有连接建⽴成功
TcpConnectionPtr connection() const
{
MutexLockGuard lock(mutex_);
return connection_;
}
/// 连接服务器成功时的回调函数
void setConnectionCallback(ConnectionCallback cb)
{ connectionCallback_ = std::move(cb); }
/// 收到服务器发送的消息时的回调函数
void setMessageCallback(MessageCallback cb)
{ messageCallback_ = std::move(cb); }
private:
EventLoop* loop_;
ConnectionCallback connectionCallback_;
MessageCallback messageCallback_;
WriteCompleteCallback writeCompleteCallback_;
TcpConnectionPtr connection_ GUARDED_BY(mutex_);
};
/*
需要注意的是,因为muduo库不管是服务端还是客⼾端都是异步操作,
对于客⼾端来说如果我们在连接还没有完全建⽴成功的时候发送数据,这是不被允许的。
因此我们可以使⽤内置的CountDownLatch类进⾏同步控制
*/
class CountDownLatch : noncopyable
{
public:
explicit CountDownLatch(int count);
void wait(){
MutexLockGuard lock(mutex_);
while (count_ > 0)
{
condition_.wait();
}
}
void countDown(){
MutexLockGuard lock(mutex_);
--count_;
if (count_ == 0)
{
condition_.notifyAll();
}
}
int getCount() const;
private:
mutable MutexLock mutex_;
Condition condition_ GUARDED_BY(mutex_);
int count_ GUARDED_BY(mutex_);
}
Buffer类
Buffer 类用于高效地管理数据缓冲区。它提供一系列操作来读取和处理存储在缓冲区中的数据,支持网络字节序的转换和数据的基本处理,如读取、检索和删除操作。这是处理TCP流数据的关键部分,因为网络数据可以随机到达,并且可能需要积累足够的数据才能进行处理。
class Buffer
{
size_t readableBytes() const;// 获取缓冲区大小
const char* peek() const;// 获取缓冲区中数据的起始地址
int32_t peekInt32() const;// 尝试从缓冲区获取4字节数据,进行网络字节序转换为整型,但不从缓冲区删除
void retrieveInt32();// 数据读取位置向后偏移4字节(本质就是删除起始位置的4字节数据)
int32_t readInt32();
string retrieveAllAsString();// 从缓冲区取出所有数据并删除,以string形式返回
string retrieveAsString(size_t len);// 从缓冲区读取len长度的数据并删除,以string形式返回
};
Muduo库实现一个简单英译汉服务器和客⼾端
muduo库服务器编程流程:
1、组合TcpServer对象;
2、创建EventLoop事件循环对象的指针,可以向loop上注册感兴趣的事件,相应事件发生loop会上报给我们;
3、明确TcpServer构造函数需要的参数,输出服务器对应类的构造函数;
TcpServer(EventLoop* loop, //事件循环
const InetAddress& listenAddr, //绑定IP地址 + 端口号
const string& nameArg, //TcpServer服务器名字
Option option = kNoReusePort); //tcp协议选项
4、在当前服务器类的构造函数中,注册处理连接断开的回调函数和处理读写事件的回调函数主要通过下面两个函数回调实现;
void setConnectionCallback(const ConnectionCallback& cb) //链接的创建与断开
{ connectionCallback_ = cb; }
void setMessageCallback(const MessageCallback& cb) //消息读写事件
{ messageCallback_ = cb; }
5、设置合适的服务器端线程数量,muduo会自动分配I/O线程与工作线程;
6、开启事件循环start();
muduo服务器端编程代码如下:
#include <muduo/net/TcpServer.h>
#include <muduo/net/EventLoop.h>
#include <muduo/net/TcpConnection.h>
#include <iostream>
#include <functional>
#include <unordered_map>
class TranslateServer {
public:
TranslateServer(int port):_server(&_baseloop,
muduo::net::InetAddress("0.0.0.0", port),
"TranslateServer", muduo::net::TcpServer::kReusePort){
//将我们的类成员函数,设置为服务器的回调处理函数
// std::bind 是一个函数适配器函数,对指定的函数进行参数绑定
_server.setConnectionCallback(std::bind(&TranslateServer::onConnection, this, std::placeholders::_1));
_server.setMessageCallback(std::bind(&TranslateServer::onMessage, this, std::placeholders::_1,
std::placeholders::_2, std::placeholders::_3));
}
//启动服务器
void start() {
_server.start();//开始事件监听
_baseloop.loop();//开始事件监控,这是一个死循环阻塞接口
}
private:
//onConnection,应该是在一个连接,建立成功,以及关闭的时候被调用
void onConnection(const muduo::net::TcpConnectionPtr&conn){
//新连接建立成功时的回调函数
if (conn->connected() == true) {
std::cout << "新连接建立成功!\n";
}else {
std::cout << "新连接关闭!\n";
}
}
std::string translate(const std::string &str) {
static std::unordered_map<std::string, std::string> dict_map = {
{"hello", "你好"},
{"Hello", "你好"},
{"你好", "Hello"},
{"吃了吗", "油泼面"}
};
auto it = dict_map.find(str);
if (it == dict_map.end()) {
return "没听懂!!";
}
return it->second;
}
void onMessage(const muduo::net::TcpConnectionPtr& conn, muduo::net::Buffer* buf, muduo::Timestamp) {
//通信连接收到请求时的回调函数
//1. 从buf中把请求的数据取出来
std::string str = buf->retrieveAllAsString();
//2. 调用translate接口进行翻译
std::string resp = translate(str);
//3. 对客户端进行响应结果
conn->send(resp);
}
private:
//_baseloop是epoll的事件监控,会进行描述符的事件监控,触发事件后进行io操作
muduo::net::EventLoop _baseloop;
//这个server对象,主要用于设置回调函数,用于告诉服务器收到什么请求该如何处理
muduo::net::TcpServer _server;
};
int main()
{
TranslateServer server(8085);
server.start();
return 0;
}
muduo客户端端编程代码如下:
#include <muduo/net/TcpClient.h>
#include <muduo/net/EventLoopThread.h>
#include <muduo/net/TcpConnection.h>
#include <muduo/base/CountDownLatch.h>
#include <iostream>
#include <functional>
class TranslateClient {
public:
TranslateClient(const std::string &sip, int sport):_latch(1),
_client(_loopthread.startLoop(), muduo::net::InetAddress(sip, sport), "TranslateClient"){
_client.setConnectionCallback(std::bind(&TranslateClient::onConnection, this, std::placeholders::_1));
_client.setMessageCallback(std::bind(&TranslateClient::onMessage, this, std::placeholders::_1,
std::placeholders::_2, std::placeholders::_3));
}
//连接服务器---需要阻塞等待连接建立成功之后再返回
void connect() {
_client.connect();
_latch.wait();//阻塞等待,直到连接建立成功
}
bool send(const std::string &msg) {
if (_conn->connected()) {//连接状态正常,再发送,否则就返回false
_conn->send(msg);
return true;
}
return false;
}
private:
//连接建立成功时候的回调函数,连接建立成功后,唤醒上边的阻塞
void onConnection(const muduo::net::TcpConnectionPtr&conn){
if (conn->connected()) {
_latch.countDown();//唤醒主线程中的阻塞
_conn = conn;
}else {
//连接关闭时的操作
_conn.reset();
}
}
//收到消息时候的回调函数
void onMessage(const muduo::net::TcpConnectionPtr& conn, muduo::net::Buffer* buf, muduo::Timestamp) {
std::cout << "翻译结果:" << buf->retrieveAllAsString() << std::endl;
}
private:
muduo::CountDownLatch _latch;
muduo::net::EventLoopThread _loopthread;
muduo::net::TcpClient _client;
muduo::net::TcpConnectionPtr _conn;
};
int main()
{
TranslateClient client("127.0.0.1", 8085);
client.connect();
while(1) {
std::string buf;
std::cin >> buf;
client.send(buf);
}
return 0;
}
运行结果: