Muduo网络库解析--网络模块(2)

news2024/12/15 13:31:44

前文

重写Muduo库实现核心模块的Git仓库

注:本文将重点剖析 Muduo 网络库的核心框架,深入探讨作者精妙的代码设计思路,并针对核心代码部分进行重写,将原本依赖 boost 的实现替换为原生的 C++11 语法。需要说明的是,本文并不打算对整个 Muduo 库进行完整的重写。Muduo库源码链接

在上文中,我们对Muduo网络库的核心网络模块中的SocketInetAddress以及Acceptor进行了解析。这节我们将对剩余的核心网络模块中的TcpConnection以及TcpServer进行解析。

TcpConnection

Muduo 网络库 中,TcpConnection 是一个非常重要的类,主要用于表示并管理一个 TCP 连接。它抽象了应用层和网络层之间的交互,负责处理一个具体的 TCP 连接的生命周期以及数据的发送和接收。

TcpConnection 的主要作用

  1. 抽象 TCP 连接

    • TcpConnection 表示一个具体的 TCP 连接,隐藏了底层的 socket 描述符和 epoll 等细节,使得用户只需要关注逻辑层面。
    • 每一个客户端连接都会对应一个 TcpConnection 实例。
  2. 管理 TCP 连接的生命周期

    • 包括连接的建立、数据的收发、连接的关闭等。
    • 在连接的不同阶段会触发对应的回调(callback),如连接建立回调、消息回调和关闭回调。
  3. 提供高效的异步 I/O

    • 通过事件驱动模型,结合 ChannelEventLoop,实现异步非阻塞的 I/O 操作。
  4. 数据缓冲

    • 提供输入缓冲区和输出缓冲区(Buffer),用于存储接收和发送的数据。
  5. 支持用户自定义回调

    • 用户可以设置各种回调函数,比如连接建立的回调(ConnectionCallback)、消息到来的回调(MessageCallback)、写完成回调等。
  6. 线程安全

    • TcpConnection 的大部分操作是线程安全的,支持跨线程调用,比如关闭连接时可以跨线程调用 shutdown

TcpConnection 的核心功能

  1. 连接管理
    • 提供方法来开启和关闭连接(connectEstablished()connectDestroyed())。
    • 判断连接状态(isConnected() 等)。
  2. 数据传输
    • 接收数据:通过 MessageCallback 回调函数处理接收到的数据。
    • 发送数据:提供 send() 方法,用于发送字符串或二进制数据。发送过程是非阻塞的,数据会先存入输出缓冲区。
  3. 回调设置
    • 支持用户设置各种回调函数,如:
      • ConnectionCallback:连接状态变化时的回调。
      • MessageCallback:收到数据时的回调。
      • WriteCompleteCallback:数据发送完毕时的回调。
      • CloseCallback:连接关闭时的回调。
  4. 与事件循环集成
    • 每个 TcpConnection 实例绑定一个 EventLoop,并通过 Channel 监听和处理其对应 socket 的事件(如可读、可写等)。

类图如下:

image-20241214123752689

类的关键成员变量和方法

  1. 主要成员变量
    • EventLoop* loop_:所属的事件循环
    • StateE state_:表示连接的状态(如连接中、已连接、正在关闭、未连接)
    • unqiue_ptr<Socket> socket_:表示该TCP连接的socket
    • unique_ptr<Channel> channel_:表示该套接字描述的channel
    • InetAddress localAddr_:本地的IP和端口
    • InetAddress peerAddr_:对端的IP和端口
    • 输入缓冲区和输出缓冲区
    • 一系列回调函数:connectionCallback_messageCallback_
  2. 主要方法
    • void send(const std::string& message):发送数据。
    • void shutdown():关闭连接的写端。
    • void connectEstablished():在连接建立后被调用,初始化连接。
    • void connectDestroyed():在连接关闭后被调用,清理资源。
    • 回调设置方法:setConnectionCallback()setMessageCallback() 等。

TcpConnection.h

class TcpConnection : noncopyable, public std::enable_shared_from_this<TcpConnection>
{
public:
    TcpConnection(EventLoop* loop, 
                const std::string& name,
                int sockfd,
                const InetAddress& localAddr,
                const InetAddress& peerAddr);

    ~TcpConnection();

    EventLoop* getLoop() const { return loop_; }
    const std::string& name() const { return name_; }
    const InetAddress& localAddress() const { return localAddr_; }
    const InetAddress& peerAddress() const { return peerAddr_; }

    bool conncted() const { return state_ == StateE::kConnected; }

    // 发送数据
    void send(const std::string& buf);

    // 关闭连接
    void shutdown();
    
    void setConnectionCallback(const ConnectionCallback& cb)
    { connectionCallback_ = cb; }

    void setMessageCallback(const MessageCallback& cb)
    { messageCallback_ = cb; }

    void setWriteCompleteCallback(const WriteCompleteCallback& cb)
    { writeCompleteCallback_ = cb; }

    void setHighWaterMarkCallback(const HighWaterMarkCallback& cb, size_t highWaterMark)
    { highWaterMarkCallback_ = cb; highWaterMark_ = highWaterMark; }

    void setCloseCallback(const CloseCallback& cb)
    { closeCallback_ = cb; }

    // 建立连接
    void connectEstablished();

    // 销毁连接 
    void connectDestroyed();

private:
    enum StateE
    {
        kDisconnected,	// 已关闭连接
        kConnecting,	// 正在连接
        kConnected,		// 已连接
        kDisconnecting	// 正在关闭连接
    };

    void handleRead(Timestamp receiveTime);
    void handleWrite();
    void handleClose();
    void handleError();

    void sendInLoop(const char* message, size_t len);

    void shutdownInLoop();

    void setState(StateE state) { state_ = state; }
    EventLoop* loop_;
    const std::string name_;
    std::atomic_int state_;
    bool reading_;

    std::unique_ptr<Socket> socket_;
    std::unique_ptr<Channel> channel_;

    const InetAddress localAddr_;
    const InetAddress peerAddr_; 

    ConnectionCallback connectionCallback_; // 有新连接的回调
    MessageCallback messageCallback_;       // 有读写消息的回调
    WriteCompleteCallback writeCompleteCallback_;    // 消息发送完成以后的回调
    CloseCallback closeCallback_;
    HighWaterMarkCallback highWaterMarkCallback_;   // 高水位回调
    
    // 高水位的值
    size_t highWaterMark_;

    Buffer inputBuffer_;
    Buffer outputBuffer_;
};

TcpConnection.cc

构造函数

TcpConnection 表示并管理一个 TCP 连接,在事件就绪时,会自动调用用户注册的回调函数。与此相对应,channel_ 中负责注册和管理这些用户定义的回调函数。

static EventLoop* CheckLoopNotNull(EventLoop* loop)
{
    if(loop == nullptr)
    {
        LOG_FATAL("%s:%s:%d TcpConnection is null!", __FILE__, __FUNCTION__, __LINE__);
    }
    return loop;
}

TcpConnection::TcpConnection(EventLoop *loop, std::string const &name,
                             int sockfd, InetAddress const &localAddr,
                             InetAddress const &peerAddr) : 
    loop_(CheckLoopNotNull(loop)),
    name_(name),
    state_(StateE::kConnecting),
    reading_(true),
    socket_(new Socket(sockfd)),
    channel_(new Channel(loop_, sockfd)),
    localAddr_(localAddr),
    peerAddr_(peerAddr),
    highWaterMark_(64*1024*1024)
{
    channel_->setReadCallback(
        std::bind(&TcpConnection::handleRead, this, std::placeholders::_1)
    );

    channel_->setWriteCallback(
        std::bind(&TcpConnection::handleWrite, this)
    );

    channel_->setCloseCallback(
        std::bind(&TcpConnection::handleClose, this)
    );

    channel_->setErrorCallback(
        std::bind(&TcpConnection::handleError, this)
    );

    LOG_INFO("TcpConnection::ctor[%s] at fd=%d", name_.c_str(), sockfd);

    socket_->setKeepAlive(true);
}
新连接

在接受到新的客户端连接后,构造了一个新的TcpConnection,紧接着会执行一些后续操作(设置状态等),就是在MainLoop中执行TcpConnection::connectEstablished。在构造函数中,状态设置为正在连接,在connectEstablish设置为已连接,后续可以注册读写事件了。

// 建立连接
void TcpConnection::connectEstablished()
{
    setState(kConnected);
    channel_->tie(shared_from_this()); // 将TcpConnection绑定到Channel上
    channel_->enableReading();      // 向Poller注册EPOLLIN事件

    // 新连接建立,执行回调
    connectionCallback_(shared_from_this());
}

connectionCallback_为用户注册的回调函数。

读事件

Poller 在监听到读事件就绪后,会将活跃的 Channel 集合返回给 EventLoop,即 activeChannels。随后,EventLoop 遍历 activeChannels 中的每一个 Channel,并调用其对应的就绪事件回调函数。理解了前面这段话,我们可以思考谁把各种回调函数注册到了Channel

根据 TcpConnection 的构造函数,我们可以得出结论:TcpConnection 负责管理一个 TCP 连接的完整生命周期,而 Channel 则负责管理已连接后 socket 的各种回调函数、感兴趣的事件以及就绪事件等。因此,为了实现这一管理职责,TcpConnection 需要负责 Channel 的生命周期管理;

并在构造函数中,调用Channel的公有接口注册回调函数。

注册用于处理读事件的成员函数是 TcpConnection::handleRead,这意味着当读事件就绪时,会自动调用该函数。第一步先读数据,若数据已成功读出则执行messageCallback_()messageCallback_()其实就是我们作为用户利用TcpServer::setMessageCallback注册的回调函数,在此函数就负责业务逻辑处理,将读数据和业务处理解耦。

void TcpConnection::handleRead(Timestamp receiveTime)
{
    int saveErrno = 0;
    ssize_t n = inputBuffer_.readFd(channel_->fd(), &saveErrno);
    if(n > 0)
    {
        // 已建立连接的用户,有可读事件发生
        messageCallback_(shared_from_this(), &inputBuffer_, receiveTime);
    }
    else if(n == 0)
    {
        // 客户端断开连接
        handleClose();
    }
    else
    {
        errno = saveErrno;
        LOG_ERROR("TcpConnection::handleRead");
        handleError();
    }
}
写事件

作为用户,我们希望只调用简单的接口就可以实现将数据发送出去,而不用关心其内部细节。在Muduo中提供了这样简单的接口:void TcpConnection::send(std::string const &msg)

其内部帮我们实现了线程安全:

  • 若调用send的线程与loop_所属的线程相同,则直接调用sendInLoop
  • 否则调用EventLoop::runInLoop,其实也等同于EventLoop::queueInLoop

void TcpConnection::send(std::string const &msg)
{
    if(state_ == kConnected)
    {
        if(loop_->isInLoopThread())
        {
            // 在一个线程
            sendInLoop(msg.c_str(), msg.size());    
        }
        else
        {
            loop_->runInLoop(std::bind(&TcpConnection::sendInLoop, this, msg.c_str(), msg.size()));
        }
    }
}

Send只是向用户提供了一个向对端写数据的简易接口,真正做出向对端socketwirte操作的是TcpConnection::sendInLoop

此函数考虑了从应用缓冲区向内核缓冲区拷贝与网卡从内核缓冲区发送数据之间的速度,设置了一个水位标志,只有应用缓冲区的数据量越过水位标志才会发送数据。

调用 sendInLoop 会通过系统调用 write 将数据发送到对端。如果数据全部发送成功,则会触发回调函数 writeCompleteCallback_;如果未能全部发送完,则会将剩余数据存入写缓冲区,并为 Channel 设置 EPOLLOUT 事件。待内核缓冲区就绪时,Channel 会触发写事件,进而调用已注册的写回调函数(即 TcpConnection::handleWrite)来继续发送数据。

/*
    发送数据,应用程序写的快,内核发送慢,需要把带发送数据写入缓冲区,而且设置了水位回调
*/
void TcpConnection::sendInLoop(const char *message, size_t len)
{
    ssize_t nwrote = 0;         // 本次已写字节
    size_t remaining = len;     // 剩余字节
    bool faultError = false;    // 是否发生错误

    if(state_ == kDisconnected)
    {
        LOG_ERROR("disconnected, give up writing!");
        return;
    }

    // 表示channel第一次写数据=> fd未注册写事件&&发送缓冲区可读字节为0
    if(!channel_->isWriting() && outputBuffer_.readableBytes() == 0)
    {
        nwrote = ::write(channel_->fd(), message, len); // 向内核缓冲区写数据
        if(nwrote >= 0)
        {
            remaining = len - nwrote;
            if(remaining == 0 && writeCompleteCallback_)   
            {
                // 表示数据已全部发送,调用发送完毕回调
                loop_->queueInLoop(
                    std::bind(writeCompleteCallback_, shared_from_this())
                );
            }

        }
        else // 出错
        {
            nwrote = 0;
            if(errno != EWOULDBLOCK)
            {
                LOG_ERROR("TcpConnection::sendInLoop");
                if(errno == EPIPE || errno == ECONNRESET)
                {
                    faultError = true;
                }
            }
        }
    }

    /*
    下面此判断逻辑说明:
        1.当前这一次write并没有全部发送完毕,需要将剩余的数据保存到缓冲区outputBuffer_中
        2.给Channel注册EPOLLOUT事件,poller发现tcp的发送缓冲区有空间,会通知sock - channel,调用writeCallback_回调
        3.就是调用TcpConnection::handleWrite方法,把发送缓冲区中的数据全部发送完为止
    */
    if(!faultError && remaining > 0)    // 这次write系统调用无错误 && 还有剩余数据待发送
    {
        /*
            如果在某次调用sendInLoop并未一次性地把数据全部发送完,会把数据存到缓冲区;
            待下一次调用sendInLoop会取到上次未读完的数据
        */
        size_t oldLen = outputBuffer_.readableBytes();  
        if(oldLen + remaining >= highWaterMark_
            && oldLen < highWaterMark_
            && highWaterMarkCallback_)  
            // 旧数据 + 此次未写数据 >= 高水位标志 && 旧数据 < 高水位标志 
            // => 意味着 此次未写数据要使写缓冲区待发送数据(缓冲区待发送数据 = 旧数据 + 此次未写数据)>=高水位标志
        {
            loop_->queueInLoop(
                std::bind(highWaterMarkCallback_, shared_from_this(), oldLen + remaining)
            );
        }
        // 将此次未写数据添加到 缓冲区
        outputBuffer_.append(static_cast<const char*>(message) + nwrote, remaining);
        if(!channel_->isWriting())
        {
            channel_->enableWriting();  // 设置EPOLLOUT事件
        }
    }
}

待内核缓冲区就绪时,Channel 会触发写事件,进而调用已注册的写回调函数(即 TcpConnection::handleWrite)来继续发送数据。

为什么在 handleWrite 中判断 state_ == kDisconnecting

写事件触发的时机:当内核发送缓冲区有空间时,写事件会触发。这时,handleWrite 会尝试继续发送缓冲区中的数据。

判断是否完成发送:如果此时缓冲区中的数据已全部发送完成,并且连接状态是 kDisconnecting,说明可以安全地关闭连接。

void TcpConnection::handleWrite()
{
    if(channel_->isWriting())
    {
        int saveErrno = 0;
        ssize_t n = outputBuffer_.writeFd(channel_->fd(), &saveErrno);
        if(n > 0)
        {
            outputBuffer_.retrieve(n);
            if(outputBuffer_.readableBytes() == 0) // 表示已发送完 
            {
                channel_->disableWriting();
                // 消息发送完之后的回调函数
                if(writeCompleteCallback_)
                {
                    loop_->queueInLoop(
                        std::bind(writeCompleteCallback_, shared_from_this())
                    );
                }
                /*
                    为什么要判断连接状态?
                    1.保证在断开连接前,所有待发送的数据都已发送完毕。
                    2.实现优雅关闭(半关闭)
                */
                if(state_ == kDisconnecting)
                {
                    shutdownInLoop();
                }
            }
        }
        else
        {
            LOG_ERROR("TcpConnection::handleWrite");
        }
    }
    else
    {
        LOG_ERROR("TcpConnection fd=%d is down, no more writing", channel_->fd());
    }
}
关闭

作为用户,可以在任意线程内(并非是TcpConnection所属的EventLoop线程内)调用 TcpConnection::shutdown() 来优雅地关闭与已连接客户端的连接。该方法会确保所有未发送的数据被完整发送后,再关闭连接的写端,从而实现对客户端的安全关闭操作。

// 关闭连接
void TcpConnection::shutdown() 
{   
    if(state_ == kConnected)
    {
        setState(kDisconnecting);
        loop_->runInLoop(std::bind(
            &TcpConnection::shutdownInLoop, this
        ));
    }
}

TcpConnection::shutdownInLoop 会确保在 EventLoop 所属的线程内执行,并首先检查数据是否已全部发送:

  • 若数据已全部发送:直接关闭写端,触发 EPOLLHUP 事件,通知对端连接已关闭。
  • 若数据未发送完:跳过关闭写端的逻辑,同时为 Channel 注册 EPOLLOUT 事件。随后,在 EventLoop 所属线程中,当内核发送缓冲区可用时触发写事件,执行 TcpConnection::handleWrite

handleWrite 中:

  • 如果缓冲区中的数据被完全发送,则会再次调用 TcpConnection::shutdownInLoop,完成闭环。
  • 最终,当所有数据发送完毕,关闭写端并触发 EPOLLOUT 事件,执行 Channel 注册的关闭事件回调,完成优雅关闭流程。
// 此方法会确保在EventLoop所属的线程内执行
void TcpConnection::shutdownInLoop()
{
    if(!channel_->isWriting()) // 表示写缓冲区内的数据全部发送完
    {
        socket_->shutdownWrite();// 关闭写端,触发EPOLLHUP;
        // =》channel::closeCallback_->TcpConnection::handleClose
    }
}

TcpConnection构造函数可知,Channel的关闭事件回调也就是TcpConnection::handleClose()

void TcpConnection::handleClose() 
{
    LOG_INFO("TcpConnection::handleClose fd=%d state=%d", channel_->fd(), (int)state_);
    state_ = kDisconnected;
    channel_->disableAll();

    TcpConnectionPtr connPtr(shared_from_this());
    connectionCallback_(connPtr);   // 执行连接关闭的回调
    closeCallback_(connPtr);        // 执行关闭连接的回调
}

connectionCallback_为用户注册的回调函数,closeCallaback_TcpServer注册的回调函数,最终会调用TcpConnection::connectDestroyed

// 连接销毁
void TcpConnection::connectDestroyed()
{
    if(state_ == kConnected)
    {
        setState(kDisconnected);
        channel_->disableAll(); // 把channel的所以有感兴趣的事件从Poller中delete
        connectionCallback_(shared_from_this());
    }
    channel_->remove();// 在Poller中移除remove
}
错误

发生错误时,即Channel触发了EPOLLERR事件,会调用Channel中注册的错误回调函数,此函数是在TcpConnection构造函数中设置的。

void TcpConnection::handleError()
{
    int optval;
    socklen_t optlen = sizeof optval;
    int err;
    if(::getsockopt(channel_->fd(), SOL_SOCKET, SO_ERROR, &optval, &optlen) < 0)
    {
        err = errno;
    }
    else
    {
        err = optval;
    }
    LOG_ERROR("TcpConnection::handleError name:%s - SO_ERROR:%d", name_.c_str(), err);
}

至此,TcpConnection 的核心部分以及与其他组件的关联已经完整串联和解析。

TcpServer

TcpServerMuduo网络库的最顶层模块,它抽象了服务端的网络通信流程,包括监听端口、接收客户端连接、创建 TcpConnection 实例,以及管理多个连接的生命周期和事件回调。

具体如何使用它来构建一个建议聊天服务器,大家可以看看我的这篇文章Muduo架构设计剖析

TcpServer的组成

  1. Acceptor

    • 负责监听指定的地址和端口,并接收新连接。
    • 为每个连接分配一个文件描述符(fd),并将其交给主线程或线程池中的事件循环(EventLoop)处理。
  2. EventLoop

    • MainLoop,管理 TcpServer 和所有连接的事件驱动逻辑。
  3. EventLoopThreadPool

    • 可以将新连接分配给不同的线程,提高并发处理能力。
  4. 用户回调函数

    • 用户可以通过 TcpServer 注册各种回调函数(如连接回调、消息回调、写完成回调等),用于处理应用层逻辑。

类图如下:

image-20241214170151812

Tcpconnection.h

class TcpServer : noncopyable
{
public:
    using ThreadInitCallback = std::function<void(EventLoop*)>;

    enum Option
    {
        KnoReusePort,
        kReusePort
    };

    TcpServer(EventLoop* loop,
                const InetAddress& listenAddr,
                const std::string& argName,
                Option option = KnoReusePort);
    
    ~TcpServer();

    void setThreadInitCallback(const ThreadInitCallback& cb) { threadInitCallback_ = std::move(cb); }
    void setConnectionCallback(const ConnectionCallback& cb) { connectionCallback_ = std::move(cb); }
    void setMessageCallback(const MessageCallback& cb) { messageCallback_ = std::move(cb); }
    void setWriteCallback(const WriteCompleteCallback& cb) { writeCompleteCallback_ = std::move(cb); }

    // 设置subloop数量
    void setThreadNum(int numThreads);

    // 开启监听
    void start();

private:
    void NewConnection(int sockfd, const InetAddress& peerAddr);
    void removeConnection(const TcpConnectionPtr& conn);
    void removeConnectionInLoop(const TcpConnectionPtr& conn);

private:
    using ConnectionMap = std::unordered_map<std::string, TcpConnectionPtr>;

    EventLoop* loop_;   //用户定义的mainloop
    const std::string ipPort_;
    const std::string name_;
    std::unique_ptr<Acceptor> accpetor_;
    std::shared_ptr<EventLoopThreadPool> threadPool_;   // one loop per thread

    ConnectionCallback connectionCallback_; // 有新连接的回调
    MessageCallback messageCallback_;       // 有读写消息的回调
    WriteCompleteCallback writeCompleteCallback_;    // 消息发送完成以后的回调

    ThreadInitCallback threadInitCallback_; // loop线程初始化回调
    std::atomic_int  started_;

    int nextConnId_;
    ConnectionMap connections_;
};

TcpConnection.cc

构造函数
static EventLoop* CheckLoopNotNull(EventLoop* loop)
{
    if(loop == nullptr)
    {
        LOG_FATAL("%s:%s:%d mainloop is null!", __FILE__, __FUNCTION__, __LINE__);
    }
    return loop;
}

TcpServer::TcpServer(EventLoop *loop, InetAddress const &listenAddr, const std::string& argName, Option option) :
    loop_(loop),
    ipPort_(listenAddr.toIpPort()),
    name_(argName),
    accpetor_(new Acceptor(loop, listenAddr, option==kReusePort)),
    threadPool_(new EventLoopThreadPool(loop, name_)),
    connectionCallback_(),
    messageCallback_(),
    started_(0),
    nextConnId_(1)
{
    // 当有新用户连接时,此函数作为回调函数
    accpetor_->setNewConnectionCallback(
        std::bind(&TcpServer::NewConnection, 
        this, std::placeholders::_1, std::placeholders::_2));
}
新用户连接回调函数
// 在构造TcpServer时,创建了accpetor_,并把TcpServer::NewConnection绑定到Acceptor
// 当有新连接时->在MainLoop中的Acceptor::handleRead()->TcpServer::NewConnection
void TcpServer::NewConnection(int sockfd, InetAddress const &peerAddr)
{
    // 根据轮询算法选择一个subloop来管理对应的channel
    EventLoop* ioloop = threadPool_->getNextLoop();
    char buf[64] = {0};
    snprintf(buf, sizeof(buf), "-%s#%d", ipPort_.c_str(), nextConnId_);
    ++nextConnId_;
    // TcpConnection的名字
    std::string connName = name_ + buf; 

    LOG_INFO("TcpServer::newConnecton [%s] - new connection[%s] from %s\n", 
        name_.c_str(), connName.c_str(), peerAddr.toIpPort().c_str()
    );
   // 通过sofkfd,获取其绑定的本地的ip地址和端口
   sockaddr_in local;
   ::bzero(&local, sizeof local);
   socklen_t addrlen = sizeof local;
   if(::getsockname(sockfd, (sockaddr*)&local, &addrlen) < 0)
   {
        LOG_ERROR("sockets::getLocalAddr");
   }
   InetAddress localAddr(local);

   // 根据连接成功的sockfd, 创建TcpConnection对象
   TcpConnectionPtr conn(
        new TcpConnection(
            ioloop,
            connName,
            sockfd,
            localAddr,
            peerAddr
    ));

    connections_[connName] = conn;
    // 以下回调都是用户设置给 TcpServer
    // TcpServer -> Channel -> poller => notify channel 调用回调
    conn->setConnectionCallback(connectionCallback_);
    conn->setMessageCallback(messageCallback_);
    conn->setWriteCompleteCallback(writeCompleteCallback_);
    
    conn->setCloseCallback(
        std::bind(&TcpServer::removeConnection, 
        this, std::placeholders::_1)
    );

    // 执行此语句时是在mainLoop,将其入队到ioloop的任务队列,调用TcpConnection::connectionEstablish
    ioloop->runInLoop(
        std::bind(&TcpConnection::connectEstablished,
        conn)
    );
}

构造完TcpConnection并设置好回调函数后,会在MainLoop中执行TcpConnection::connectEstablished表示此TCP连接建立成功。

移除连接

TcpServer为每个TcpConnction设置了关闭回调conn->setCloseCallback(),为其绑定的函数是TcpServer::removeConnection,也就意味着当socket关闭时,就会触发EPOLLEHUP事件,Channel会调用其关闭回调函数,这个关闭回调函数就是TcpServer::removeConnection

此函数会在其他线程调用(即不是loop_绑定的线程),通过runInLoop函数进而一定会在MainLoop中执行TcpServer::removeConnectionInLoop

void TcpServer::removeConnection(TcpConnectionPtr const &conn)
{
    loop_->runInLoop(
        std::bind(&TcpServer::removeConnectionInLoop, this, conn)
    );
}

在MainLoop运行的移除TCP连接的操作

此函数会在MainLoop上运行,就是移除connections_中的映射关系。但其中的TcpConnection::connectDestroyed会在TcpConnection对应的SubLoop中运行。

void TcpServer::removeConnectionInLoop(TcpConnectionPtr const &conn)
{
    LOG_INFO("TcpServer::removeConnection [%s] - connection %s",
        name_.c_str(), conn->name().c_str());
    connections_.erase(conn->name());
    EventLoop* ioLoop = conn->getLoop();
    ioLoop->queueInLoop(
        std::bind(&TcpConnection::connectDestroyed, conn)
    );
}
设置SubLoop数量
void TcpServer::setThreadNum(int numThreads)
{
    threadPool_->setThreadNum(numThreads);
}
开始监听客户端连接
void TcpServer::start()
{
    if(started_++ == 0)// 防止一个TCPServer对象被start多次
    {
        // 启动底层的线程池
        threadPool_->start(threadInitCallback_);
        loop_->runInLoop(std::bind(&Acceptor::listen, accpetor_.get()));
    }
}
设置SubLoop数量
void TcpServer::setThreadNum(int numThreads)
{
    threadPool_->setThreadNum(numThreads);
}
开始监听客户端连接
void TcpServer::start()
{
    if(started_++ == 0)// 防止一个TCPServer对象被start多次
    {
        // 启动底层的线程池
        threadPool_->start(threadInitCallback_);
        loop_->runInLoop(std::bind(&Acceptor::listen, accpetor_.get()));
    }
}

多线程模型

  • 主线程
    • 负责管理 Acceptor 和接受新连接。
    • 新连接被分配给线程池中的工作线程。
  • 工作线程
    • 每个线程运行一个独立的 EventLoop,处理分配到的 TcpConnection 的读写事件和回调逻辑。

这种设计实现了主线程的轻量化,同时利用线程池处理大量并发连接。

总结

TcpServer 是 Muduo 的核心组件之一,为用户提供了简单易用的接口来实现高效的 TCP 服务器。它将底层复杂的网络操作(如 socket、epoll 等)封装为事件驱动的编程模型,并支持多线程,从而使用户能够专注于实现业务逻辑。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2259949.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

电脑怎么设置通电自动开机(工控机)

操作系统&#xff1a;win10 第一步&#xff0c;电脑开机时按del键进入bios页面。 第二步&#xff0c;选择advanced下的IT8712 Super IO Configuration 第三步&#xff0c;找到Auto Power On&#xff0c;将其从Power off设置为Power On 第四步&#xff0c;F10保存&#xff0c;大…

如何对小型固定翼无人机进行最优的路径跟随控制?

控制架构 文章继续采用的是 ULTRA-Extra无人机&#xff0c;相关参数如下&#xff1a; 这里用于guidance law的无人机运动学模型为&#xff1a; { x ˙ p V a cos ⁡ γ cos ⁡ χ V w cos ⁡ γ w cos ⁡ χ w y ˙ p V a cos ⁡ γ sin ⁡ χ V w cos ⁡ γ w sin ⁡ χ…

基于Redis实现令牌桶算法

基于Redis实现令牌桶算法 令牌桶算法算法流程图优点缺点 实现其它限流算法 令牌桶算法 令牌桶是一种用于分组交换和电信网络的算法。它可用于检查数据包形式的数据传输是否符合定义的带宽和突发性限制&#xff08;流量不均匀或变化的衡量标准&#xff09;。它还可以用作调度算…

学习threejs,局部纹理刷新,实现图片分块加载

&#x1f468;‍⚕️ 主页&#xff1a; gis分享者 &#x1f468;‍⚕️ 感谢各位大佬 点赞&#x1f44d; 收藏⭐ 留言&#x1f4dd; 加关注✅! &#x1f468;‍⚕️ 收录于专栏&#xff1a;threejs gis工程师 文章目录 一、&#x1f340;前言1.1 ☘️Texture 贴图 二、&#x1…

超标量处理器设计笔记(10) 寄存器重命名过程的恢复、分发

重命名 寄存器重命名过程的恢复使用 Checkpoint 对 RAT 进行恢复使用 WALK 对 RAT 进行恢复使用 Architecture State 对 RAT 进行恢复总结 分发&#xff08;Dispatch&#xff09; 寄存器重命名过程的恢复 当发生异常、分支预测失败时&#xff0c;指令占用 RAT、ROB 和 Issue …

海康萤石摄像机接入EasyNVR流程:开启RTSP-》萤石视频添加到EasyNVR-》未来支持海康SDK协议添加到EasyNVR

EasyNVR目前支持GB28181、RTSP、ONVIF、RTMP&#xff08;推流&#xff09;这几种协议接入&#xff0c;目前正在增加海康HIKSDK、大华DHSDK等几种SDK的接入&#xff0c;我们今天就介绍一下萤石摄像机怎么通过RTSP接入到EasyNVR。 第一步&#xff1a;萤石摄像机开启 萤石设备默…

Qt编写的文件传输工具

使用QT编写的文件传输工具 文件传输工具通过发送udp广播消息将IP广播给其他开启该程序的局域网机器 文件传输工具 通过发送udp广播消息将IP广播给其他开启该程序的局域网机器 收到的广播消息可以显示在IP地址列表中&#xff0c;点击IP地址可以自动填充到IP地址栏内 选择文件…

【潜意识Java】深入理解 Java 面向对象编程(OOP)

目录 什么是面向对象编程&#xff08;OOP&#xff09;&#xff1f; 1. 封装&#xff08;Encapsulation&#xff09; Java 中的封装 2. 继承&#xff08;Inheritance&#xff09; Java 中的继承 3. 多态&#xff08;Polymorphism&#xff09; Java 中的多态 4. 抽象&…

PWM调节DCDC参数计算原理

1、动态电压频率调整DVFS SOC芯片的核电压、GPU电压、NPU电压、GPU电压等&#xff0c;都会根据性能和实际应用场景来进行电压和频率的调整。 即动态电压频率调整DVFS&#xff08;Dynamic Voltage and Frequency scaling&#xff09;&#xff0c;优化性能和功耗。 比如某SOC在…

OpenCV相关函数

一、二值化函数&#xff08;threshold&#xff09; 功能&#xff1a;将灰度图像转换为二值图像&#xff0c;通常用于图像分割。通过设置阈值&#xff0c;把图像中低于阈值的像素设为0&#xff0c;高于阈值的像素设为1。 参数&#xff1a; src&#xff1a;输入图像。 thresh&a…

bean后处理器的作用

这是beanFactory中常见的一些后处理器&#xff1a; 其中这俩个属于bean后处理器&#xff1a; internalAutowiredAnnotationProcessor解析Autowired、Value internalCommonAnnotationProcessor解析Resource、PostConstruct、PreDestroy Bean后处理器的作用&#xff1a;为Bean…

YOLOv11融合[CVPR2024]Starnet中的star block取模块

YOLOv11v10v8使用教程&#xff1a; YOLOv11入门到入土使用教程 YOLOv11改进汇总贴&#xff1a;YOLOv11及自研模型更新汇总 《Rewrite the Stars》 一、 模块介绍 论文链接&#xff1a;https://arxiv.org/abs/2403.19967 代码链接&#xff1a;https://github.com/ma-xu/Rewri…

日常灵感:听劝是一种天赋

希望这段分享能给你提供一些新的角度&#xff0c;让你在自己的工作和生活中更好地利用这份“听劝”的天赋&#xff01; 父与子的救赎&#xff1a;听劝的天赋 学霸爸爸李先生是一个典型的"别人家的父母"。 他从小就是学霸&#xff0c;凭借过硬的学习能力从重点高中一…

The Rise and Potential of Large Language ModelBased Agents:A Survey---讨论

讨论 论法学硕士研究与Agent研究的互利性 近年来&#xff0c;随着激光诱导金属化技术的发展&#xff0c;激光诱导金属化与化学剂交叉领域的研究取得了长足的进步&#xff0c;促进了这两个领域的发展。在此&#xff0c;我们期待着LLM研究和Agent研究相互提供的一些益处和发展机…

React 第十六节 useCallback 使用详解注意事项

useCallback 概述 1、useCallback 是在React 中多次渲染缓存函数的 Hook&#xff0c;返回一个函数的 memoized的值&#xff1b; 2、如果多次传入的依赖项不变&#xff0c;那么多次定义的时候&#xff0c;返回的值是相同的,防止频繁触发更新&#xff1b; 3、多应用在 父组件为函…

Transformer: Attention Is All You Need (2017) 翻译

论文&#xff1a;Attention Is All You Need 下载地址如下: download: Transformer Attention Is All you need Attention Is All You Need 中文 《Attention Is All You Need》是《Transformer》模型的开创性论文&#xff0c;提出了一种全新的基于注意力机制的架构&#xf…

Git-分支(branch)常用命令

分支 我们在做项目开发的时候&#xff0c;无论是软件项目还是其他机械工程项目&#xff0c;我们为了提高效率以及合理的节省时间等等原因&#xff0c;现在都不再是线性进行&#xff0c;而是将一个项目抽离出诸进行线&#xff0c;每一条线在git中我们就叫做分支&#xff0c;bran…

【AIGC进阶-ChatGPT提示词副业解析】反向心理学在沟通中的运用:激将法的艺术

引言 在日常沟通和管理中&#xff0c;直接的表达方式并不总能达到预期效果。反向心理学&#xff0c;特别是其中的激将法&#xff0c;作为一种独特的沟通技巧&#xff0c;往往能在看似消极的表达中激发出积极的反应。本文将深入探讨反向心理学中激将法的运用技巧、实施策略及其…

360智脑张向征:共建可信可控AI生态 应对大模型安全挑战

发布 | 大力财经 人工智能的加速发展&#xff0c;有力推动了社会的数智化转型&#xff1b;与此同时&#xff0c;带来的相关安全风险也日益凸显。近日&#xff0c;在北京市举办的通明湖人工智能开发与应用大会上&#xff0c;360智脑总裁张向征以“大模型安全研究与实践”为主题&…

lc46全排列——回溯

46. 全排列 - 力扣&#xff08;LeetCode&#xff09; 法1&#xff1a;暴力枚举 总共n!种全排列&#xff0c;一一列举出来放入list就行&#xff0c;关键是怎么去枚举呢&#xff1f;那就每次随机取一个&#xff0c;然后删去这个&#xff0c;再从剩下的数组中继续去随机选一个&a…