前文
重写Muduo库实现核心模块的Git仓库
注:本文将重点剖析 Muduo
网络库的核心框架,深入探讨作者精妙的代码设计思路,并针对核心代码部分进行重写,将原本依赖 boost
的实现替换为原生的 C++11 语法。需要说明的是,本文并不打算对整个 Muduo
库进行完整的重写。Muduo库源码链接
在上文中,我们对Muduo
网络库的核心网络模块中的Socket
、InetAddress
以及Acceptor
进行了解析。这节我们将对剩余的核心网络模块中的TcpConnection
以及TcpServer
进行解析。
TcpConnection
在 Muduo 网络库 中,TcpConnection
是一个非常重要的类,主要用于表示并管理一个 TCP 连接。它抽象了应用层和网络层之间的交互,负责处理一个具体的 TCP 连接的生命周期以及数据的发送和接收。
TcpConnection
的主要作用
-
抽象 TCP 连接
TcpConnection
表示一个具体的 TCP 连接,隐藏了底层的 socket 描述符和 epoll 等细节,使得用户只需要关注逻辑层面。- 每一个客户端连接都会对应一个
TcpConnection
实例。
-
管理 TCP 连接的生命周期
- 包括连接的建立、数据的收发、连接的关闭等。
- 在连接的不同阶段会触发对应的回调(callback),如连接建立回调、消息回调和关闭回调。
-
提供高效的异步 I/O
- 通过事件驱动模型,结合
Channel
和EventLoop
,实现异步非阻塞的 I/O 操作。
- 通过事件驱动模型,结合
-
数据缓冲
- 提供输入缓冲区和输出缓冲区(
Buffer
),用于存储接收和发送的数据。
- 提供输入缓冲区和输出缓冲区(
-
支持用户自定义回调
- 用户可以设置各种回调函数,比如连接建立的回调(
ConnectionCallback
)、消息到来的回调(MessageCallback
)、写完成回调等。
- 用户可以设置各种回调函数,比如连接建立的回调(
-
线程安全
TcpConnection
的大部分操作是线程安全的,支持跨线程调用,比如关闭连接时可以跨线程调用shutdown
。
TcpConnection
的核心功能
- 连接管理
- 提供方法来开启和关闭连接(
connectEstablished()
和connectDestroyed()
)。 - 判断连接状态(
isConnected()
等)。
- 提供方法来开启和关闭连接(
- 数据传输
- 接收数据:通过
MessageCallback
回调函数处理接收到的数据。 - 发送数据:提供
send()
方法,用于发送字符串或二进制数据。发送过程是非阻塞的,数据会先存入输出缓冲区。
- 接收数据:通过
- 回调设置
- 支持用户设置各种回调函数,如:
ConnectionCallback
:连接状态变化时的回调。MessageCallback
:收到数据时的回调。WriteCompleteCallback
:数据发送完毕时的回调。CloseCallback
:连接关闭时的回调。
- 支持用户设置各种回调函数,如:
- 与事件循环集成
- 每个
TcpConnection
实例绑定一个EventLoop
,并通过Channel
监听和处理其对应 socket 的事件(如可读、可写等)。
- 每个
类图如下:
类的关键成员变量和方法
- 主要成员变量
EventLoop* loop_
:所属的事件循环StateE state_
:表示连接的状态(如连接中、已连接、正在关闭、未连接)unqiue_ptr<Socket> socket_
:表示该TCP连接的socketunique_ptr<Channel> channel_
:表示该套接字描述的channelInetAddress localAddr_
:本地的IP和端口InetAddress peerAddr_
:对端的IP和端口- 输入缓冲区和输出缓冲区
- 一系列回调函数:
connectionCallback_
、messageCallback_
等
- 主要方法
void send(const std::string& message)
:发送数据。void shutdown()
:关闭连接的写端。void connectEstablished()
:在连接建立后被调用,初始化连接。void connectDestroyed()
:在连接关闭后被调用,清理资源。- 回调设置方法:
setConnectionCallback()
、setMessageCallback()
等。
TcpConnection.h
class TcpConnection : noncopyable, public std::enable_shared_from_this<TcpConnection>
{
public:
TcpConnection(EventLoop* loop,
const std::string& name,
int sockfd,
const InetAddress& localAddr,
const InetAddress& peerAddr);
~TcpConnection();
EventLoop* getLoop() const { return loop_; }
const std::string& name() const { return name_; }
const InetAddress& localAddress() const { return localAddr_; }
const InetAddress& peerAddress() const { return peerAddr_; }
bool conncted() const { return state_ == StateE::kConnected; }
// 发送数据
void send(const std::string& buf);
// 关闭连接
void shutdown();
void setConnectionCallback(const ConnectionCallback& cb)
{ connectionCallback_ = cb; }
void setMessageCallback(const MessageCallback& cb)
{ messageCallback_ = cb; }
void setWriteCompleteCallback(const WriteCompleteCallback& cb)
{ writeCompleteCallback_ = cb; }
void setHighWaterMarkCallback(const HighWaterMarkCallback& cb, size_t highWaterMark)
{ highWaterMarkCallback_ = cb; highWaterMark_ = highWaterMark; }
void setCloseCallback(const CloseCallback& cb)
{ closeCallback_ = cb; }
// 建立连接
void connectEstablished();
// 销毁连接
void connectDestroyed();
private:
enum StateE
{
kDisconnected, // 已关闭连接
kConnecting, // 正在连接
kConnected, // 已连接
kDisconnecting // 正在关闭连接
};
void handleRead(Timestamp receiveTime);
void handleWrite();
void handleClose();
void handleError();
void sendInLoop(const char* message, size_t len);
void shutdownInLoop();
void setState(StateE state) { state_ = state; }
EventLoop* loop_;
const std::string name_;
std::atomic_int state_;
bool reading_;
std::unique_ptr<Socket> socket_;
std::unique_ptr<Channel> channel_;
const InetAddress localAddr_;
const InetAddress peerAddr_;
ConnectionCallback connectionCallback_; // 有新连接的回调
MessageCallback messageCallback_; // 有读写消息的回调
WriteCompleteCallback writeCompleteCallback_; // 消息发送完成以后的回调
CloseCallback closeCallback_;
HighWaterMarkCallback highWaterMarkCallback_; // 高水位回调
// 高水位的值
size_t highWaterMark_;
Buffer inputBuffer_;
Buffer outputBuffer_;
};
TcpConnection.cc
构造函数
TcpConnection
表示并管理一个 TCP 连接,在事件就绪时,会自动调用用户注册的回调函数。与此相对应,channel_
中负责注册和管理这些用户定义的回调函数。
static EventLoop* CheckLoopNotNull(EventLoop* loop)
{
if(loop == nullptr)
{
LOG_FATAL("%s:%s:%d TcpConnection is null!", __FILE__, __FUNCTION__, __LINE__);
}
return loop;
}
TcpConnection::TcpConnection(EventLoop *loop, std::string const &name,
int sockfd, InetAddress const &localAddr,
InetAddress const &peerAddr) :
loop_(CheckLoopNotNull(loop)),
name_(name),
state_(StateE::kConnecting),
reading_(true),
socket_(new Socket(sockfd)),
channel_(new Channel(loop_, sockfd)),
localAddr_(localAddr),
peerAddr_(peerAddr),
highWaterMark_(64*1024*1024)
{
channel_->setReadCallback(
std::bind(&TcpConnection::handleRead, this, std::placeholders::_1)
);
channel_->setWriteCallback(
std::bind(&TcpConnection::handleWrite, this)
);
channel_->setCloseCallback(
std::bind(&TcpConnection::handleClose, this)
);
channel_->setErrorCallback(
std::bind(&TcpConnection::handleError, this)
);
LOG_INFO("TcpConnection::ctor[%s] at fd=%d", name_.c_str(), sockfd);
socket_->setKeepAlive(true);
}
新连接
在接受到新的客户端连接后,构造了一个新的TcpConnection
,紧接着会执行一些后续操作(设置状态等),就是在MainLoop
中执行TcpConnection::connectEstablished
。在构造函数中,状态设置为正在连接,在connectEstablish
设置为已连接,后续可以注册读写事件了。
// 建立连接
void TcpConnection::connectEstablished()
{
setState(kConnected);
channel_->tie(shared_from_this()); // 将TcpConnection绑定到Channel上
channel_->enableReading(); // 向Poller注册EPOLLIN事件
// 新连接建立,执行回调
connectionCallback_(shared_from_this());
}
connectionCallback_
为用户注册的回调函数。
读事件
Poller
在监听到读事件就绪后,会将活跃的 Channel
集合返回给 EventLoop
,即 activeChannels
。随后,EventLoop
遍历 activeChannels
中的每一个 Channel
,并调用其对应的就绪事件回调函数。理解了前面这段话,我们可以思考谁把各种回调函数注册到了Channel
?
根据
TcpConnection
的构造函数,我们可以得出结论:TcpConnection
负责管理一个 TCP 连接的完整生命周期,而Channel
则负责管理已连接后 socket 的各种回调函数、感兴趣的事件以及就绪事件等。因此,为了实现这一管理职责,TcpConnection
需要负责Channel
的生命周期管理;并在构造函数中,调用
Channel
的公有接口注册回调函数。
注册用于处理读事件的成员函数是 TcpConnection::handleRead
,这意味着当读事件就绪时,会自动调用该函数。第一步先读数据,若数据已成功读出则执行messageCallback_()
,messageCallback_()
其实就是我们作为用户利用TcpServer::setMessageCallback
注册的回调函数,在此函数就负责业务逻辑处理,将读数据和业务处理解耦。
void TcpConnection::handleRead(Timestamp receiveTime)
{
int saveErrno = 0;
ssize_t n = inputBuffer_.readFd(channel_->fd(), &saveErrno);
if(n > 0)
{
// 已建立连接的用户,有可读事件发生
messageCallback_(shared_from_this(), &inputBuffer_, receiveTime);
}
else if(n == 0)
{
// 客户端断开连接
handleClose();
}
else
{
errno = saveErrno;
LOG_ERROR("TcpConnection::handleRead");
handleError();
}
}
写事件
作为用户,我们希望只调用简单的接口就可以实现将数据发送出去,而不用关心其内部细节。在Muduo
中提供了这样简单的接口:void TcpConnection::send(std::string const &msg)
其内部帮我们实现了线程安全:
- 若调用
send
的线程与loop_
所属的线程相同,则直接调用sendInLoop
- 否则调用
EventLoop::runInLoop
,其实也等同于EventLoop::queueInLoop
void TcpConnection::send(std::string const &msg)
{
if(state_ == kConnected)
{
if(loop_->isInLoopThread())
{
// 在一个线程
sendInLoop(msg.c_str(), msg.size());
}
else
{
loop_->runInLoop(std::bind(&TcpConnection::sendInLoop, this, msg.c_str(), msg.size()));
}
}
}
Send
只是向用户提供了一个向对端写数据的简易接口,真正做出向对端socketwirte
操作的是TcpConnection::sendInLoop
。
此函数考虑了从应用缓冲区向内核缓冲区拷贝与网卡从内核缓冲区发送数据之间的速度,设置了一个水位标志,只有应用缓冲区的数据量越过水位标志才会发送数据。
调用 sendInLoop
会通过系统调用 write
将数据发送到对端。如果数据全部发送成功,则会触发回调函数 writeCompleteCallback_
;如果未能全部发送完,则会将剩余数据存入写缓冲区,并为 Channel
设置 EPOLLOUT
事件。待内核缓冲区就绪时,Channel
会触发写事件,进而调用已注册的写回调函数(即 TcpConnection::handleWrite
)来继续发送数据。
/*
发送数据,应用程序写的快,内核发送慢,需要把带发送数据写入缓冲区,而且设置了水位回调
*/
void TcpConnection::sendInLoop(const char *message, size_t len)
{
ssize_t nwrote = 0; // 本次已写字节
size_t remaining = len; // 剩余字节
bool faultError = false; // 是否发生错误
if(state_ == kDisconnected)
{
LOG_ERROR("disconnected, give up writing!");
return;
}
// 表示channel第一次写数据=> fd未注册写事件&&发送缓冲区可读字节为0
if(!channel_->isWriting() && outputBuffer_.readableBytes() == 0)
{
nwrote = ::write(channel_->fd(), message, len); // 向内核缓冲区写数据
if(nwrote >= 0)
{
remaining = len - nwrote;
if(remaining == 0 && writeCompleteCallback_)
{
// 表示数据已全部发送,调用发送完毕回调
loop_->queueInLoop(
std::bind(writeCompleteCallback_, shared_from_this())
);
}
}
else // 出错
{
nwrote = 0;
if(errno != EWOULDBLOCK)
{
LOG_ERROR("TcpConnection::sendInLoop");
if(errno == EPIPE || errno == ECONNRESET)
{
faultError = true;
}
}
}
}
/*
下面此判断逻辑说明:
1.当前这一次write并没有全部发送完毕,需要将剩余的数据保存到缓冲区outputBuffer_中
2.给Channel注册EPOLLOUT事件,poller发现tcp的发送缓冲区有空间,会通知sock - channel,调用writeCallback_回调
3.就是调用TcpConnection::handleWrite方法,把发送缓冲区中的数据全部发送完为止
*/
if(!faultError && remaining > 0) // 这次write系统调用无错误 && 还有剩余数据待发送
{
/*
如果在某次调用sendInLoop并未一次性地把数据全部发送完,会把数据存到缓冲区;
待下一次调用sendInLoop会取到上次未读完的数据
*/
size_t oldLen = outputBuffer_.readableBytes();
if(oldLen + remaining >= highWaterMark_
&& oldLen < highWaterMark_
&& highWaterMarkCallback_)
// 旧数据 + 此次未写数据 >= 高水位标志 && 旧数据 < 高水位标志
// => 意味着 此次未写数据要使写缓冲区待发送数据(缓冲区待发送数据 = 旧数据 + 此次未写数据)>=高水位标志
{
loop_->queueInLoop(
std::bind(highWaterMarkCallback_, shared_from_this(), oldLen + remaining)
);
}
// 将此次未写数据添加到 缓冲区
outputBuffer_.append(static_cast<const char*>(message) + nwrote, remaining);
if(!channel_->isWriting())
{
channel_->enableWriting(); // 设置EPOLLOUT事件
}
}
}
待内核缓冲区就绪时,Channel
会触发写事件,进而调用已注册的写回调函数(即 TcpConnection::handleWrite
)来继续发送数据。
为什么在 handleWrite
中判断 state_ == kDisconnecting
?
写事件触发的时机:当内核发送缓冲区有空间时,写事件会触发。这时,
handleWrite
会尝试继续发送缓冲区中的数据。判断是否完成发送:如果此时缓冲区中的数据已全部发送完成,并且连接状态是
kDisconnecting
,说明可以安全地关闭连接。
void TcpConnection::handleWrite()
{
if(channel_->isWriting())
{
int saveErrno = 0;
ssize_t n = outputBuffer_.writeFd(channel_->fd(), &saveErrno);
if(n > 0)
{
outputBuffer_.retrieve(n);
if(outputBuffer_.readableBytes() == 0) // 表示已发送完
{
channel_->disableWriting();
// 消息发送完之后的回调函数
if(writeCompleteCallback_)
{
loop_->queueInLoop(
std::bind(writeCompleteCallback_, shared_from_this())
);
}
/*
为什么要判断连接状态?
1.保证在断开连接前,所有待发送的数据都已发送完毕。
2.实现优雅关闭(半关闭)
*/
if(state_ == kDisconnecting)
{
shutdownInLoop();
}
}
}
else
{
LOG_ERROR("TcpConnection::handleWrite");
}
}
else
{
LOG_ERROR("TcpConnection fd=%d is down, no more writing", channel_->fd());
}
}
关闭
作为用户,可以在任意线程内(并非是TcpConnection
所属的EventLoop
线程内)调用 TcpConnection::shutdown()
来优雅地关闭与已连接客户端的连接。该方法会确保所有未发送的数据被完整发送后,再关闭连接的写端,从而实现对客户端的安全关闭操作。
// 关闭连接
void TcpConnection::。shutdown()
{
if(state_ == kConnected)
{
setState(kDisconnecting);
loop_->runInLoop(std::bind(
&TcpConnection::shutdownInLoop, this
));
}
}
TcpConnection::shutdownInLoop
会确保在 EventLoop
所属的线程内执行,并首先检查数据是否已全部发送:
- 若数据已全部发送:直接关闭写端,触发
EPOLLHUP
事件,通知对端连接已关闭。 - 若数据未发送完:跳过关闭写端的逻辑,同时为
Channel
注册EPOLLOUT
事件。随后,在EventLoop
所属线程中,当内核发送缓冲区可用时触发写事件,执行TcpConnection::handleWrite
。
在 handleWrite
中:
- 如果缓冲区中的数据被完全发送,则会再次调用
TcpConnection::shutdownInLoop
,完成闭环。 - 最终,当所有数据发送完毕,关闭写端并触发
EPOLLOUT
事件,执行Channel
注册的关闭事件回调,完成优雅关闭流程。
// 此方法会确保在EventLoop所属的线程内执行
void TcpConnection::shutdownInLoop()
{
if(!channel_->isWriting()) // 表示写缓冲区内的数据全部发送完
{
socket_->shutdownWrite();// 关闭写端,触发EPOLLHUP;
// =》channel::closeCallback_->TcpConnection::handleClose
}
}
在TcpConnection
构造函数可知,Channel
的关闭事件回调也就是TcpConnection::handleClose()
。
void TcpConnection::handleClose()
{
LOG_INFO("TcpConnection::handleClose fd=%d state=%d", channel_->fd(), (int)state_);
state_ = kDisconnected;
channel_->disableAll();
TcpConnectionPtr connPtr(shared_from_this());
connectionCallback_(connPtr); // 执行连接关闭的回调
closeCallback_(connPtr); // 执行关闭连接的回调
}
connectionCallback_
为用户注册的回调函数,closeCallaback_
为TcpServer
注册的回调函数,最终会调用TcpConnection::connectDestroyed
。
// 连接销毁
void TcpConnection::connectDestroyed()
{
if(state_ == kConnected)
{
setState(kDisconnected);
channel_->disableAll(); // 把channel的所以有感兴趣的事件从Poller中delete
connectionCallback_(shared_from_this());
}
channel_->remove();// 在Poller中移除remove
}
错误
发生错误时,即Channel
触发了EPOLLERR
事件,会调用Channel
中注册的错误回调函数,此函数是在TcpConnection
构造函数中设置的。
void TcpConnection::handleError()
{
int optval;
socklen_t optlen = sizeof optval;
int err;
if(::getsockopt(channel_->fd(), SOL_SOCKET, SO_ERROR, &optval, &optlen) < 0)
{
err = errno;
}
else
{
err = optval;
}
LOG_ERROR("TcpConnection::handleError name:%s - SO_ERROR:%d", name_.c_str(), err);
}
至此,TcpConnection
的核心部分以及与其他组件的关联已经完整串联和解析。
TcpServer
TcpServer
是Muduo
网络库的最顶层模块,它抽象了服务端的网络通信流程,包括监听端口、接收客户端连接、创建 TcpConnection
实例,以及管理多个连接的生命周期和事件回调。
具体如何使用它来构建一个建议聊天服务器,大家可以看看我的这篇文章Muduo架构设计剖析
TcpServer
的组成:
-
Acceptor
- 负责监听指定的地址和端口,并接收新连接。
- 为每个连接分配一个文件描述符(
fd
),并将其交给主线程或线程池中的事件循环(EventLoop
)处理。
-
EventLoop
MainLoop
,管理TcpServer
和所有连接的事件驱动逻辑。
-
EventLoopThreadPool
- 可以将新连接分配给不同的线程,提高并发处理能力。
-
用户回调函数
- 用户可以通过
TcpServer
注册各种回调函数(如连接回调、消息回调、写完成回调等),用于处理应用层逻辑。
- 用户可以通过
类图如下:
Tcpconnection.h
class TcpServer : noncopyable
{
public:
using ThreadInitCallback = std::function<void(EventLoop*)>;
enum Option
{
KnoReusePort,
kReusePort
};
TcpServer(EventLoop* loop,
const InetAddress& listenAddr,
const std::string& argName,
Option option = KnoReusePort);
~TcpServer();
void setThreadInitCallback(const ThreadInitCallback& cb) { threadInitCallback_ = std::move(cb); }
void setConnectionCallback(const ConnectionCallback& cb) { connectionCallback_ = std::move(cb); }
void setMessageCallback(const MessageCallback& cb) { messageCallback_ = std::move(cb); }
void setWriteCallback(const WriteCompleteCallback& cb) { writeCompleteCallback_ = std::move(cb); }
// 设置subloop数量
void setThreadNum(int numThreads);
// 开启监听
void start();
private:
void NewConnection(int sockfd, const InetAddress& peerAddr);
void removeConnection(const TcpConnectionPtr& conn);
void removeConnectionInLoop(const TcpConnectionPtr& conn);
private:
using ConnectionMap = std::unordered_map<std::string, TcpConnectionPtr>;
EventLoop* loop_; //用户定义的mainloop
const std::string ipPort_;
const std::string name_;
std::unique_ptr<Acceptor> accpetor_;
std::shared_ptr<EventLoopThreadPool> threadPool_; // one loop per thread
ConnectionCallback connectionCallback_; // 有新连接的回调
MessageCallback messageCallback_; // 有读写消息的回调
WriteCompleteCallback writeCompleteCallback_; // 消息发送完成以后的回调
ThreadInitCallback threadInitCallback_; // loop线程初始化回调
std::atomic_int started_;
int nextConnId_;
ConnectionMap connections_;
};
TcpConnection.cc
构造函数
static EventLoop* CheckLoopNotNull(EventLoop* loop)
{
if(loop == nullptr)
{
LOG_FATAL("%s:%s:%d mainloop is null!", __FILE__, __FUNCTION__, __LINE__);
}
return loop;
}
TcpServer::TcpServer(EventLoop *loop, InetAddress const &listenAddr, const std::string& argName, Option option) :
loop_(loop),
ipPort_(listenAddr.toIpPort()),
name_(argName),
accpetor_(new Acceptor(loop, listenAddr, option==kReusePort)),
threadPool_(new EventLoopThreadPool(loop, name_)),
connectionCallback_(),
messageCallback_(),
started_(0),
nextConnId_(1)
{
// 当有新用户连接时,此函数作为回调函数
accpetor_->setNewConnectionCallback(
std::bind(&TcpServer::NewConnection,
this, std::placeholders::_1, std::placeholders::_2));
}
新用户连接回调函数
// 在构造TcpServer时,创建了accpetor_,并把TcpServer::NewConnection绑定到Acceptor
// 当有新连接时->在MainLoop中的Acceptor::handleRead()->TcpServer::NewConnection
void TcpServer::NewConnection(int sockfd, InetAddress const &peerAddr)
{
// 根据轮询算法选择一个subloop来管理对应的channel
EventLoop* ioloop = threadPool_->getNextLoop();
char buf[64] = {0};
snprintf(buf, sizeof(buf), "-%s#%d", ipPort_.c_str(), nextConnId_);
++nextConnId_;
// TcpConnection的名字
std::string connName = name_ + buf;
LOG_INFO("TcpServer::newConnecton [%s] - new connection[%s] from %s\n",
name_.c_str(), connName.c_str(), peerAddr.toIpPort().c_str()
);
// 通过sofkfd,获取其绑定的本地的ip地址和端口
sockaddr_in local;
::bzero(&local, sizeof local);
socklen_t addrlen = sizeof local;
if(::getsockname(sockfd, (sockaddr*)&local, &addrlen) < 0)
{
LOG_ERROR("sockets::getLocalAddr");
}
InetAddress localAddr(local);
// 根据连接成功的sockfd, 创建TcpConnection对象
TcpConnectionPtr conn(
new TcpConnection(
ioloop,
connName,
sockfd,
localAddr,
peerAddr
));
connections_[connName] = conn;
// 以下回调都是用户设置给 TcpServer
// TcpServer -> Channel -> poller => notify channel 调用回调
conn->setConnectionCallback(connectionCallback_);
conn->setMessageCallback(messageCallback_);
conn->setWriteCompleteCallback(writeCompleteCallback_);
conn->setCloseCallback(
std::bind(&TcpServer::removeConnection,
this, std::placeholders::_1)
);
// 执行此语句时是在mainLoop,将其入队到ioloop的任务队列,调用TcpConnection::connectionEstablish
ioloop->runInLoop(
std::bind(&TcpConnection::connectEstablished,
conn)
);
}
构造完TcpConnection
并设置好回调函数后,会在MainLoop
中执行TcpConnection::connectEstablished
表示此TCP连接建立成功。
移除连接
TcpServer
为每个TcpConnction
设置了关闭回调conn->setCloseCallback()
,为其绑定的函数是TcpServer::removeConnection
,也就意味着当socket关闭时,就会触发EPOLLEHUP
事件,Channel
会调用其关闭回调函数,这个关闭回调函数就是TcpServer::removeConnection
。
此函数会在其他线程调用(即不是loop_绑定的线程),通过runInLoop
函数进而一定会在MainLoop
中执行TcpServer::removeConnectionInLoop
。
void TcpServer::removeConnection(TcpConnectionPtr const &conn)
{
loop_->runInLoop(
std::bind(&TcpServer::removeConnectionInLoop, this, conn)
);
}
在MainLoop运行的移除TCP连接的操作
此函数会在MainLoop
上运行,就是移除connections_
中的映射关系。但其中的TcpConnection::connectDestroyed
会在TcpConnection
对应的SubLoop
中运行。
void TcpServer::removeConnectionInLoop(TcpConnectionPtr const &conn)
{
LOG_INFO("TcpServer::removeConnection [%s] - connection %s",
name_.c_str(), conn->name().c_str());
connections_.erase(conn->name());
EventLoop* ioLoop = conn->getLoop();
ioLoop->queueInLoop(
std::bind(&TcpConnection::connectDestroyed, conn)
);
}
设置SubLoop
数量
void TcpServer::setThreadNum(int numThreads)
{
threadPool_->setThreadNum(numThreads);
}
开始监听客户端连接
void TcpServer::start()
{
if(started_++ == 0)// 防止一个TCPServer对象被start多次
{
// 启动底层的线程池
threadPool_->start(threadInitCallback_);
loop_->runInLoop(std::bind(&Acceptor::listen, accpetor_.get()));
}
}
设置SubLoop
数量
void TcpServer::setThreadNum(int numThreads)
{
threadPool_->setThreadNum(numThreads);
}
开始监听客户端连接
void TcpServer::start()
{
if(started_++ == 0)// 防止一个TCPServer对象被start多次
{
// 启动底层的线程池
threadPool_->start(threadInitCallback_);
loop_->runInLoop(std::bind(&Acceptor::listen, accpetor_.get()));
}
}
多线程模型
- 主线程:
- 负责管理
Acceptor
和接受新连接。 - 新连接被分配给线程池中的工作线程。
- 负责管理
- 工作线程:
- 每个线程运行一个独立的
EventLoop
,处理分配到的TcpConnection
的读写事件和回调逻辑。
- 每个线程运行一个独立的
这种设计实现了主线程的轻量化,同时利用线程池处理大量并发连接。
总结
TcpServer
是 Muduo 的核心组件之一,为用户提供了简单易用的接口来实现高效的 TCP 服务器。它将底层复杂的网络操作(如 socket、epoll 等)封装为事件驱动的编程模型,并支持多线程,从而使用户能够专注于实现业务逻辑。