重写muduo之TcpConnection

news2025/2/23 23:40:45

目录

1、 TcpConnection.h

2、 TcpConnection.cc


1、 TcpConnection.h

TcpConnection底层绑定(管理)了一个Channel,Channel有事件被Poller通知后,会调用相应的回调,这些回调也是TcpConnection中包含的方法,将这些方法绑定了一下塞给channel作为回调,如果TcpConnection相应的底层对应的channel还在poller上注册着,还会感知到poller通知的事件,并且调用相应的回调,如果此时它对应的TcpConnection对象没有了(被人remove掉)怎么办?

tie使用弱智能指针记录,在处理事件的时候(handleEvent),肯定是被tie过,在这个方法里面将弱智能指针提升一下,如果不做任何的回调调用就说明TcpConnection这个对象已经没有了。

#pragma once
#include "noncopyable.h"
#include "InetAddress.h"
#include "Callbacks.h"
#include "Buffer.h"

#include <memory>
#include <string>
#include <atomic>

class Channel;
class EventLoop;
class Socket;

/**
 * TcpServer => Acceptor => 有一个新用户连接,通过accept函数拿到connfd
 * 
 * => TcpConnection 设置回调 =》 Channel => Poller =>Channel的回调操作
 * 
*/
class TcpConnection:noncopyable,public std::enable_shared_from_this<TcpConnection>//获得当前对象的智能指针
{
public:
    TcpConnection(EventLoop* loop,
                const std::string& name,
                int sockfd,
                const InetAddress& localAddr,
                const InetAddress& peerAddr);
    ~TcpConnection();

    EventLoop* getLoop() const {return loop_;}
    const std::string& name() const{return name_;}
    const InetAddress& localAddress() const {return localAddr_;}
    const InetAddress& peerAddress() const {return peerAddr_;}

    bool connected() const {return state_==kConnected;}

    //发送数据
    void send(const std::string& buf);
    //关闭连接
    void shutdown();

    void setConnectionCallback(const ConnectionCallback& cb)
    {connectionCallback_=cb;}

    void setMessageCallback(const MessageCallback& cb)
    {messageCallback_=cb;}

    void setWriteCompleteCallback(const WriteCompleteCallback& cb)
    {writeCompleteCallback_=cb;}

    void setHighWaterMarkCallback(const HighWaterMarkCallback& cb,size_t highWaterMark)
    {highWaterMarkCallback_=cb; highWaterMark_=highWaterMark;}

    void setCloseCallback(const CloseCallback& cb)
    {closeCallback_=cb;}

    //连接建立
    void connectEstablished();
    //连接销毁
    void connectDestroyed();
private:
    enum StateE{kDisconnected,kConnecting,kConnected,kDisconnecting};
    void setState(StateE state){state_=state;}

    void handleRead(Timestamp receiveTime);
    void handleWrite();
    void handleClose();
    void handleError();

    void sendInLoop(const void* message,size_t len);

    void shutdownInLoop();

    EventLoop* loop_;//这里绝对不是baseLoop,因为TcpConnection都是在subloop里面管理的
    const std::string name_;
    std::atomic_int state_;
    bool reading_;

    //这里和Acceptor类似  Acceptor属于mainLoop  TcpConnection属于subLoop
    std::unique_ptr<Socket> socket_;
    std::unique_ptr<Channel> channel_;

    const InetAddress localAddr_;
    const InetAddress peerAddr_;

    //回调
    ConnectionCallback connectionCallback_;//有新连接时的回调
    MessageCallback messageCallback_;//有读写消息时的回调
    WriteCompleteCallback writeCompleteCallback_;//消息发送完成以后的回调
    HighWaterMarkCallback highWaterMarkCallback_;//高水位回调
    CloseCallback closeCallback_;
    size_t highWaterMark_;//设置水位线高度
    
    //数据的缓冲区
    Buffer inputBuffer_;//接收数据的缓冲区
    Buffer outputBuffer_;//发送数据的缓冲区
};

2、 TcpConnection.cc

当在上层调用了某一connection的shutdown()方法,设置当前服务器端的状态是disconnecting,然后执行shutdownInLoop,因为shutdownInLoop关闭了socket的write端,poller就给channel通知了关闭事件,就回调TcpConnection::handclose()方法,handclose()方法相当于将channel所有的事件都去掉。

#include "TcpConnection.h"
#include "Logger.h"
#include "Socket.h"
#include "Channel.h"
#include "EventLoop.h"

#include <functional>
#include <errno.h>
#include <memory>
#include <sys/types.h>
#include <sys/socket.h>
#include <strings.h>
#include <netinet/tcp.h>
#include <sys/socket.h>

static EventLoop *CheckLoopNotNull(EventLoop *loop)
{
    if (loop == nullptr)
    {
        LOG_FATAL("%s:%s:%d TcpConnection Loop is null!\n", __FILE__, __FUNCTION__, __LINE__);
    }
    return loop;
}

//已建立连接客户端跟服务器之间的联系
TcpConnection::TcpConnection(EventLoop *loop,
                const std::string &nameArg,
                int sockfd,
                const InetAddress &localAddr,
                const InetAddress &peerAddr)
    : loop_(CheckLoopNotNull(loop))
    ,name_(nameArg)
    ,state_(kConnecting)
    ,reading_(true)
    ,socket_(new Socket(sockfd))
    ,channel_(new Channel(loop,sockfd))
    ,localAddr_(localAddr)
    ,peerAddr_(peerAddr)
    ,highWaterMark_(64*1024*1024)//64M
{
    //下面给channel设置相应的回调函数,poller给channel通知感兴趣的事件发生了,channel会回调相应的操作函数
    channel_->setReadCallback(
        std::bind(&TcpConnection::handleRead,this,std::placeholders::_1)
    );
    channel_->setWriteCallback(
        std::bind(&TcpConnection::handleWrite,this)
    );
    channel_->setCloseCallback(
        std::bind(&TcpConnection::handleClose,this)
    );
    channel_->setErrorCallback(
        std::bind(&TcpConnection::handleError,this)
    );

    LOG_INFO("TcpConnection::ctor[%s] at fd=%d\n",name_.c_str(),sockfd);
    socket_->setKeepAlive(true);//保活机制

}

TcpConnection::~TcpConnection()
{
    LOG_INFO("TcpConnection::dtor[%s] at fd=%d state=%d\n",name_.c_str(),channel_->fd(),(int)state_);
}


void TcpConnection::send(const std::string& buf)
{
    if(state_==kConnected)
    {
        if(loop_->isInLoopThread())
        {
            sendInLoop(buf.c_str(),buf.size());
        }
        else
        {
            loop_->runInLoop(std::bind(
                &TcpConnection::sendInLoop,
                this,
                buf.c_str(),
                buf.size()
            ));
        }
    }
}

/**
 * 发送数据 应用写的快,而内核发送数据慢,需要把待发送数据写入缓冲区,而且设置了水位回调
*/
void TcpConnection::sendInLoop(const void* data,size_t len)
{
    ssize_t nwrote=0;
    size_t remaining=len;//没发送完的数据
    bool faultError=false;//是否发生错误

    //之前调用过该connection的shutdown,不能再进行发送了
    if(state_==kDisconnected)
    {
        LOG_ERROR("disconnected,give up writing!");
        return;
    }

    //表示channel_第一次开始写数据,而且缓冲区没有待发送数据
    if(!channel_->isWriting()&&outputBuffer_.readableBytes()==0)
    {
        nwrote=::write(channel_->fd(),data,len);
        if(nwrote>=0)//发送成功
        {
            remaining=len-nwrote;
            if(remaining==0&&writeCompleteCallback_)
            {
                //既然在这里数据全部发送完成,就不用再给channel设置epollout事件了
                loop_->queueInLoop(
                    std::bind(writeCompleteCallback_,shared_from_this())
                );
            }
        }
        else //nwrote<0
        {
            nwrote=0;
            //errno==EWOULDBLOCK  由于非阻塞,没有数据时正常的返回
            if(errno!=EWOULDBLOCK)//真正的错误
            {
                LOG_ERROR("TcpConnection::sendInLoop");
                if(errno==EPIPE||errno==ECONNRESET)// SIGPIPE  RESET 收到连接重置的请求
                {
                    faultError=true;
                }
            }
        }
    }
    //说明当前这一次write,并没有把数据全部发送出去,剩余的数据需要保存到缓冲区当中,然后给channel
    //注册epollout事件,poller发现tcp的发送缓冲区有空间,会通知相应的sock-channel,调用writeCallback_回调方法
    //也就是调用TcpConnection::handleWrite方法,把发送缓冲区中的数据全部发送完成
    if(!faultError&&remaining>0)
    {
        //目前发送缓冲区剩余的待发送数据的长度
        size_t oldLen=outputBuffer_.readableBytes();
        if(oldLen+remaining>=highWaterMark_
            &&oldLen<highWaterMark_
            &&highWaterMarkCallback_)
        {
            loop_->queueInLoop(
                std::bind(highWaterMarkCallback_,shared_from_this(),oldLen+remaining)
            );//调用水位线回调
        }
        outputBuffer_.append((char*)data+nwrote,remaining);//将待发送数据添加到缓冲区中
        if(!channel_->isWriting())
        {
            channel_->enableWriting();//这里一定要注册channel的写事件,否则poller不会给channel通知epollout
        }
    }
}

// 关闭连接
void TcpConnection::shutdown()
{
    if(state_==kConnected)
    {
        setState(kDisconnecting);
        loop_->runInLoop(
            std::bind(&TcpConnection::shutdownInLoop,this)
        );
    }
}

void TcpConnection::shutdownInLoop()
{
    if(!channel_->isWriting())//说明outputBuffer中的数据已经全部发送完成
    {
        socket_->shutdownWrite();//关闭写端
    }
}

// 连接建立
void TcpConnection::connectEstablished()
{
    setState(kConnected);//设置连接成功状态,初始状态为kConnecting
    channel_->tie(shared_from_this());//绑定channel,让这个channel记录一下TcpConnection对象存活的状态
    channel_->enableReading();//同poller注册channel的epollin事件

    //新连接建立,执行回调
    connectionCallback_(shared_from_this());
}

// 连接销毁
void TcpConnection::connectDestroyed()
{
    if(state_==kConnected)
    {
        setState(kDisconnected);
        channel_->disableAll();//把channel的所有感兴趣的事件,从poller中del掉  相当于epoll_ctl
        connectionCallback_(shared_from_this());//断开连接
    }
    channel_->remove();//把channel从poller中删除掉
}

void TcpConnection::handleRead(Timestamp receiveTime)
{
    int savedErrno=0;
    ssize_t n=inputBuffer_.readFd(channel_->fd(),&savedErrno);
    if(n>0)
    {
        //已建立连接的用户,有可读事件发生了,调用用户传入的回调操作omMessage
        messageCallback_(shared_from_this(),&inputBuffer_,receiveTime);
    }
    else if(n==0)
    {
        handleClose();
    }
    else
    {
        errno=savedErrno;
        LOG_ERROR("TcpConnection::handleRead");
        handleError();
    }
}
void TcpConnection::handleWrite()
{
    if(channel_->isWriting())
    {
        int savedErrno=0;
        ssize_t n=outputBuffer_.writeFd(channel_->fd(),&savedErrno);
        if(n>0)
        {
            outputBuffer_.retrieve(n);
            if(outputBuffer_.readableBytes()==0)//发送完成
            {
                channel_->disableWriting();//由可写变为不可写
                if(writeCompleteCallback_)
                {
                    //唤醒loop_对应的thread线程,执行回调
                    loop_->queueInLoop(//loop一定在TcpConnection所对应的线程中
                        std::bind(writeCompleteCallback_,shared_from_this())
                    );
                }
                if(state_==kDisconnecting)
                {
                    shutdownInLoop();
                }
            }
        }
        else
        {
            LOG_ERROR("TcpConnection::handleWrite");
        }
    }
    else//执行handlewrite,但channel并不是可写事件
    {
        LOG_ERROR("TcpConnection fd=%d is down,no more writing\n",channel_->fd());
    }
}

//poller=>channel::closeCallback=>TcpConnection::handleClose  回调
void TcpConnection::handleClose()
{
    LOG_INFO("TcpConnection::handleClose fd=%d state=%d \n",channel_->fd(),(int)state_);
    setState(kDisconnected);
    channel_->disableAll();//将channel感兴趣的事件从poller上全部删除

    TcpConnectionPtr connPtr(shared_from_this());
    connectionCallback_(connPtr);//执行连接关闭的回调
    closeCallback_(connPtr);//关闭连接的回调   执行的是TcpServer::removeConnection回调方法
}
void TcpConnection::handleError()
{
    int optval;
    socklen_t optlen=sizeof optval;
    int err=0;
    if(::getsockopt(channel_->fd(),SOL_SOCKET,SO_ERROR,&optval,&optlen)<0)
    {
        err = errno;
    }
    else
    {
        err=optval;
    }
    LOG_ERROR("TcpConnection::handleError name:%s-SO_ERROR:%d \n",name_.c_str(),err);
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1668018.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

国产操作系统下Chrome的命令行使用 _ 统信 _ 麒麟

原文链接&#xff1a;国产操作系统下Chrome的命令行使用 | 统信 | 麒麟 Hello&#xff0c;大家好啊&#xff01;今天我们来聊聊如何在国产操作系统上使用命令行操作Google Chrome。无论是进行自动化测试、网页截图还是网页数据抓取&#xff0c;使用命令行操作Google Chrome都能…

房屋出租管理系统需求分析及功能介绍

房屋租赁管理系统适用于写字楼、办公楼、厂区、园区、商城、公寓等商办商业不动产的租赁管理及租赁营销&#xff1b;提供资产管理&#xff0c;合同管理&#xff0c;租赁管理&#xff0c; 物业管理&#xff0c;门禁管理等一体化的运营管理平台&#xff0c;提高项目方管理运营效率…

51输出周期为40ms的方波(C+汇编)

题目 已知Fosc12MHz&#xff0c;T1工作于方式1&#xff0c; ①&#xff1a;实现20ms延时&#xff0c;求定时器初值TH0&#xff1f;TL0&#xff1f;写出具体的计算过程。 ②&#xff1a;利用汇编或C语言编程实现输出周期为40ms的方波。 周期为40ms的方波&#xff0c;半周期就…

纯CSS实现步骤条

纯CSS实现纵向Steps步骤条效果 效果图 实现思路 步骤条是一种用于引导用户按照特定流程完成任务的导航条&#xff0c;在各种分步表单交互场景中广泛应用。步骤条通常由编号、名称和引导线三个基本要素组成。本文中要实现的是一个简单的步骤条&#xff0c;包含上述三个基本要素…

Leetcode经典题目之用队列实现栈

P. S.&#xff1a;以下代码均在VS2019环境下测试&#xff0c;不代表所有编译器均可通过。 P. S.&#xff1a;测试代码均未展示头文件stdio.h的声明&#xff0c;使用时请自行添加。 目录 1、题目展示2、题目分析3、完整代码演示4、结语 1、题目展示 前面我们了解过如何实现队列…

webservice和TCP类型接口测试

1.webservice类型接口 1.1.webservice类型接口介绍 Web服务&#xff08;WebService&#xff09;是一种基于网络的应用程序接口&#xff08;API&#xff09;&#xff0c;可通过网络来进行通信和交互。它们使用标准化的协议和格式来进行通信&#xff0c;最常见的是使用XML&#…

C++类与对象的一些练习

1.设计一个名为Rectangle的矩形类&#xff0c;其属性为矩形的长和宽&#xff0c;能计算和输出矩形的周长和面积。 class Rectangle { public:Rectangle(int c0,int k0):m_c(c),m_k(k){}int length()//周长{return 2 * (m_c m_k);}int area()//面积{return m_c * m_k;} privat…

本人通过三次电话沟通,帮助一位海外应届生进了知名公司

本人一直在做Java面试辅导&#xff0c;也经常写些Java求职类的文章&#xff0c;这里为了避免抽象&#xff0c;就写一个具体的成功案例。可以这样说&#xff0c;这位求职者在写简历和找工作时遇到的问题具有一定的普遍性&#xff0c;所以这里本人就以此为例&#xff0c;再进一步…

IPv6路由配置:ripng、ospfv3、静态路由

本次主要是对ipv6路由的配置&#xff0c;先了解ipv6&#xff0c;再进行实验配置 目录 一、&#x1f349; 什么是IPV6&#xff1f;&#x1f31f;IPv6的主要特点 二、&#x1f349;IPv6和IPv4的对比&#x1f31f; 共同点:&#x1f31f; IPv4的优缺点:&#x1f31f; IPv6的优缺点:…

天下大爱唯母爱

岁月轮转&#xff0c;人生寻常&#xff0c;又逢一年母亲节。作为子女&#xff0c;这是所有人都参与节日&#xff0c;也是每一位母亲在繁忙日常中&#xff0c;一个短暂的休息&#xff0c;停下手中的忙碌&#xff0c;听孩子的一声祝福&#xff1a;妈妈辛苦了&#xff0c;母亲节快…

pycharm 里面安装 codeium 插件的时候,不能够弹出登录界面

pycharm 里面安装 codeium 插件的时候&#xff0c;不能够弹出登录界面 pycharm 里面安装 codeium 插件的时候&#xff0c;不能够弹出登录界面--解决如下A pycharm 里面安装 codeium 插件的时候&#xff0c;不能够弹出登录界面–解决如下 #踩坑/pycharm/codeium插件无法登录 安…

修改mysql locahost或者127.0.0.1弱密码问题

一、登录有问题的数据库 sudo mysql -uroot -pxxx -hkde-offline1 -P13306 二、查询user表 这将显示与 root 用户关联的主机、用户名以及加密后的认证字符串(密码)。请注意,authentication_string 列中存储的是经过哈希加密后的密码,而不是原始密码。 MySQL [mysql…

FCOS长文详解

1. 概述 FCOS是一种one-stage、全卷积&#xff08;Fully Convolutional&#xff09;结构的目标检测模型&#xff0c;发表于2019年ICCV。&#xff08;什么是one-stage&#xff1f;&#xff09; 论文原地址&#xff1a;https://arxiv.org/abs/1904.01355 作者源码&#xff1a;ht…

告别数据泥潭:PySpark性能调优的黄金法则

阿佑今天给大家带来个一张藏宝图——使用PySpark进行性能调优的黄金法则&#xff0c;从内存管理到执行计划&#xff0c;再到并行度设置&#xff0c;每一步都是提升数据处理速度的关键&#xff01; 文章目录 Python Spark 详解1. 引言2. 背景介绍2.1 大数据处理技术演变2.2 Apac…

小猫咪邮件在线发送系统源码,支持添加附件

一款免登录发送邮件&#xff0c;支持发送附件&#xff0c;后台可添加邮箱,前台可选择发送邮箱 网站数据采取本地保存&#xff0c;所以使用前请给网站修改权限&#xff0c;否则很多功能将无法使用 安装教程&#xff1a; 1.上传服务器或者主机 2.登录后台&#xff0c;添加发送…

胆子真大,敢搞B站

今天给大家分享一款浏览器插件&#xff0c;能让你的B站在电脑端访问时候会更高级 作者已经开源到Github Star数量还在持续上升中 来看下这款插件究竟具备哪些功能 首先是开启首页干净模式&#xff0c;也就是去除大屏 正常情况我们访问B站是这个样子的~ 开启总开关后 首页的视…

【笔记】从零开始做一个男性人体的流程/躯干篇(超级详细)

躯干整体 大体 1.创建一个正方体&#xff0c;摆好位置 2.实例呀啥的都搞好 3.胸部它是一个前窄后宽的结构 斜方肌 臀部 1.臀部是前宽后窄的结构 2.我们再去侧面调整以下 胸椎向上倾斜&#xff0c;盆骨向下倾斜。脊椎是s形的 3.真实的身体没有这么方正&#xff0c;所以微调…

Adobe Premiere Pro安装

一、安装包下载 链接&#xff1a;https://pan.baidu.com/s/1aYqTSQQutDguKYZE-yNHiw?pwd72l8 提取码&#xff1a;72l8 二、安装步骤 1.鼠标右击【Pr2024(64bit)】压缩包&#xff08;win11及以上系统需先点击“显示更多选项”&#xff09;【解压到 Pr2024(64bit)】。 2.打开…

双向链表(双向带头循环)的增删查改的实现(简单易懂)

一&#xff1a;双向链表的概念 每个节点除开存有数据&#xff0c;还有一个指针指向前一个节点&#xff0c;一个指针指向后一个节点&#xff0c;尾节点和哨兵位互相指向&#xff0c;从而形成一个循环。 二&#xff1a;双向链表的实现第一点&#xff1a; 本文采用三个文件进行实…

Kexp 动态展示 k8s 资源对象依赖关系

kexp[1] 旨在以可视化的方式帮助用户理解和探索 Kubernetes 的能力。 适用场景&#xff1a; 学习和探索 Kubernetes 的功能。 应用开发&#xff0c;提供每个应用的对象图预设。 控制器和操作器的开发&#xff0c;支持动态对象图。 即将推出类似 Postman 的 Kubernetes API …