【C++实现】从0简单理解muduo网络库

news2025/1/10 3:43:50

文章目录

  • TODO
  • 前言
  • 前置知识
    • IO 异步
    • 同步还是异步举例
    • muduo为什么用LT模式
    • vscode 的一些编译方法
  • 底层数据结构分析
  • noncopyable
  • Logger
    • 设置宏来方便打印日志
  • Timestamp 时间类
  • InetAddress
    • InetAddress 是对sockaddr_in结构体的一层封装
  • Channel
    • Poller.h
  • EpollPoller (调用epoll的模块)
    • EPollPoller 的部分实现讲解
  • EventLoop 事件循环(Reactor)
    • CurrentThread
    • 对于__thread的小实验
    • EventLoop 成员
    • wakefd的两种创建方式。
      • socketpair
      • eventfd
    • EventLoop 的部分实现讲解
  • Thread 类
  • EventLoopThreadPool
    • Socket
  • TcpConnection
  • Acceptor MainReactor 核心
    • 总结


网络库其实现在有比较多,像今天博客讲述的陈硕的就是基于cpp写的,只支持在Linux 上运行的。

TODO

用户的注册的callback是否可以使用到muduo库里面的线程池。当业务比较大的时候。

前言


陈硕大神的开源网络库
以往的一篇博客,若学过会更好理解

什么时候用muduo库呢?
muduo库不支持跨平台使用,如果有这方面的需求,可以使用libevent,但是libevent的模型没有muduo库那么强,one loop per thread 的思想实际上更好的用到了多核的性能,采用的一个IO线程进行监听listen fd,其他线程进行处理accept fd 上来的事件。

谈到了muduo库,总会有人问上一嘴libevent。


简单介绍libevent
可以使用libevent,libevent 实际上就是通过条件编译让不同的平台调用不同的接口进行适配。纯C编写的网络库,实际上他是通过填充接口体eventop 来支持各种多路复用接口的,这些方法都是在初始化的时候使用一些static 方法提前注册的。(C语言中非常常用的方式,我这段期间经常能见到类似的操作。)
在这里插入图片描述


阻塞和非阻塞的简单理解:
recv 和 send 的接口都是默认是阻塞的,并且在读取条件或发送条件不满则的时候会阻塞线程。
recv 返回值:
在这里插入图片描述

-1 : 表示连接上有错误,但errno == EAGAIN 本次读取完毕
0 : 表示对端断开连接,读到\0
> 0 表示读取的字节数。
所谓的epoll相关的接口都是在处理等待数据准备的这一阶段。

muduo库依赖了linux的epoll和pthread以及boost库,所以只能用在linux下面。并且muduo库是LT模式的,但是每一个文件描述符都是非阻塞的,经过相关的验证,证实了实际上LT模式下的muduo库的效率其实也是非常高的。


前置知识

muduo库将网络和业务逻辑进行了解耦,使用方只需要在回调函数里面编写业务逻辑即可。
setConnectionCallback,链接的创建回调,用户的连接创建和断开。

setMessageCallback 消息的读写事件回调,专门处理用户的读写事件。

setThreadNum,其中一个线程负责新用户的连接事件,mainThread 负责监听是否有事件,工作线程负责处理每一个事件的读写事件。
muduo 提供对外的接口,比如说连接到来之后可以调用你提前的回调函数,消息读写后盗用对应的回调函数。 一般来说注册了上述的回调就够用了。

下图为一个实例,一个Echo Server

#include "examples/simple/echo/echo.h"
 
#include "muduo/base/Logging.h"
 
using std::placeholders::_1;
using std::placeholders::_2;
using std::placeholders::_3;
 
// using namespace muduo;
// using namespace muduo::net;
 
EchoServer::EchoServer(muduo::net::EventLoop* loop,
                       const muduo::net::InetAddress& listenAddr)
  : server_(loop, listenAddr, "EchoServer")
{
  server_.setConnectionCallback(
      std::bind(&EchoServer::onConnection, this, _1));
  server_.setMessageCallback(
      std::bind(&EchoServer::onMessage, this, _1, _2, _3));
}
 
void EchoServer::start()
{
  server_.start();
}
 
void EchoServer::onConnection(const muduo::net::TcpConnectionPtr& conn)
{
  LOG_INFO << "EchoServer - " << conn->peerAddress().toIpPort() << " -> "
           << conn->localAddress().toIpPort() << " is "
           << (conn->connected() ? "UP" : "DOWN");
}
 
void EchoServer::onMessage(const muduo::net::TcpConnectionPtr& conn,
                           muduo::net::Buffer* buf,
                           muduo::Timestamp time)
{
  muduo::string msg(buf->retrieveAllAsString());
  LOG_INFO << conn->name() << " echo " << msg.size() << " bytes, "
           << "data received at " << time.toString();
  conn->send(msg);
}

muduo库的模型如下,由下图的mainReactor 进行监听Acceptor ,然后将监听上来的文件描述符打包成TcpConnection,由负载均衡算法挂到不同的subReactor上面。

Acceptor 的用法:TcpServer 当中封装了一个Acceptor,注册该类的ReadCallback即可,专门监听listen fd 上是否有新的连接。

在这里插入图片描述


下述的同步和异步都是操作系统的概念,是IO相关的,不是并发那套。

recv 数据准备好就是TCP的缓冲区上有数据了。

而TCP是全双工的,意味着他有发送缓冲区和接收缓冲区,数据结构上来看就是他有两端buffer数据。

对于同步IO的简单理解
int size = recv(sockfd,buf ,1024,0); buf是我们用户层的buffer,不是TCP里面的,实际上就是从TCP的缓冲区搬到应用层; 这个过程其实就是从内核缓冲区到应用层缓冲区的过程。 调用recv就是我们自己搬回来的数据返回; 即IO同步。

在这里插入图片描述

IO 异步


aio_read ,传入sockfd 和 buf 表示关心的文件描述符和对应的缓冲区,然后操作系统拷贝完数据后发送sigio信号。

相当于给内核传送下面的是数据

Node.js 就是基于异步非阻塞模式下的高性能服务器,使用js进行编写的。

唤醒subLoop的原因是因为subLoop有可能epoll_wait ,此时要设置epoll_ctl 添加节点需要唤醒是吗?
那肯定的,需要设置节点的状态,那么肯定是不在epoll_wait 这个状态的了。

同步还是异步举例


A 等待 B做完事情,得到返回值,继续处理。 --同步

A 操作告诉B操作他感兴趣的事件和通知方式,A操作继续执行自己的业务逻辑;当B监听到响应事件发生过后,B会通知A,A开始相应的数据处理逻辑。 --异步

且epoll + fork 不一定就比 epoll + pthread 的效率差,如代表nginx的效率实际上也是非常高。
nginx 可能比 muduo库效率更高。 还是跟运用场景有关。

muduo为什么用LT模式


  • LT 能够兼容很多主机;
  • 低延迟处理;
  • 并且由于ET模式需要一次性将数据读完才返回,导致进入epoll_wait 的时间延迟了,从用户看来,就是当有一条连接进来的时候,延迟变高了。
  • 平均的分配每一个socket上的读取时间,相当于雨露均沾,这并不是坏处,相当于所有的连接都能并发了。

其实muduo库是一种服务器集群的一种现象。

vscode 的一些编译方法

在vscode 的任意一个cpp文件按F1,选择 c/c++ edit 就可以调出 c_cpp_properties的这个文件
在这里插入图片描述
在这里插入图片描述

底层数据结构分析


noncopyable

  • 大部分的类都会继承noncopyable,noncopyable就是将构造和赋值重载删除,那么子类若要进行拷贝构造和赋值都要调用父类的拷贝构造或赋值,那么就会失败。
  • 而构造函数放在protected能够保证只有子类能访问,即子类类型定义对象是没问题的。
// 编译器级别,保证头文件不会被重复包含
#pragma once

/**
 * noncopytable 被继承以后,派生类对象可以正常的构造和析构,但是派生类对象
 * 无法进行拷贝构造和赋值操作。
*/
class noncopyable
{
    public:
    // 拷贝构造和赋值重载是删除
    noncopyable(const noncopyable&) = delete;
    noncopyable& operator=(const noncopyable&) = delete;
    protected:
    // 构造和析构是默认
    noncopyable() = default;
    ~noncopyable() = default;
};

muduo库的日志库做的非常好,甚至提供了专门的动态库供外部连接,即使你不用muduo库的核心内容,挑这个日志库使用也是可以的!!

Logger


日志级别:

  • INFO
  • ERROR
  • FATAL
  • DEBUG

上述DEBUG默认是关闭的,因为DEBUG会打印太多信息,拖慢主机的执行进度。

// 定义日志的级别 INFO ERROR FATAL DEBUG
enum LogLevel
{
    INFO, // 普通信息
    ERROR, // 错误信息
    FATAL, // Core 信息
    DEBUG, //调试信息
};

日志类的使用:

  • 提供setLogLevel设置日志级别
  • log 结合已经设置的日志级别进行输出

日志类是一个单例类,instance()方法就是获取日志的唯一实例对象。

// 输出一个日志类,单例类
class Logger:noncopyable
{
public:
    static Logger& instance();// 获取日志唯一的实例对象
    // 设置日志级别
    void setLogLevel(int level);
    // 写日志
    void log(std::string msg);
private:
    int logLevel_; // 前_ 系统级别变量可能产生冲突!,所以用后_;
    Logger(){}
};

单例类实现:
instance 是非常简单,c++11 的特性能够让他直接成为实例对象。

// 获取日志唯一的实例对象
Logger& Logger::instance()
{
    static Logger logger;
    return logger;
}

设置日志级别的实现

// 设置日志级别
void Logger::setLogLevel(int level)
{
    logLevel_ = level;
}

写日志,设置打印日志的格式如注释,通过方法类Timestamp的now方法打印当前的时间戳转化为的当前时间。下面有解释Timestamp的now方法。

// 写日志  [级别信息] time : msg
void Logger::log(std::string msg)
{
    switch (logLevel_)
    {
    case INFO:
    std::cout << "[INFO]";
    break;
    case ERROR:
    std::cout << "[ERROR]";
    break;
    case FATAL:
    std::cout << "[FATAL]";
    break;
    case DEBUG:
    std::cout << "[DEBUG]";
    break;

    default:
        break;
    }
    // 打印时间,msg
    // now获取时间戳,toString 将时间戳转化为一个buf进行返回。
    std::cout << Timestamp::now().toString() << " : " << msg << std::endl;
}

最终打印出来的日志格式:
在这里插入图片描述

设置宏来方便打印日志


其实这个做法在一些开源库以及纯C的项目是非常见,它可以绑定一些方法来节省调用方传递太多的参数。
为了用户使用的更加方便,通常我们可以通过设置宏来进行解决:
为了防止宏替换出现的一些问题,所以一般我们可以通过do … while 方法来进行解决的。

  1. 获取单例对象
  2. 设置log级别
  3. 给一段buffer长度
  4. 调用snprintf函数 C 库函数 int snprintf(char *str, size_t size, const char *format, ...) 设将可变参数(...)按照 format 格式化成字符串,并将字符串复制到 str 中,size 为要写入的字符的最大数目,超过 size会被截断。 也就是说这个函数是安全的。

通常用\进行换行,而且里面不要嵌入注释。

#define LOG_INFO(logmsgFormat,...) \
    do  \
    {\
        Logger &logger = Logger::instance(); \
        logger.setLogLevel(INFO);\
        char buff[1024] = {0};\
        snprintf(buff, 1024, logmsgFormat, ##__VA_ARGS__); \
    }while(0)

LOG_DUBUG 的打印信息通常太多,所以我们通常可以用宏来控制。

#ifdef MUDEBUG

#else

#endif

时间类,只实现了获取当前时间戳,和通过时间戳转化成 2022/11/12 10:42:12 这种格式时间的接口。

Timestamp 时间类


explicit 的作用:

explicit 是防止隐式类型转化,避免赋值的是时候进行隐式类型转化。

// 时间戳,用来打印日志的时候有用
class Timestamp
{
    public:
    Timestamp();
    // 避免隐式类型转化
    explicit Timestamp(int64_t microSecondsSinceEpoch);
    static Timestamp now(); // 获取当前的时间
    std::string toString() const; // 将当前时间转化为 2022/11/8 这种形式的时间
    private:
    int64_t microSecondsSinceEpoch_;// 长整型变量表示时间
};

now 函数:
获取当前的时间,返回Timestamp对象
time 函数 在time.h里面,作用就是返回当前时间的时间戳,然后我们用int64_t 构造一个对象返回。
time_t 就是 #define __SLONGWORD_TYPE long int

 // 获取当前的时间
Timestamp Timestamp::now()
{
    // 通过调用time获取当前的时间戳
    return Timestamp(time(NULL));
}

toString 函数
实现:就是将时间戳转化成当前时间格式的方法
localtime 函数 struct tm *localtime(const time_t *timer) 使用 timer 的值来填充 tm 结构。timer 的值被分解为 tm 结构,并用本地时区表示。

std::string Timestamp::toString() const
{
    char buf[128] = {0};
    // 之前写的http项目没有加时间
    tm* tm_time = localtime(&microSecondsSinceEpoch_);
    // 年/月/日 时:分:秒
    snprintf(buf,128,"%4d/%02d/%02d %02d:%02d:%02d",
    tm_time->tm_year + 1900,
    tm_time->tm_mon + 1,
    tm_time->tm_mday,
    tm_time->tm_hour,
    tm_time->tm_min,
    tm_time->tm_sec);
    return buf;
}

tm结构体的字段

年是需要加1900 才能使用,月份需要+1;

struct tm *localtime(const time_t *timer);
    
struct tm
{
  int tm_sec;			/* Seconds.	[0-60] (1 leap second) */
  int tm_min;			/* Minutes.	[0-59] */
  int tm_hour;			/* Hours.	[0-23] */
  int tm_mday;			/* Day.		[1-31] */
  int tm_mon;			/* Month.	[0-11] */
  int tm_year;			/* Year	- 1900.  */
  int tm_wday;			/* Day of week.	[0-6] */
  int tm_yday;			/* Days in year.[0-365]	*/
  int tm_isdst;			/* DST.		[-1/0/1]*/
}

InetAddress


InetAddress 是对sockaddr_in结构体的一层封装

explicit 默认初始化值定义的时候都不能够出现。当声明和定义

主要功能:
设置端口号和ip到sockaddr_in 里面

InetAddress::InetAddress(uint16_t port, std::string ip)
{
    bzero(&addr_, sizeof addr_);
    addr_.sin_family = AF_INET;
    addr_.sin_port = htons(port);
    addr_.sin_addr.s_addr = inet_addr(ip.c_str()); // 转化成网络字节序,
}

sprintf 表示不固定长度。

下面接口的主要功能:

  • 获取 sockaddr_in 里面的获取端口号和ip,主要就是网络转主机序列。
std::string InetAddress::toIp() const
{
    // addr_ 里面读ip地址,但是已经是网络字节序了!
    char buf[64] = {0};
    // 因为sin_addr 已经是网络字节序了,需要通过ntop变成主机序列。
    ::inet_ntop(AF_INET, &addr_.sin_addr, buf,sizeof(buf));
    return buf;
}
std::string InetAddress::toIpPort() const
{
    // ip:port
    char buf[64] = {0};
    ::inet_ntop(AF_INET, &addr_.sin_addr, buf,sizeof(buf));
    size_t end = strlen(buf);
    uint16_t port = ntohs(addr_.sin_port);
    sprintf(buf + end, ":%u", port); // %u 就是无符号的格式

    return buf;
}
uint16_t InetAddress::toPort() const
{
    return ntohs(addr_.sin_port);
}

TcpServer 直接相关的类实际上包含 EventLoop, InetAddress,我们已经解决了InetAddress, 我们现在只有先解决EventLoop

TcpServer 是对外的服务器变成所使用的类 当中实际上只有Channel 和 Poller 这两个是外部的类,所以我们先挑其中一个解决。 而 EventLoop 的主要成员就是 Channel 和 Poller

Channel 封装了一个fd,以及需要关心的事件events_,以及已经就绪的事件revents_,以及事件回调的函数方法。

Channel


啥时候不需要只需要前置声明?
Channel 中为何 只放 class EventLoop 的前置声明呢?
因为头文件实际上也可以按需给,如果我们只给出类型的声明,就可以只进行前置声明,后续暴露的.h文件就可以尽量给少一点。

class EventLoop 和 class Timestamp 就是前置声明。

下面就是定义两个函数类型。 相当于 void(*EventCallBack)() 这种。

using EventCallBack = std::function<void()>;
using ReadEventCallBack = std::function<void(Timestamp)>;

成员遍历一览:
revents_ 是EventLoop的poller模块调用epoll_wait的时候返回的具体发生的事件。这个时候EventLoop 会调用set_revents 设置回Channel当中,往后Channel有handlerEvent方法调用就能使用revents进行操作。

下面的回调是由我们用户决定的,也就是OnMessage,OnConnection最终就是在 ,等等方法。

ReadEventCallBack readCallback_;// void()
EventCallBack writeCallback_;   // void(Timestamp);
EventCallBack closeCallback_;
EventCallBack errorCallback_;

注释的比较详细

static const int kNoneEvent;  // 对任何事件都不感兴趣
static const int kReadEvent;  // 对读事件
static const int kWriteEvent; // 对写事件

EventLoop *loop_; // Reactor
const int fd_;    // fd就是关心的文件描述符
int events_;      // 注册fd 感兴趣的事件
int revents_;     // 表示Channel需要处理的事件
int index_;       // 这个是表达当前Channel的状态,标记处于上述三种事件状态

std::weak_ptr<void> tie_; // 当执行回调函数的时候用来判断是否有被remove
bool tied_; 				// 搭配tie_ 一起进行判断,为true才进行tie_的强类型转化

// Channel 通道能够获知fd最终发生的具体事件revents,所以它负责调用具体事件的回调操作
// 每一个Channel都代表一个事件节点,这里的定义回调的类型
ReadEventCallBack readCallback_;// void()
EventCallBack writeCallback_;   // void(Timestamp);
EventCallBack closeCallback_;
EventCallBack errorCallback_;

构造函数
传参loop,表示一个Channel一定会绑定一个他所属的EventLoop。因为Channel需要最终在Poller上进行epoll_wait,但是他们之间不直接相关,而是通过EventLoop,这里绑定EventLoop后可以通过EventLoop找到对应的Poller。

Channel::Channel(EventLoop *loop, int fd)
    : loop_(loop), fd_(fd), events_(0), revents_(0), index_(-1), tied_(-1){}

handlerEvent 是处理事件
下面用到了 Timestamp 说明需要知道Timestamp的大小。所以最好给出头文件。

void handlerEvent(Timestamp receiveTime);

表明当前fd的状态的相关函数。

// 返回fd当前的事件状态
bool isNoneEvent() const {return events_ == kNoneEvent;}
bool isWriting() const {return events_ & kWriteEvent;}
bool isReading() const {return events_ & kReadEvent;}

shared_from_this 的作用:

下述是一个错误示范,当我们需要一个指针来构造一个智能指针,用下面的做法一定会错误。

原因是this是裸指针,我们这么操作和用同一个裸指针给两个智能指针赋值是一个意思

#include<iostream>
using namespace std;
#include<memory>
class A
{
public:
	A(int y = 0) :x(y) { }
	A* getthis()
	{
		return this;
	}
	int x;
};


int main(void)
{
	shared_ptr<A>sp1(new A());
	shared_ptr<A>sp2(sp1->getthis());
	cout << sp1.use_count() << endl;
	cout << sp2.use_count() << endl;
}

shared_from_this() 的作用

shared_from_this的返回值是一个智能指针,他就是把返回的指针转化成shared_ptr 来使用,这个时候引用计数实际上进行了++,所以use_count 为2,程序不会出错。

class A :public enable_shared_from_this<A>
{
public:
	A(int y = 0) :x(y) { }
	shared_ptr<A> getthis()
	{
		return shared_from_this();
	}
	int x;
};


int main(void)
{
	shared_ptr<A>sp1(new A());
	shared_ptr<A>sp2(sp1->getthis());
	cout << sp1.use_count() << endl;
	cout << sp2.use_count() << endl;
}

设置回调的方法也很简单:
他们的实现也比较简单,直接move一下给自己用就行了。

 // 设置回调函数对象
void setReadCallback(ReadEventCallBack cb) { readCallback_ = std::move(cb); }
void setWriteCallback(EventCallBack cb) { writeCallback_ = std::move(cb); }
void setCloseCallback(EventCallBack cb){closeCallback_ = std::move(cb);}
void setErrorCallback(EventCallBack cb){errorCallback_ = std::move(cb);}

tie 函数:
用弱智能指针判断是否channel被remove掉。防止调用HandlerEvent的时候Channel已经释放了,这个tie函数会在TcpConnection建立成功的回调函数里面被调用。

// 防止当channel 被手动remove掉,channel还在执行回调操作
void tie(const std::shared_ptr<void> &);

为什么要提供set_revents 函数?

因为Channel本身只存储了文件描述符,以及就绪后对应事件的处理方法,所以需要有一个函数,让Poller模块有方法能够告知Channel需要对哪些事件就绪进行通知。

int events() const { return events_; }
// 为什么设置这里的revent? 而不是通过epoll进行监听然后设置呢?
int set_revents(int revt) { revents_ = revt; return revents_;}
//bool isNoneEvent() const { return events_ == kNoneEvent; }
 // 返回fd当前的事件状态
bool isNoneEvent() const {return events_ == kNoneEvent;}

倘若有读写事件需要关心,那么Channel肯定得知道。那么知道之后,我们肯定需要让Poller模块去帮我们关心,所以下面的update方法,实际上就是让我们的EventLoop模块去关心这个文件描述符

实际上由于Poller和Channel都有一个EventLoop* ,而EventLoop有 std::unique_ptr<Poller> poller_;字段可以找到Poller结构体,Poller结构体就可以进行更新了。

void enableWriting()
{
    events_ |= kWriteEvent;
    update(); // 调用epoll_ctl 让红黑树关心这个事件
}
// one loop pre thread
EventLoop* ownerLoop() {return loop_;}

就是update调用EvnetLoop的方法。

void Channel::update()
{
    // 通过Channel 找到 EventLoop。找到Poller 调用对应的事件
    // add code...
    loop_->updateChannel(this);
}

定义成员函数:

tie实际上就是设置一个观察者,让tie_这个弱智能指针去观察对象。

// 观察者,观察一个强智能指针
void Channel::tie(const std::shared_ptr<void>& obj)
{
    tie_ = obj;
    tied_ = true;
}

当改变channel 所表示fd的events事件后,update负责在poller里面更改fd响应的事件。即调用epoll_ctl函数。

EventLoop =》 ChannelList Poller

update就是上面所说的,就是简单的调用updateChannel 方法而已。

void Channel::update()
{
    // 通过Channel 找到 EventLoop。找到Poller 调用对应的事件
    // add code...
    loop_->updateChannel(this);
}
// 在channel所属的EventLoop,把当前的channel删除掉
void Channel::remove()
{
    // add code
    loop_->removeChannel(this);
}

loop_最终会调用到EpollPoller中的removeChannel

void EPollPoller::removeChannel(Channel *channel)
{
    int fd = channel->fd();
    channels_.erase(fd); // 删除map中映射

    LOG_INFO("func=%s => fd=%d  \n", __FUNCTION__, channel->fd());

    int index = channel->index();
    if (index == kAdded) // 如果还是在状态2,就删掉
    {
        update(EPOLL_CTL_DEL, channel);
    }
    channel->set_index(kNew); // 设置状态为可添加
}

传入的shared_ptr 究竟是个啥,要怎么理解?
tied_ 是一个bool类型。
理解channel最重要的两个函数,handlerEvent 最重要的就是判断是否对象还存在,是否能够继续调用;
那么tied_ 和 tie() 一个成员变量,一个成员函数什么时候被调用呢?
tied_ 是在 tie 方法设置之后 才是true,tie方法又是上层TcpConnection调用连接建立成功的方法connectEstablished来进行建立成功的。说明当连接建立好的时候,这个时候如果TcpConnection还存在,表明连接状态是良好的,此时我们调用EPollPoller中handlerEvent的时候能保证执行正确。
如果TcpConnection因为某种缘故断开了,这个时候处理这个handlerEvent事件已经没有意义,因为也发送不回去了,此时上层的shared_ptr已经跟着TcpConnection一起消失,大佬的编码是就不处理了。
这里大佬的编码中采用不处理的方式。因为Channel已经跟着TcpConnection一同释放了。所以这里没有else分支。
handlerEventWithGuard 则是对回调对应的具体的事件。

// fd得到poller通知以后处理事件的。
// 成员变量tied_ 主要就是判断Channel是否又被tied_ 过,以及是否正在被tied_
void Channel::handlerEvent(Timestamp receiveTime)
{
    std::shared_ptr<void> guard;
    if(tied_)
    {
        guard = tie_.lock();// 将弱指针提升,返回值不为NULL表示成功
        if(guard)
        {
            handlerEventWithGuard(receiveTime);
        }// 无需else分支,因为Channel已经和TcpConnection一起走了。
    }
    else
    {
        handlerEventWithGuard(receiveTime);
    }
}

// 根据poller通知的channel发生的具体事件,由channel执行对应的回调方法。
void Channel::handlerEventWithGuard(Timestamp receiveTime)
{
    LOG_INFO("channel handlerEvent revents:%d\n",revents_);
    if((revents_ & EPOLLHUP) && !(revents_ & EPOLLIN))
    {
        if(closeCallback_)
        {
            closeCallback_();
        }
    }

    if(revents_ & EPOLLERR)
    {
        if(errorCallback_)
        {
            errorCallback_();
        }
    }

    if(revents_& (EPOLLIN | EPOLLPRI))
    {
        //可读事件
        if(readCallback_)
        {
            readCallback_(receiveTime);
        }
    }
    if(revents_ & EPOLLOUT)
    {
        if(writeCallback_)
        {
            writeCallback_();
        }
    }
}

Poller.h


有两个成员变量

using ChannelList = std::vector<Channel*>; 

这里的key就是sockfd,这里的value 就是 key 对应的Channel的参数类型。

using ChannelMap = std::unordered_map<int,Channel*>;

Poller是一个基类, 他的poll这个接口是为了让子类可以用epoll或者是poll进行轮询。相当于让派生类来实现多态;但我们只实现其中的EpollPoller

保留统一的接口进行IO复用

virtual Timestamp poll(int timeoutMs, ChannelList* activeChannels) = 0;

updateChannel 传参的地方this

/// Changes the interested I/O events.
/// Must be called in the loop thread.
virtual void updateChannel(Channel* channel) = 0;

Poller中的removeChannel 就是往红黑树做操作,removeChannel 也是接受一个channel对象指针,通过这个对象拿到对应信息,对我们的红黑树做操作。
那么Channel中的removeChannel函数又会做什么呢?
就是调用Poller的removeChannel,仅仅如此~

那么哪里会调用removeChannel呢?
Acceptor的析构函数,会把监听套接字从主Loop删除。

/// Remove the channel, when it destructs.
/// Must be called in the loop thread.
virtual void removeChannel(Channel* channel) = 0;

hasChannel 判断参数channel 是否在当前Poller 当中。
在Channel的析构函数中使用,但仅仅是进行断言。

bool hasChannel(Channel* channel) const;

newDefaultPoller为什么不放在Poller.cc里面,实际上放进去并没有错,但是由于他要返回的对象指针会包含两个对象,EpollPoller.h PollPoler.h 两个,所以说他会新开一个文件.h来使用

获取到一个事件循环的poller; 实际上这个就是单例模式的getInstance,返回的是一个基类的Poller对象指针。

static Poller* newDefaultPoller(EventLoop* loop);// 注意这里的操作,在DefaultPoller.cc实现

DefaultPoller.h文件中 基类不要引用派生类 放在这里相当于放到一个公共的头文件里面。

#include"Poller.h"
#include"EPollPoller.h"
#include<stdlib.h>

Poller* Poller::newDefaultPoller(EventLoop* loop)
{
    if(::getenv("MUDUO_USE_POLL"))
    {
        return nullptr;// 生成poll的实例
    }
    else
    {
        // 因为设计到具体要new 这个对象了,所以说要弄一个DefaultPoller.h
        return new EPollPoller(loop);// 生成epoll的实例
    }
}

EpollPoller (调用epoll的模块)


继承的时候需要知道这个EpollPoller 继承自Poller,所以需要知道Polelr的大小。 所以需要引头文件

class EPollPoller : public Poller

override放在子类,表明希望父类是一个虚函数,如果不是,那么会生成编译错误。

override的作用,有时候眼瞎,看不清函数,子类可能没有重写父类的接口,而是自己定义了一个函数,害怕这种情况的发生。

epoll_event 是epoll_wait 定义的一次性上层能拿到的就绪队列长度。

using EventList = std::vector<struct epoll_event>;
int epollfd_; // epoll_create 的返回值
EventList events_; //  就是上层的一次性能拿上来的最大的这个底层就绪的数组。

poll实际上就是一个 epoll 还是 poll的一个分歧点

Timestamp poll(int timeoutms,ChannelList* activeChannels) override;

基类定义的类型,子类也定义,说明using 定义的类型不能继承下来。

using EventList = std::vector<struct epoll_event>;
static const int kInitEventListSize = 16; // std::vector<struct epoll_event> 的初始值
// 更新channel通道,这里实际上就是对channel里面的事件events给epoll进行设置。
void update(int operation,Channel* channel);

EpollPoller中的poll方法,EventLoop 里面有activeChannels_实际上也是活跃的数量,然后EventLoop 在loop中处理对应的活跃的Channel,即进行回调。

// 填写活跃的链接,交付给EpollEvent,然后EventLoop 再把这个给Channel,Channel再回调
void fillActiveChannels(int numEvents,ChannelList* activeChannels) const;

EPollPoller 的部分实现讲解


EpollPoller中封装了对于红黑树的操作。即调用了epoll_ctl对红黑树的节点做更新,删除。

里面有三种状态

// channel 未添加到EPollPoller   中
const int kNew = -1; // EPollPoller   的成员index_ = -1
// channel 已添加到poller中
const int kAdded = 1;
// channel从poller中删除
const int kDeleted = 2;

epoll_create1 的作用在于他可以设置flags,其中flags 中有

EPOLL_COLEXEC 就是 父进程创建的,fork之后子进程不会也打开这个文件描述符。 默认子进程会打开父进程所打开的文件描述符。 kInitEventListSize 源码当中就是16。

在这里插入图片描述

EPollPoller::EPollPoller(EventLoop* loop) // EPOLL_CLOEXEC , kInitEventListSize
   : Poller(loop), epollfd_(::epoll_create1(EPOLL_CLOEXEC)), events_(kInitEventListSize) // vector<epoll_event>
{
    if (epollfd_ < 0)
    {
        LOG_FATAL("epoll_create errir:%d \n", errno);
    }
} // epoll_create

析构函数,相当于我们删掉底层的Poller

EPollPoller::~EPollPoller()
{
    // 编译器检查是否有实现,基类是否是virtual
    ::close(epollfd_);
}

为什么每一个Channel都要保持三种状态?
在updateChannel,removeChannel 的时候,会根据Channel的状态进行调整,Channel不会remove后直接删掉。
因为Channel的声明周期是由TcpConnection维护的,只有TcpConnection消失,Channel才会真正消失。
在这里插入图片描述

channel 里面的index 就是给EpollPoller里面的状态。 对应于未添加,已添加,删除 都在下面这个函数。

主要逻辑:

EventLoop(事件循环, 放着所有的channel)

        ChannelList            	Poller

​						ChannelMap <fd,channel*>   (只有在Poller里面注册过的,才会在ChannelMap里面出现)
// 对应epoll_ctl
void EPollPoller::updateChannel(Channel *channel)
{
    const int index = channel->index();
    if (index == kNew || index == kDeleted)
    {
        // 没有添加过的状态就是kNew,原来删除的和没有添加过的处理动作动作EPOLL_CTL_ADD 
        if (index == kNew)
        {
            int fd = channel->fd();
            channels_[fd] = channel;
        }
        // 如果已经是delete状态,那么可以直接复用,也就是新添加一个
        channel->set_index(kAdded);
        update(EPOLL_CTL_ADD, channel);
    }
    else // channel 已经在poller上注册过了
    {
        int fd = channel->fd();
        // isNoneEvent 存在说明此次我们选择删除。
        if (channel->isNoneEvent())
        {
            update(EPOLL_CTL_DEL, channel);
            channel->set_index(kDeleted);
        }
        else
        {
            // 对某些事件感兴趣
            update(EPOLL_CTL_MOD, channel);
        }
    }
}

removeChannel的时候考虑kAdded情况其实足够,因为其他两种情况Channel都已经不在红黑树上了。

void EPollPoller::removeChannel(Channel *channel)
{
    int fd = channel->fd();
    channels_.erase(fd); // 删除map中映射

    LOG_INFO("func=%s => fd=%d  \n", __FUNCTION__, channel->fd());

    int index = channel->index();
    if (index == kAdded) // 如果还是在状态2,就删掉
    {
        update(EPOLL_CTL_DEL, channel);
    }
    channel->set_index(kNew); // 设置状态为可添加
}

updateChannel:

  1. 如channel没有被注册,或者是之前添加然后被删除了,那么我们统一插入epoll模型中。
  2. 如已经注册,则查看是否有 events_ 是否和 kNoneEvent 相同 (这个封装成了一个函数罢了), 那么执行删除,否则调用update,更新红黑树上的节点的关心事件。
// 对应epoll_ctl
void EPollPoller::updateChannel(Channel *channel)
{
    const int index = channel->index();
    LOG_INFO("func=%s => fd=%d events=%d index=%d \n", __FUNCTION__, channel->fd(), channel->events(), index);
    if (index == kNew || index == kDeleted)
    {
        // 没有添加过的状态就是kNew,原来删除的
        if (index == kNew)
        {
            int fd = channel->fd();
            channels_[fd] = channel;
        }
        channel->set_index(kAdded);
        update(EPOLL_CTL_ADD, channel);
    }
    else // channel 已经在poller上注册过了
    {
        int fd = channel->fd();
        if (channel->isNoneEvent())
        {
            update(EPOLL_CTL_DEL, channel);
            channel->set_index(kDeleted);
        }
        else
        {
            // 对某些事件感兴趣
            update(EPOLL_CTL_MOD, channel);
        }
    }
}

update函数:
非常简单,实际上把channel 的东西全部取出来,赋值给一个栈上的event,然后调用epoll_ctl进行对红黑树的操作。

// 更新channel通道 epoll_ctl add/mod/del
void EPollPoller::update(int operation, Channel *channel)
{
    epoll_event event;
    ::memset(&event, 0, sizeof(event));
    int fd = channel->fd();
    event.events = channel->events();
    event.data.fd = fd;
    event.data.ptr = channel; // 赋值给void* 了


    if (::epoll_ctl(epollfd_, operation, fd, &event) < 0)
    {
        if (operation == EPOLL_CTL_DEL)
        {
            LOG_ERROR("epoll_ctl del error %d\n", errno);
        }
        else
        {
            LOG_FATAL("epoll_ctl add/mod error%d\n", errno);
        }
    }
}

poll方法:

主要调用epoll_wait,然后对numEvents 也就是多少个文件描述符有事件就绪了。

由于struct event 这个结构体我们用vector来存,所以我们需要&*event_.begin() 这种方式来取数组的首元素。

saveError 可能是保存epoll_wait 之后的结果,防止执行到后面的时候saveError被人修改了。

// poll 主要调用epoll_wait ,moduo 库用的是LT模式,activeChannels 是给到EventLoop的
Timestamp EPollPoller::poll(int timeoutMs, ChannelList *activeChannels)
{
    // 实际上使用LOG_DEBUG更加合理
    LOG_INFO("func=%s => fd total count:%d\n", channels_.size());
    // 我们这里用的是vector,所以需要向下面这样写
    int numEvents = ::epoll_wait(epollfd_, &*events_.begin(), static_cast<int>(events_.size()), timeoutMs);
    // poll在多个线程可能都会被调用,所以提前保存一份
    int saveErrno = errno;
    Timestamp now(Timestamp::now());
    // 事件就绪后,记录时间戳,然后将revetns记录到
    if (numEvents > 0) // 只调用一次即可
    {
        // 已经发生过了事件
        LOG_INFO("%d events happened \n", numEvents);
        fillActiveChannels(numEvents, activeChannels);
        // 这一轮所有监听的events都上来了,底层的就绪队列可能很大,所以此时需要扩容
        if (numEvents == events_.size())
        {
            events_.resize(events_.size() * 2);
        }
    }
    else if (numEvents == 0)
    {
        LOG_DEBUG("%s timeout! \n", __FUNCTION__);
    }
    else
    {
        if (saveErrno != EINTR)
        {
            errno = saveErrno;
            LOG_ERROR("EPollPoller::poll()");
        }
    }
    return now;
}

fillActiveChannels函数:

EpollPoller 里面没有ChannelList ,他是由EventLoop调用的,也就是一次epoll_wait 的结果直接给了EventLoop,他自己不保存的。

就是填充ChannelList 这个数组。

// 填写活跃的连接,EPollPoller:: 注意加上!
void EPollPoller::fillActiveChannels(int numEvents,ChannelList* activeChannels) const
{
    for (int i = 0; i < numEvents; ++i)
    {
        // 隐式类型转化
        Channel *channel = static_cast<Channel *>(events_[i].data.ptr);
        channel->set_revents(events_[i].events);
        // 事件就在这里了。
        activeChannels->push_back(channel);
    }
}

EventLoop 事件循环(Reactor)

EPollPoller 是事件分发器, EventLoop是反应堆模型。

EventLoop 这里是线程本地存储的的。
变量即使线程本地存储的,又是extern的。

extern 是用来声明一个变量的,若在声明的后面直接跟定义,那么肯定是有问题的。

和普通文件一样,定义只要放在.h 里面就一定会出问题!

例外:static 修饰的变量可以直接定义,因为他的连接属性已经改变了

CurrentThread

在这里插入图片描述

tid的访问时系统调用,所以弄了一个cacheTid,防止每次都调用系统调用拿,加快效率。

对于__thread的小实验


头文件extern __thread int t_cachedTid;

extern 的时候 所有的线程extern 就是自己线程中的这个变量。

下面的三个文件进行测试extern static 搭配 __thread 使用的场景:

// test.h
#pragma once
#include<unistd.h>
#include <sys/syscall.h>

void func();
    // 线程本地存储
    //extern __thread int t_cachedTid;
    extern __thread int t_cachedTid = 0;
    //static __thread int t_cachedTid = 0;

    void cacheTid();
    
    inline int tid()
    {
        if(__builtin_expect(t_cachedTid == 0,0))
        {
            cacheTid();
        }
        return  t_cachedTid;
    }

// test.cc
#include"test.h"
#include<iostream>
using namespace std;
//__thread int t_cachedTid = 0;
    void cacheTid()
    {
        if(t_cachedTid == 0)
        {
            // 通过linux系统调用获取tid值
            t_cachedTid = static_cast<pid_t>(::syscall(SYS_gettid));
        }
    }
int main()
{
  cout << tid() << endl;
  func();
  return 0;
}

// test2.cc
#include"test.h"
#include<iostream>
using namespace std;

void func()
{
  cout <<  tid() << endl; 
}
namespace CurrentThread
{
    __thread int t_cachedTid = 0;
    void cacheTid()
    {
        if(t_cachedTid == 0)
        {
            // 通过linux系统调用获取tid值
            t_cachedTid = static_cast<pid_t>(::syscall(SYS_gettid));
        }
    }
}

EventLoop 成员


EventLoop 里面的 vector*,别名 ChannelList,就是poller 返回给EventLoop的哪些文件描述符已经就绪了。

using Functor = std::function<void()>;
using ChannelList = std::vector<Channel*>;
std::atomic_bool looping_; // 原子操作,通过CAS实现
std::atomic_bool quit_;    // 标识退出loop循环

每一个Channel 都会在自己所在的subLoop里面执行。

wakeupFd 就是 当mainLoop 里面获得一个新连接,就可以通过 subLoop 里面的wakeupFd,用来唤醒指定的subLoop,subLoop 会将这个wakupFd_ 放在epoll_wait 里面进行等待。

int wakeupFd_; // 重要,唤醒subReactor,通过轮询算法选择一个subloop,通过该成员唤醒subloop处理channels

callingPendingFunctors_;

pendingFunctors; mutex_ 是用来保护这里的待执行的vetcor

std::mutex mutex_;                      // 互斥锁,用来保护上面vector容器的线程安全 
std::atomic_bool callingPendingFunctors_;// 标识当前loop是否有需要执行的回调操作
std::vector<Functor> pendingFunctors_;  // 存储loop需要执行的所有的回调操作

wakefd的两种创建方式。

socketpair

这个与管道不同就是双端都是可读可写的。 这里走的是网络通信

在这里插入图片描述

eventfd

eventfd 是内核的事件通知机制。muduo库中就是用这个系统调用创建每一个线程的wakeupFd的

在这里插入图片描述

mainLoop 选择 并且唤醒对应subLoop 用的字段。 就是有一个新的连接要给subLoop,但是subLoop可能在epoll_wait,所以需要将他唤醒。

int wakeupFd_; // 重要,唤醒subReactor,通过轮询算法选择一个subloop,通过该成员唤醒subloop处理channels
 // 在当前loop中执行cb
void runInLoop(Functor cb);
// 把cb放入队列中,唤醒loop所在的线程,执行cb
void queueInLoop(Functor cb); 
// 用来唤醒loop所在的线程的 mainReactor 唤醒 subReactor的
void wakeup();

很明显,就是调用poller 里面的updateChanel 方法而已。

void EventLoop::updateChannel(Channel* channel)
{
  assert(channel->ownerLoop() == this);
  assertInLoopThread();
  poller_->updateChannel(channel); // 这里是poller执行
}

证明EventLoop对象是在创建他的线程里面。

// 判断EventLoop 对象是否在自己的线程里面。
bool isInLoopThread() const {return threadId_ == CurrentThread::tid();}

如果isInLoopThread 不满足,说明evnetLoop对象不在他所创建的线程当中.

目前理解: 就是vector 里面放着待执行的回调方法,而当当前线程执行这个vector的时候,需要是Channel对应的EventLoop里面有才行,否则就重新放回去,然后唤醒它所在的线程。

// 把cb放入队列中,唤醒loop所在的线程,执行cb
void EventLoop::queueInLoop(Functor cb)
{
    {
        std::unique_lock<std::mutex> lock(mutex_);
        pendingFunctors_.emplace_back(cb);
    }
    // 唤醒相应的需要执行上面的回调操作的loop线程
    // || callingPendingFunctors_ 标识当前loop正在执行回调,此时loop又有了新的回调。
    if(!isInLoopThread() || callingPendingFunctors_)  // 这里的callingPendingFunctors_待解释
    {
        wakeup();// 唤醒loop所在线程
    }
}

EventLoop 的部分实现讲解


t_loopInThisThread 在此处声明+定义,每一个线程只有一份。

// 防止一个线程创建多个EventLoop thread local
__thread EventLoop *t_loopInThisThread = nullptr;

unique_ptr 就是里面的Poller的超时时间,10s

// epoll的超时时间,这里是Poller的超时时间
const int kPollTimeMs = 10000;

创建一个wakefd,用了eventfd这个函数来创建

// 创建wakeupfd,用来notify唤醒subReactor处理新来的channel
int createEventfd()
{
    int evtfd = ::eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
    if (evtfd < 0)
    {
        LOG_FATAL("eventfd error:%d \n", errno);
    }
    return evtfd;
}

构造函数的字段讲解:

looping_ 就是循环的接口

quit_ 表示退出用的接口

callingPendingFunctors_ 是否有需要处理的回调

threadId 就是当前的线程号,通过CurrentThread的tid就可以获取。

poller_ 这里其实就是创建一个poller, 一个EventLoop 会有一个Poller

wakeupFd_ 这个就是createEventfd() 调用那个系统调用然后创建一个wakeupFd

wakeupChannel_ : 需要Poller 关心这个wakeupfd

currentActiveChannel_ : 目前有事件已经就绪的channel

handlerRead 就是读取一个整型的数据,目的就是为了唤醒subLoop ,由于调用这个方法的时候在类内部,所以用绑定器提前给定this的位置。

enableReading 是设置wakeupfd 关心 读事件。

此时每一个eventLoop 都会监听wakeupChannel 的EPOLLIN 读事件了。然后设置wakeupfd事件就绪的回调事件。

EventLoop::EventLoop()
    : looping_(false), quit_(false), callingPendingFunctors_(false), threadId_(CurrentThread::tid()), poller_(Poller::newDefaultPoller(this)), wakeupFd_(createEventfd()), wakeupChannel_(new Channel(this, wakeupFd_)), currentActiveChannel_(nullptr)
{
    LOG_DEBUG("EvetnLoop created %p in thread %d \n", this, threadId_);
    if (t_loopInThisThread)
    {
        LOG_FATAL("Another EventLoop %p exists in this thread %d \n", t_loopInThisThread, threadId_);
    }
    else
    {
        t_loopInThisThread = this;
    }
    // 设置wakeupFd的监听事件类型,以及发生事件后的回调方法
    wakeupChannel_->setReadCallback(std::bind(&EventLoop::handlerRead, this));
    // 每一个eventloop都将监听wakeupchannel的EPOLLIN读时间
    wakeupChannel_->enableReading();
}

wakeup 析构的时候可以将通道关闭,资源释放。

EventLoop::~EventLoop()
{
    wakeupChannel_->disableAll();
    wakeupChannel_->remove();
    ::close(wakeupFd_);
    t_loopInThisThread = nullptr;
}

开启事件循环,最重要的一个函数!

主要逻辑:

// 开启事件循环
void EventLoop::loop()
{
    // 开启事件循环
    looping_ = true;
    // 退出设置为false
    quit_ = false;

    LOG_INFO("EventLoop %p start looping \n",this);
    // 只要没有quit_ 不满足,则一直运行
    while(!quit_)
    {
        // 主要监听两类fd,一种是client的fd(listen上来的),一种是wakeupfd(唤醒subloop的fd)
        activeChannels_.clear();
        // 这里可以看到,是让poller模块将activeChannels_填充好带上来
        pollReturntime_ = poller_->poll(kPollTimeMs,&activeChannels_);
        // 然后对于所有事件就绪的Channel ,拿出来调用回调函数
        for(Channel* channel: activeChannels_)
        {
            // Poller 监听哪些channel发生时间了,然后上报给EventLoop,通知channel通知
            channel->handlerEvent(pollReturntime_);
        }
        // 执行当前EventLoop事件循环需要处理的回调操作
        /**
         * IO 线程 mainLoop accept| =》 fd channel 打包fd, channel得分配给subloop
         * mainLoop事先注册一个回调(需要subloop来执行)  wakeup subloop后,执行下面的cb操作
        */
        doPendingFunctors(); // 此时只有一种情况,MainLoop 给我新的Channel
    }

    LOG_INFO("EventLoop %p stop looping. \n",this);
    looping_ = false;
    
}

回忆 Channel 的handlerEvent 做了啥工作。

// fd得到poller通知以后处理事件的。
// 成员变量tied_ 主要就是判断Channel是否又被tied_ 过,以及是否正在被tied_
void Channel::handlerEvent(Timestamp receiveTime)
{
    std::shared_ptr<void> guard;
    if(tied_)
    {
        guard = tie_.lock();// 将弱指针提升,返回值不为NULL表示成功
        if(guard)
        {
            handlerEventWithGuard(receiveTime);
        }
    }
    else
    {
        handlerEventWithGuard(receiveTime);
    }
}

以及对于doPendingFunctors的理解:

有一种情况,当主Loop 给从Loop分配文件描述符的时候,此时往wakefd调用回调后苏醒了,从Loop就需要调用一下doPendingFunctors,判断是否是需要我往红黑树插上新的节点。

困惑:这里要是没有事件,lock这把锁每次都要加,会不会影响效率。

// 为了并发操作,定义局部的vector
 void EventLoop::doPendingFunctors()
 {
    std::vector<Functor> functors;
    callingPendingFunctors_ = true;
    {
        std::unique_lock<std::mutex> lock(mutex_);
        functors.swap(pendingFunctors_);
    }
    // 上面是对效率的考量,主要是下面需要进行执行任务
    for(const Functor& functor:functors)
    {
        functor(); // 执行当前loop需要执行的回调操作
    }
    callingPendingFunctors_ = false;
 }
// 为了并发操作,定义局部的vector
 void EventLoop::doPendingFunctors()
 {
    std::vector<Functor> functors;
    callingPendingFunctors_ = true;
    {
        std::unique_lock<std::mutex> lock(mutex_);
        functors.swap(pendingFunctors_);
    }
    // 上面是对效率的考量,主要是下面需要进行执行任务
    for(const Functor& functor:functors)
    {
        functor(); // 执行当前loop需要执行的回调操作
    }
    callingPendingFunctors_ = false;
 }

quit 函数:

若是自己的loop里面调用quit,说明loop 肯定不在poll函数了。

困惑:怎么会有非loop线程调用quit?在subloop(worker)中,调用mainLoop(IO线程)里面的quit?

是因为EventLoopThreadPool一开始是在主线程创立的,若此时其他线程上的EventLoopThread需要删除掉,则需要唤醒从线程,由从线程进行删除。

在这里插入图片描述

// 退出事件循环 1.loop在自己的线程中 2. 非loop的线程中,调用loop的quit
// subloop1 退出 subloop2 不理解
void EventLoop::quit()
{
    quit_ = true;
    if(! isInLoopThread()) // 如果是在其他线程中,调用quit; 在一个subloop(worker)中调用mainLoop的quit
    {
        wakeup();
    }
}

runInLoop:

在当前的loop,就执行回调方法; 否则就执行cb

void EventLoop::runInLoop(Functor cb)
{
    if(isInLoopThread()) // 在当前的loop线程中,执行cb
    {
        cb();
    }
    else // 在非loop 线程中执行cb
    {
        queueInLoop(cb);
    }
}

queueInLoop:

callingPendingFunctors_ 表示已经被唤醒,没有阻塞在loop上,避免执行期间又有新的回调到来,而无法执行。

他依旧会到poll,但是我已经wakeup,它poll立马又会返回。

// 把cb放入队列中,唤醒loop所在的线程,执行cb
void EventLoop::queueInLoop(Functor cb)
{
    {
        std::unique_lock<std::mutex> lock(mutex_);
        pendingFunctors_.emplace_back(cb);
    }
    // 唤醒相应的需要执行上面的回调操作的loop线程
    // || callingPendingFunctors_ 标识当前loop正在执行回调,此时loop又有了新的回调。
    if(!isInLoopThread() || callingPendingFunctors_)  // 这里的callingPendingFunctors_待解释
    {
        wakeup();// 唤醒loop所在线程
    }
}

wakeup就是通过给wakeupFd写一个内容,让线程起来。 唤醒loop所在的线程

// 用来唤醒loop所在的线程的 向wakeupfd写一个数据
void EventLoop::wakeup()
{
    uint64_t one = 1;
    ssize_t n = write(wakeupFd_,&one,sizeof one);
    if(n != sizeof one)
    {
        LOG_ERROR("EventLoop::wakeup() wirtes %lu bytes instead of 8\n",n);
    }
}
// 为了并发操作,定义局部的vector
 void EventLoop::doPendingFunctors()
 {
    std::vector<Functor> functors;
    callingPendingFunctors_ = true;
    {
        std::unique_lock<std::mutex> lock(mutex_);
        functors.swap(pendingFunctors_);
    }
    // 上面是对效率的考量,主要是下面需要进行执行任务
    for(const Functor& functor:functors)
    {
        functor(); // 执行当前loop需要执行的回调操作
    }
    callingPendingFunctors_ = false;
 }

思考:

在mainLoop 和 subLoop 添加一个生产者消费者线程安全的队列。 mainLoop负责放channel,subLoop 负责拿channel。 若是这样,逻辑就清晰,但是muduo库里面只通过wakeupfd进行调用。

Thread 类


线程函数的类型。 若是要带参数,可以使用绑定器进行绑定。

using ThreadFunc = std::function<void()>;

这里封装了c++11的线程库,用了智能指针进行封装。

std::shared_ptr<std::thread> thread_; // 这里的thread是线程里面的thread

注意这里的tid_ 也不是pthread_self() 的结果。

void setDefaultName();
bool started_;
bool joined_;  
std::shared_ptr<std::thread> thread_; // 这里的thread是线程里面的thread
pid_t tid_;			// 线程的tid,这个值是top命令查看出来的
ThreadFunc func_;    // 定义线程的函数对象
std::string name_; // 线程名,调试的时候打印用的
static std::atomic_int numCreated_; // 产生的线程的数量

构造函数,默认start_ 不启动,tid_ 为0,启动的线程函数是传参的func,name_ 表示线程名称。

调用setDefaultName 就是将一定的方法把线程名称创建出来。

std::atomic_int Thread::numCreated_(0); // =0 这种explicit不能用
Thread::Thread(ThreadFunc func, const std::string &name) // 这里不需要把模板参数也写出来
    : started_(false), joined_(false), tid_(0), func_(std::move(func)), name_(name)
{
    setDefaultName();
}

setDefaultName 如果名字为空,则会创建默认的名字。

void Thread::setDefaultName()
{
    int num = ++numCreated_;
    // 如果名字为空,则会创建默认的名字。
    if(name_.empty())
    {
        char buf[32] = {0};
        snprintf(buf,sizeof buf,"Thread%d",num);
        name_ = buf;
    }
}

join 表示他是一个普通的工作线程,必须等他执行完才能够进行释放,实际上我们的资源是由shared_ptr维护的。 所以即使我们这里没有释放,也不会出现问题。

这里的析构方法实际上也就是分离线程的方法。

这里也是因为主线程虽然要释放了,但是从线程可能还有工作,此时我们的策略是分离,而不应该杀死线程。若主线程没有设置 join_ 为true,那么就是说主线程实际上可以不等待了,那么此时设置分离他的状态。

Thread::~Thread()
{
    if(started_ && !joined_)
    {
        thread_->detach();// thread类提供的设置分离线程的方法  pthread_deatch
    }
}

start 函数就是可以执行

thread_ 函数是通过右值的形式进行右值引用的。 此时开启线程,专门执行该线程函数。

为什么弄个信号量,做这么奇怪的操作?

这里是这样的,主线程创建一个线程,此时我们需要保证这个函数执行完,主线程返回的时候,从线程的tid必须是一个已经创建好的值。但是由于线程的调度策略我们并不知道,所以用信号量,让主线程在从线程 sem_post 生产一个的时候才放行主线程。

这里用的是二元信号量,本身就有互斥的含义。

void Thread::start() // 一个Thread对象,记录的就是一个新线程的详细信息、
{
  started_ = true;
  sem_t sem;
 // 第二个是是否进程间共享,第三个就是信号量的数量
  sem_init(&sem,false,0);
  // 开启线程,&获取外部成员变量
  thread_ = std::shared_ptr<std::thread>(new std::thread([&](){
    // 获取线程的tid值
    tid_ = CurrentThread::tid();
    sem_post(&sem); // sem + 1
    // 开启一个新线程
    func_(); // 开启一个新线程,专门执行该线程函数
  }));  

  // 等待新创建线程的tid值, 这里会直接阻塞住
  sem_wait(&sem); // sem - 1
}

主线程等待从线程。

void Thread::join()
{
    joined_ = true;
    thread_->join();
}

startLoop 与 threadFunc

startLoop 的终极目标就是创建一个新线程然后里面有着EventLoop,然后返回给上层

startLoop是初始化底层的Thread函数,创建新线程,执行新的线程函数threadFunc,threadFunc 会创建 EventLoop 并启动loop事件循环

而startLoop 返回的EventLoop* 的时候需要保证底层的Thread已经没问题并且loop。

EventLoop *EventLoopThread::startLoop() // 这里才创建新线程
{
    // 启动底层的线程
    thread_.start(); // start 里面的func_ 就是 EventLoopThread::threadFunc,也就是下面的函数得执行完才到我
    EventLoop *loop = nullptr;
    {
        std::unique_lock<std::mutex> lock(mutex_);
        while (loop_ == nullptr)
        {
            cond_.wait(lock); // 表示底层的线程都没创建好
        }
        loop = loop_;
    }
    return loop;
}

// 这个是新线程执行的函数
void EventLoopThread::threadFunc()
{
    EventLoop loop; // one loop per thread
    if (callback_)
    {
        callback_(&loop);
    }
    {
        std::unique_lock<std::mutex> lock(mutex_);
        loop_ = &loop;
        cond_.notify_one();
    }
    loop.loop();// 新线程就在这里调用epoll_wait,EventLoop 的 loop 函数,即开启了loop
    std::unique_lock<std::mutex> lock(mutex_);
    loop_ = nullptr; // 这里表示底层的事件已经结束了,这里是彻底Reactor 都不运行才会到这里
}

EventLoopThreadPool

EventLoop* baseLoop_; // EventLoop loop; 用户创建的一开始的Loop
std::string name_;
bool started_;
int numThreads_;
int next_;  // 用来轮询派发任务的
std::vector<std::unique_ptr<EventLoopThread>> threads_; // 封装了批量的EventLoopThread
std::vector<EventLoop*> loops_;
EventLoopThreadPool::EventLoopThreadPool(EventLoop *baseLoop, const std::string &nameArg)
    : baseLoop_(baseLoop), name_(nameArg), started_(false), numThreads_(0), next_(0)
{
}
void setThreadNum(int numThreads){numThreads_ = numThreads;}

start 首先对于所有的EventLoopThread 进行设置名字。然后调用线程初始化函数初始化baseLoop_

void EventLoopThreadPool::start(const ThreadInitCallback &cb = ThreadInitCallback())
{
    started_ = true;
    for (int i = 0; i < numThreads_; ++i)
    {
        char buf[name_.size() + 32];
        snprintf(buf, sizeof buf, "%s%d", name_.c_str(), i);
        EventLoopThread *t = new EventLoopThread(cb, buf);
        // vector<std::unique_ptr<EventLoopThread>>
        threads_.push_back(std::unique_ptr<EventLoopThread>(t));
        // vector<EventLoop*>
        loops_.push_back(t->startLoop()); // 底层创建线程,绑定一个新的EventLoop,并返回还loop的地址
    }
    // 整个服务端只有一个线程运行着baseLoop
    if (numThreads_ == 0 && cb)
    {
        cb(baseLoop_);
    }
}

getNextLoop

// 如果是多线程,baseLoop_默认以轮询的方式分配Channel给subLoop
EventLoop *EventLoopThreadPool::getNextLoop()
{
    // IO线程运行baseLoop_;
    EventLoop *loop = baseLoop_;
    // 若没有设置底层的线程数,返回的永远是baseLoop_;
    // 工作线程若是有,就在这里,获取下一个处理事件的loop。
    if (!loops_.empty())
    {
        loop = loops_[next_];
        ++next_;
        // 相当于 循环的数组 next_ 这样走
        if (next_ >= loops_.size())
        {
            next_ = 0;
        }
    }
    return loop;
}

Socket


是为了讲述Accept 而用到的类,TcpConnection也会使用到。

成员变量只有一个,就是sockfd_。

const int sockfd_;

成员变量就是填入了InetAddress,其实上也就是封装了一个对象。

int Socket::accept(InetAddress *peeraddr)
{
    /**
     * 参数不合法
     * 对返回的connfd 没有设置非阻塞
     * poller + non-blocking IO
    */
    struct sockaddr_in addr;
    bzero(&addr, sizeof addr);
    socklen_t len = sizeof addr; // accept 必须初始化 
    // accept4 可以让返回的connfd设置flag是nonblock
    int connfd = ::accept4(sockfd_, (sockaddr *)&addr, &len,SOCK_NONBLOCK| SOCK_CLOEXEC);
    if (connfd >= 0)
    {
        peeraddr->setSockAddr(addr);
    }
    return connfd;
}

TcpConnection


TcpConnection 封装了Channel, TcpConnnection的私有函数实际上就是被注册到Channel当中,当检测到事件的时候实际上调用的就是TcpConnection传递的方法。
用户定义的消息回调会在TcpConnection当中的读到内容的时候进行调用回调。
在这里插入图片描述
TcpServer会在接受到一个新连接的时候调用用户的connectionEstablished函数。

在这里插入图片描述

Acceptor MainReactor 核心


acctpor 里面的EventLoop 是MainLoop,他也会打包成一个Channel 给到mainLoop 当中。

newConnectionCallback_ 就是 TcpServer的构造函数里面初始化了

acceptor_->setNewConnectioCallback(std::bind(&TcpServer::newConnection,this,std::placeholders::_1,std::placeholders::_2)); 

成员变量:

EventLoop* loop_; // Acceptor 用的就是用户定义的baseLoop,也是mainLoop
Socket acceptSocket_;
Channel acceptChannel_;
NewConnectionCallback newConnectionCallback_;
bool listenning_;

newConnection 这个函数就是主要就是选择一个subLoop,这里面的ioLoop,然后将就绪事件传给他。

// 新的客户端会调用这里
void TcpServer::newConnection(int sockfd,const InetAddress& peerAddr)
{
    // 轮询算法选择一个subLoop,来管理channel
    EventLoop* ioLoop = threadPool_->getNextLoop();
    char buf[64] = {} ;
    snprintf(buf,sizeof buf,"-%s#%d",ipPort_.c_str(),nextConnId_);
    ++ nextConnId_;// 不是atomic是因为这个函数在主线程运行
    std::string connName = name_ + buf;
    LOG_INFO("TcpServer::newConnection [%s] - new connection [%s] from %s \n",
             name_.c_str(),connName.c_str(),peerAddr.toIpPort().c_str());

    // 通过sockfd获取其绑定的本机的ip地址和端口信息。
    sockaddr_in local;
    ::bzero(&local,sizeof local);
    socklen_t addrlen = sizeof local;
    if(::getsockname(sockfd,(sockaddr*)&local,&addrlen) < 0)
    {
        LOG_ERROR("sockets::getLocalAddr");
    }
    InetAddress localAddr(local);

    TcpConnectionPtr conn(new TcpConnection(
        ioLoop,
        connName,
        sockfd,
        localAddr,
        peerAddr
    ));

    connections_[connName] = conn;
    // 下面的回调都是用户设置给TcpServer-> TcpConnection->Channel->poller->notify channel调用回调
    conn->setConnectionCallback(connectionCallback_);
    conn->setMessageCallback(messageCallback_);
    conn->setWriteCompleteCallback(writeCompleteCallback_);

    // 设置了如何关闭连接的回调
    conn->setCloseCallback(
        std::bind(&TcpServer::removeConnection,this,std::placeholders::_1)
    );
    // 直接调用TcpConnection::connectionEstablished
    ioLoop->runInLoop(std::bind(&TcpConnection::connectEstablished,conn));
}

构造函数定义了accpetChannel 相关的读事件就绪,应证了回调函数的执行都在Channel模块。

Acceptor::Acceptor(EventLoop *loop, const InetAddress &listenAddr, bool reuseport)
    : loop_(loop), acceptSocket_(createNonblocking()),
      acceptChannel_(loop, acceptSocket_.fd()), listenning_(false)
{
    acceptSocket_.setReuseAddr(true);
    acceptSocket_.ReusePort(true);
    acceptSocket_.bindAddress(listenAddr);
    // TcpServer::start() Accept.listen
    acceptChannel_.setReadCallback(std::bind(&Acceptor::handleRead, this));
}

这里调用handleRead就是执行回调方法,因为AcctorChannel 有事件就绪只要就是acceptfd上有事件就绪了。

两个回调函数,一个是accept上来需要调用的,首先把连接获取上来; 下面的就是另一个回调,就是启动subLoop接受这个connfd,接下来newConnectionCallback_里面会调用将TcpConnection所需要的函数都绑定到TcpConnection,然后TcpConnection往Channel的回调方法也会调用到其中的方法。

// listenfd有事件链接,就是有新用户链接了
void Acceptor::handleRead()
{
    InetAddress peerAddr;
    int connfd = acceptSocket_.accept(&peerAddr);
    if (connfd >= 0)
    {
        if (newConnectionCallback_)
        {
            //void TcpServer::newConnection(int sockfd,const InetAddress& peerAddr)
            newConnectionCallback_(connfd, peerAddr); // 轮询找到subLoop,唤醒,分发当前新客户端的Channel
        }
        else
        {
            ::close(connfd);
        }
    }
    else
    {
        LOG_ERROR("%s:%s:%d accept error:%d\n", __FILE__, __FUNCTION__, __LINE__, errno);
        // 资源用完
        if (errno == EMFILE)
        {
            // 可以调整当前进程的文件描述符上线
            LOG_ERROR("%s:%s:%d sockfd reached limit!\n", __FILE__, __FUNCTION__, __LINE__);
        }
    }
}

也就是linsten sock 有事件就绪,就会调用Accept::handleRead 方法读取,然后调用EvnetLoopThreadPoll里面的getNextLoop 然后选择一个ioLoop 把新的connfd打包成Channel 发送给他,并且将用户设置的方法TcpServer.cc保存,后续connfd统一都进行回调注册。 因为listen sock 的处理方法是固定的,用户传的方法,都是监听的事件就绪后的业务处理。

newConnection 这个函数,设置了用户的事件就绪后需要做什么。

// 下面的回调都是用户设置给TcpServer-> TcpConnection->Channel->poller->notify channel调用回调
conn->setConnectionCallback(connectionCallback_);
conn->setMessageCallback(messageCallback_);
conn->setWriteCompleteCallback(writeCompleteCallback_);

在这里插入图片描述

博主简化的版本,只用于学习,不可用于商用。
https://gitee.com/wuyi-ljh/somebloglinkstoshare 附带一张思维导图。

总结


个人认为比较重要的模块都有叙述,差的部分会在后续更新,gitee里面的思维导图也会持续完善,并且会梳理一下整体的逻辑。可以关注一下,以防丢失~

借鉴:
长文梳理Muduo库核心代码及优秀编程细节剖析
muduo学习笔记:base部分之CurrentThread命名空间与Thread类

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/484471.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

c++ 11标准模板(STL) std::vector (四)

定义于头文件 <vector> template< class T, class Allocator std::allocator<T> > class vector;(1)namespace pmr { template <class T> using vector std::vector<T, std::pmr::polymorphic_allocator<T>>; }(2)(C17…

庖丁解牛函数知识---C语言《1》

目录 前言&#xff1a; 1.程序中的函数 2.库函数的学习和使用 3.自定义函数 4.传值调用与传址调用 5.形参与实参 6.练习---二分查找函数 ❤博主CSDN:啊苏要学习 ▶专栏分类&#xff1a;C语言◀ C语言的学习&#xff0c;是为我们今后学习其它语言打好基础&#xff0c;C生…

Apache POI,springboot中导出excel报表

2. Apache POI 2.1 介绍 Apache POI 是一个处理Miscrosoft Office各种文件格式的开源项目。简单来说就是&#xff0c;我们可以使用 POI 在 Java 程序中对Miscrosoft Office各种文件进行读写操作。 一般情况下&#xff0c;POI 都是用于操作 Excel 文件。 Apache POI 的应用场景…

【SQL】窗口函数及行转列等操作总结

1. 窗口函数 窗口函数的应用 排名问题&#xff1a;每个部门按业绩来排名 topN问题&#xff1a;找出每个部门排名前N的员工进行奖励 窗口函数的语法 <窗口函数> over (partition by <用于分组的列名> order by <用于排序的列名>) <窗口函数>的位置&…

Java基础--->JVM(1)

文章目录 为什么学习JVM&#xff1f;什么是虚拟机&#xff1f;JVM的作用JVM组成部分类加载器类什么时候会被加载&#xff08;初始化&#xff09;有哪些类加载器什么是双亲委派机制如何打破双亲委派机制 为什么学习JVM&#xff1f; ​ 学习JVM是为了能更深入的理解Java这门语言&…

推荐算法实战项目:PNN 原理以及案例实战(附完整 Python 代码)

本文要介绍的是由上海交通大学的研究人员提出的PNN&#xff08;Product-based Neural Networks&#xff09;模型&#xff0c;该模型包含一个embedding层来学习类别数据的分布式表示&#xff0c;此外还包含product层来捕获字段之间的特征交互模式&#xff0c;最后包含一个全连接…

一个实例讲讲 ChatGPT 推理

吴恩达与 OpenAI 官方联合推出了 1.5 小时的免费视频课&#xff1a;地址&#xff1a; https://learn.deeplearning.ai/chatgpt-prompt-eng/lesson/2/guidelines 今天我学了第四讲&#xff0c;ChatGPT Inferring&#xff0c;即推理 教学中&#xff0c;给的例子是情绪判断。 我很…

手写数字识别基本思路

问题 什么是MNIST?如何使用Pytorch实现手写数字识别&#xff1f;如何进行手写数字对模型进行检验&#xff1f; 方法 mnist数据集 MNIST数据集是美国国家标准与技术研究院收集整理的大型手写数字数据集&#xff0c;包含了60,000个样本的训练集以及10,000个样本的测试集。 使用P…

RIP笔记

目录 RIP路由信息协议——UDP520端口(RIPNG521端口) RIP使用的算法——贝尔曼福特算法 RIP的版本 RIP的数据包 RIP的工作过程 RIP的计时器 周期更新计时器——默认30s 失效计时器——默认180s 垃圾回收计时器——默认120s RIP的环路问题 解决方法&#xff1a; RIP的…

12种接口优化的通用方案

一、背景 针对老项目&#xff0c;去年做了许多降本增效的事情&#xff0c;其中发现最多的就是接口耗时过长的问题&#xff0c;就集中搞了一次接口性能优化。本文将给小伙伴们分享一下接口优化的通用方案。 二、接口优化方案总结 1.批处理 批量思想&#xff1a;批量操作数据库…

Item冷启优化

Item冷启动的目标&#xff1a; 1.精准推荐。 2.激励发布。 3.挖掘高潜。 Item冷启动优化措施&#xff1a; 1.优化全链路&#xff08;召回和排序&#xff09; 2.流量调控&#xff08;新老物品的流量分配&#xff09; 评价指标&#xff1a; 作者侧&#xff1a; 发布渗透率&a…

【基于Ubuntu18.04+Melodic的realsense D435安装】

【基于Ubuntu18.04Melodic的realsense D435安装】 1. RealSense SDK安装1.1 克隆SDK1. 2 安装相关依赖1.3 安装权限脚本1. 4 进行编译与安装1.5 测试安装是否成功 2. D435i 安装ROS接口2.1 方法一realsense—ros源码2.2 方法二安装相机库 3. 总结 1. RealSense SDK安装 系统硬…

C++:分治算法之选择问题的选择第k小元素问题

目录 3.2.6 选择问题 分析过程&#xff1a; 解法一&#xff1a; 算法代码&#xff1a; 【单组数据】 【多组数据】 运行结果&#xff1a; 解法二 代码&#xff1a; 运行结果&#xff1a; 解法三&#xff1a; 3.2.6 选择问题 ¢ 对于给定的 n 个元素的数组 a[0 …

DAY 53 Haproxy负载均衡集群

常见的Web集群调度器 目前常见的Web集群调度器分为软件和硬件&#xff1a; 软件通常使用开源的LVS、Haproxy、 Nginx LVS性能最好&#xff0c;但是搭建相对复杂&#xff1b;Nginx 的upstream模块支持群集功能&#xff0c;但是对群集节点健康检查功能不强&#xff0c;高并发性能…

第一章 Linux是什么

Linux是一套操作系统&#xff0c;如同下图所示&#xff0c;Linux就是核心与系统调用接口那两层。至于应用程序不算Linux。 1.1 Linux当前应用的角色 由于Linux kernel实在是非常的小巧精致&#xff0c;可以在很多强调省电以及较低硬件资源的环境下面执行&#xff1b; 此外&…

【Elasticsearch】NLP简单应用

文章目录 NLP简介ES中的自然语言处理(NLP)NLP演示将opennlp插件放在ESplugins路径中下载NER模型配置opennlp重启ES、验证 NLP简介 NLP代表自然语言处理&#xff0c;是计算机科学和人工智能领域的一个分支。它涉及使用计算机来处理、分析和生成自然语言&#xff0c;例如英语、中…

企业对网络安全的重视度开始降低

近日&#xff0c;英国科学技术部发布了《2023年企业网络安全合规调查报告》&#xff08; Cyber Security Breaches Survey &#xff09;&#xff0c;对英国所有企业和社会性组织目前的网络威胁态势和合规建设进行研究&#xff0c;同时也就如何提升新一代网络应用的合规性给出专…

02-管理员登录与维护 尚筹网

一、管理员登陆 需要做的&#xff1a; 对存入数据库的密码进行MD5加密在登录界面登录失败时的处理抽取后台页面的公共部分检查登录状态&#xff0c;防止未登录时访问受保护资源的情况 具体操作如下&#xff1a; 1&#xff09;、MD5加密 ​ 使用到的CrowdConstant类中的一些…

人的全面发展评价指标体系—基于相关-主成分分析构建

本文先从经济、社会、生活质量和人口素质四个方面海选了众多人的全面发展评价指标&#xff0c;然后根据可观测性原则剔除无法获得的指标进行了初步筛选&#xff0c;再利用相关性分析删除相关系数大的指标&#xff0c;以及通过主成分分析删除因子负载小的指标&#xff0c;完成了…

CCD视觉检测设备如何选择光源

CCD视觉检测设备的机器视觉系统对光源的要求很高&#xff0c;光源是决定图像质量的一个重要因素。那么&#xff0c;我们就来看看CCD图像加网设备和机器视觉系统光源的选择点——CCD图像加网设备。 CCD视觉检测设备机器视觉系统光源选择要点&#xff1a; 1. 对比度&#xff1a;…