【Muduo】三大核心之Poller、EPollPoller

news2024/11/15 20:05:15

Poller

在Muduo中,Poller负责基于IO多路复用机制进行IO事件监听和处理的组件,作为EPollPoller的基类,为后者提供了与PollPoller统一的IO复用接口,并且声明了一个关键的创建派生类的成员函数:

static Poller *newDefaultPoller(EventLoop *loop);

此函数可以通过判断 ` ::getenv("MUDUO_USE_POLL") ` 来决定返回一个PollPoller还是EPollPoller,解耦 PollPoller 和 EPollPoller。由于newDefaultPoller成员函数在定义的时候需要派生类EPollPoller已经实现,故不可以将其定义直接写在Poller.cc中,而是另起了一个文件DefaultPoller.cc

Poller.h

#pragma once
#include "noncopyable.h"
#include "Timestamp.h"
#include <vector>
#include <unordered_map>

class Channel;
class EventLoop;

// muduo库中,多路事件分发器的核心IO复用模块
class Poller : noncopyable
{
public:
    using ChannelList = std::vector<Channel *>;

    Poller(EventLoop *loop);
    virtual ~Poller();

    /// Polls the I/O events.
    /// Must be called in the loop thread.
    /// 给所有IO复用保留统一的接口
    virtual Timestamp poll(int timeoutMs, ChannelList *activeChannels) = 0;

    /// Changes the interested I/O events.
    /// Must be called in the loop thread.
    virtual void updateChannel(Channel *channel) = 0;

    /// Remove the channel, when it destructs.
    /// Must be called in the loop thread.
    virtual void removeChannel(Channel *channel) = 0;

    // 判断是否在当前poller中
    virtual bool hasChannel(Channel *channel) const;

    // EventLoop可以通过该接口获取默认的IO复用的具体实现
    static Poller *newDefaultPoller(EventLoop *loop); // 返回Poller对象,不能直接在Poller.cc中实现

    void assertInLoopThread() const
    {
        // ownerLoop_->assertInLoopThread();
    }

protected:
    // channel->fd() : Channel
    using ChannelMap = std::unordered_map<int, Channel*>;
    ChannelMap channels_;

private:
    EventLoop *ownerLoop_;
};

 Poller.cc

#include "Poller.h"
#include "Channel.h"

Poller::Poller(EventLoop* loop)
  : ownerLoop_(loop)
{
}

Poller::~Poller() = default;

bool Poller::hasChannel(Channel* channel) const
{
  auto it = channels_.find(channel->fd());
  return it != channels_.end() && it->second == channel;
}

DefaultPoller.cc

#include "Poller.h"
#include "EPollPoller.h"
#include <stdlib.h>

// 解耦 PollPoller 和 EPollPoller
Poller* Poller::newDefaultPoller(EventLoop *loop)
{
    if (::getenv("MUDUO_USE_POLL"))
    {
        // return new PollPoller(loop);
    }
    else
    {
        return new EPollPoller(loop);
    }
    return nullptr; 
}

EPollPoller

Muduo网络库的EPollPoller模块继承了Poller,是负责基于Linux epoll机制进行IO事件监听和处理的组件,封装了一系列函数用于调用底层的Linux系统调用。在Muduo中,EPollPoller与EventLoop和Channel类一起构成了事件驱动网络编程的基础框架。下面我将详细介绍EPollPoller模块的主要功能和特点。

主要功能

  1. 事件监听:EPollPoller模块使用Linux的epoll机制来监听一组文件描述符(通常是网络连接)上的IO事件。这些事件包括可读、可写、异常等。
  2. 事件注册与注销:通过EPollPoller的接口,用户可以将感兴趣的文件描述符及其相关事件注册到epoll中,以便监听这些事件。当不再需要监听某个文件描述符时,也可以将其从epoll中注销。
  3. 事件分发:当epoll监听到某个文件描述符上有事件发生时,EPollPoller会将这些事件分发到对应的Channel对象上。每个Channel对象都封装了一个文件描述符及其相关的事件处理逻辑。
  4. 高效性:EPollPoller利用了epoll机制的高效性,能够同时监听大量的文件描述符,并且在有事件发生时只返回有事件发生的文件描述符集合,避免了轮询所有文件描述符的开销。

实现原理

  1. 创建epoll实例:在初始化时,EPollPoller会epoll_create1函数来创建一个epoll实例,并获取一个文件描述符用于后续操作。
  2. 注册事件:使用epoll_ctl函数将需要监听的文件描述符及其关注的事件类型注册到epoll实例中。可以添加新的文件描述符、修改已注册的文件描述符的事件类型或删除已注册的文件描述符。
  3. 进入事件循环:使用epoll_wait函数在epoll实例上进行阻塞等待,直到有事件发生。当有事件发生时,epoll_wait会返回发生事件的文件描述符集合及其对应的事件类型。
  4. 事件分发:EPollPoller根据epoll_wait返回的文件描述符集合和事件类型,通过在epoll_event的data.ptr字段预先设置Channel对象的地址,从而能够找到对应的Channel对象,并调用其上的回调函数来处理事件。
    struct epoll_event
    {
      uint32_t events;	/* Epoll events */
      epoll_data_t data;	/* User data variable */
    } __EPOLL_PACKED;

与其他组件的关系

在Muduo网络库中,EPollPoller与EventLoop和Channel类紧密合作,共同实现了事件驱动的网络编程框架。EventLoop负责驱动整个事件循环,定时检查EPollPoller是否有事件发生,并调用相应的处理逻辑。Channel类则是对文件描述符及其相关事件的封装,它包含了与文件描述符相关的事件处理逻辑和回调函数。

通过这三个组件的协同工作,Muduo网络库能够高效、灵活地处理大量的网络连接和IO事件,为构建高性能的网络服务器提供了坚实的基础。

 EPollPoller.h

#include "Poller.h"
#include <sys/epoll.h>

/*
epoll的使用
epoll_create 创建fd
epoll_ctl  add/mod/del
epoll_wait
*/

class Channel;

class EPollPoller : public Poller
{
public:
    EPollPoller(EventLoop *loop);
    ~EPollPoller() override; // override 表示由编译器确保父类的同名函数是虚函数

    // 重写基类的抽象方法
    Timestamp poll(int timeoutMs, ChannelList *activeChannels) override;
    void updateChannel(Channel *channel) override;
    void removeChannel(Channel *channel) override;

private:
    static const int kInitEventListSize = 16; // EventList初始长度

    // 填写活跃的连接
    void fillActiveChannels(int numEvents, ChannelList *activeChannels) const;

    // 更新channel通道
    void update(int operation, Channel *channel);

    using EventList = std::vector<epoll_event>;

    int epollfd_;
    EventList events_;
};

EpollPoller.cc

#include "EventLoop.h"
#include "LogStream.h"
#include "Poller.h"
#include "Channel.h"

#include <sys/eventfd.h>
#include <unistd.h>
#include <fcntl.h>
#include <errno.h>
#include <iostream>

// 防止一个线程创建多个EventLoop    threadLocal
__thread EventLoop *t_loopInThisThread = nullptr;

// 定义Poller超时时间
const int kPollTimeMs = 10000;

// 创建weakupfd,用来notify唤醒subReactor处理新来的channel
int createEventfd()
{
    int evtfd = ::eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
    if (evtfd < 0)
    {
        LOG_FATAL << "Failed in eventfd" << errno;
    }
    return evtfd;
}

EventLoop::EventLoop()
    : looping_(false),
      quit_(false),
      callingPendingFunctors_(false),
      threadId_(CurrentThread::tid()),
      poller_(Poller::newDefaultPoller(this)),
      wakeupFd_(createEventfd()),
      wakeupChannel_(new Channel(this, wakeupFd_))
{
    LOG_DEBUG << "EventLoop created " << this << " in thread " << threadId_;
    if (t_loopInThisThread)
    {
        LOG_FATAL << "Another EventLoop " << t_loopInThisThread
                  << " exists in this thread " << threadId_;
    }
    else
    {
        t_loopInThisThread = this;
    }

    // 设置weakupfd的事件类型以及发生事件后的回调操作
    wakeupChannel_->setReadCallback(std::bind(&EventLoop::handleRead, this));
    // we are always reading the wakeupfd
    // 每一个EventLoop都将监听weakupChannel的EPOLLIN读事件了
    // 作用是subloop在阻塞时能够被mainLoop通过weakupfd唤醒
    wakeupChannel_->enableReading();
}

EventLoop::~EventLoop()
{
    LOG_DEBUG << "EventLoop " << this << " of thread " << threadId_
              << " destructs in thread " << CurrentThread::tid();
    wakeupChannel_->disableAll();
    wakeupChannel_->remove();
    ::close(wakeupFd_);
    t_loopInThisThread = NULL;
}

void EventLoop::handleRead()
{
    uint64_t one = 1;
    ssize_t n = read(wakeupFd_, &one, sizeof one);
    if (n != sizeof one)
    {
        LOG_ERROR << "EventLoop::handleRead() reads " << n << " bytes instead of 8";
    }
}

void EventLoop::loop()
{
    looping_ = true;
    quit_ = false;
    LOG_INFO << "EventLoop " << this << " start looping";

    while (!quit_)
    {
        activeChannels_.clear();
        // 当前EventLoop的Poll,监听两类fd,client的fd(正常通信的,在baseloop中)和 weakupfd(mainLoop 和 subLoop 通信用来唤醒sub的)
        pollReturnTime_ = poller_->poll(kPollTimeMs, &activeChannels_);
        for (Channel *channel : activeChannels_)
        {
            // Poller监听哪些channel发生事件了,然后上报给EventLoop,通知channel处理相应的事件
            channel->handleEvent(pollReturnTime_);
        }
        // 执行当前EventLoop事件循环需要处理的回调操作
        /**
         * IO线程 mainLoop 只 accept 然后返回client通信用的fd <= 用channel打包 并分发给 subloop
         * mainLoop事先注册一个回调cb(需要subLoop来执行),weakup subloop后,
         * 执行下面的方法,执行之前mainLoop注册的cb操作(一个或多个)
         */
        doPendingFunctors();
    }

    LOG_INFO << "EventLoop " << this << " stop looping";
    looping_ = false;
}

/**
 * 退出事件循环
 * 1、loop在自己的线程中 调用quit,此时肯定没有阻塞在poll中
 * 2、在其他线程中调用quit,如在subloop(woker)中调用mainLoop(IO)的qiut
 *
 *                  mainLoop
 * 
 *      Muduo库没有 生产者-消费者线程安全的队列 存储Channel
 *      直接使用wakeupfd进行线程间的唤醒       
 *
 * subLoop1         subLoop2        subLoop3
 */
void EventLoop::quit()
{
    quit_ = true;
    // 2中,此时,若当前woker线程不等于mainLoop线程,将本线程在poll中唤醒
    if (!isInLoopThread())
    {
        wakeup();
    }
}

void EventLoop::runInLoop(Functor cb)
{
    // LOG_DEBUG<<"EventLoop::runInLoop  cb:" << (cb != 0);
    if (isInLoopThread()) // 产生段错误
    { // 在当前loop线程中 执行cb
        LOG_DEBUG << "在当前loop线程中 执行cb";
        cb();
    }
    else
    { // 在其他loop线程执行cb,需要唤醒其loop所在线程,执行cb
        LOG_DEBUG << "在其他loop线程执行cb,需要唤醒其loop所在线程,执行cb";
        queueInLoop(cb);
    }
}

void EventLoop::queueInLoop(Functor cb)
{
    {
        std::unique_lock<std::mutex> ulock(mutex_);
        pendingFunctors_.emplace_back(cb);
    }

    // 唤醒相应的,需要执行上面回调操作的loop线程
    // 若当前线程正在执行回调doPendingFunctors,但是又有了新的回调cb
    // 防止执行完回调后又阻塞在poll上无法执行新cb,所以预先wakeup写入一个数据
    if (!isInLoopThread() || callingPendingFunctors_) 
    {
        wakeup(); // 唤醒loop所在线程
    }
}

// 用来唤醒loop所在的线程,向wakeupfd写一个数据,wakeupChannel就发生读事件,当前loop线程就会被唤醒
void EventLoop::wakeup()
{
    uint64_t one = 1;
    ssize_t n = ::write(wakeupFd_, &one, sizeof one);
    if (n != sizeof one)
    {
        LOG_ERROR << "EventLoop::wakeup() writes " << n << " bytes instead of 8";
    }
}

void EventLoop::updateChannel(Channel *channel)
{
    // channel是发起方,通过loop调用poll
    poller_->updateChannel(channel);
}

void EventLoop::removeChannel(Channel *channel)
{
    // channel是发起方,通过loop调用poll
    poller_->removeChannel(channel);
}

bool EventLoop::hasChannel(Channel *channel)
{
    return poller_->hasChannel(channel);
}

// 执行回调,由TcpServer提供的回调函数
void EventLoop::doPendingFunctors()
{
    std::vector<Functor> functors;
    callingPendingFunctors_ = true; // 正在执行回调操作

    { // 使用swap,将原pendingFunctors_置空并且释放,其他线程不会因为pendingFunctors_阻塞
        std::unique_lock<std::mutex> lock(mutex_);
        functors.swap(pendingFunctors_);
    }

    for (const Functor &functor : functors)
    {
        functor(); // 执行当前loop需要的回调操作
    }

    callingPendingFunctors_ = false;
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1685117.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

NDIS小端口驱动(五)

在需要的时候&#xff0c;我们也许需要NDIS微型端口程序信息&#xff0c;下面会从多个方面来讨论如何查询NDIS微型端口驱动。 查询无连接微型端口驱动程序 若要查询无连接微型端口驱动程序维护的 OID&#xff0c;绑定协议调用 NdisOidRequest 并传递 一个NDIS_OID_REQUEST 结…

OSPF多区域组网实验(华为)

思科设备参考&#xff1a;OSPF多区域组网实验&#xff08;思科&#xff09; 技术简介 OSPF多区域功能通过划分网络为多个逻辑区域来提高网络的可扩展性和管理性能。每个区域内部运行独立的SPF计算&#xff0c;而区域之间通过区域边界路由器进行路由信息交换。这种划分策略适用…

线性代数(二)

1.标量 标量也叫0D张量&#xff0c;一个标量就是一个数&#xff0c;它只有大小&#xff0c;没有方向。 import torch x torch.Tensor(3) print(x)2.向量 向量也叫1D张量。向量只有一个轴&#xff0c;沿着行的方向&#xff0c;或者沿着列的方向。向量一般指列向量。 import…

光伏储能EMS 风电智慧能量管理系统 -安科瑞王盼盼

安科瑞18721098782王盼盼 一&#xff1a;储能 EMS&#xff08;Energy Management System&#xff09; 储能 EMS&#xff1a;储能 EMS 是一个综合管理系统&#xff0c;用于整体管理和优化储能系统的运行。它基于电力系统的需求和需求响应&#xff0c;通过控制和协调储能设备的…

WPF中DataGrid实现多选框功能

1. 效果图 2. Model建立 public class RstModelCheck : ObservableObject {//为了显示Head1和Head2.而且View中绑定属性而非字段&#xff0c;否则不能显示。public string? Name { get; set; } public bool PlatenAll {get > _platenAll;set{SetProperty(ref _platenAl…

【代码随想录】【算法训练营】【第15天】 [102]二叉树的层序遍历 [226]翻转二叉树 [101]对称二叉树

前言 思路及算法思维&#xff0c;指路 代码随想录。 题目来自 LeetCode。 day 15&#xff0c;一周中最困难的周三~ 题目详情 [102] 二叉树的层序遍历 题目描述 102 二叉树的层序遍历 解题思路 前提&#xff1a;二叉树的层级遍历 思路&#xff1a;利用队列的“先进先出…

C#利用WinForm实现可以查看指定目录文件下所有图片

目录 一、关于Winform 二、创建应用 三、功能实现 四、代码部分 一、关于Winform Windows 窗体是用于生成 Windows 桌面应用的 UI 框架。 它提供了一种基于 Visual Studio 中提供的可视化设计器创建桌面应用的高效方法。 利用视觉对象控件的拖放放置等功能&#xff0c;可…

信捷PLC 编程常用寄存器及编程技巧说明

最近在用信捷的PLC&#xff0c;分享下常用的寄存器和编程技巧说明。 技巧主要包括以下几个方面&#xff1a; 充分规划各个功能区&#xff1a;在编写程序时&#xff0c;需要充分规划各个功能区。 考虑伺服步进功能和气缸手动功能的应用&#xff0c;手动操作时&#xff0c;可…

C语言 | Leetcode C语言题解之第107题二叉树的层序遍历II

题目&#xff1a; 题解&#xff1a; int** levelOrderBottom(struct TreeNode* root, int* returnSize, int** returnColumnSizes) {int** levelOrder malloc(sizeof(int*) * 2001);*returnColumnSizes malloc(sizeof(int) * 2001);*returnSize 0;if (!root) {return level…

个人感觉对Material设计有用的几个网址

(一) Modular and customizable Material Design UI components for Android GIthub: material-components-android (二) 学习Material设计 Material Design (三) 用于创建Material主题&#xff0c;支持导出多种格式 material-theme-builder

Web API——获取DOM元素

目录 1、根据选择器来获取DOM元素 2.、根据选择器来获取DOM元素伪数组 3、根据id获取一个元素 4、通过标签类型名获取所有该标签的元素 5、通过类名获取元素 目标&#xff1a;能查找/获取DOM对象 1、根据选择器来获取DOM元素 语法&#xff1a; document.querySelector(css选择…

一维前缀和[模版]

题目链接 题目: 分析: 因为要求数组中连续区间的和, 可以使用前缀和算法注意:下标是从1开始算起的, 真正下标0的位置是0第一步: 预处理出来一个前缀和数组dp dp[i] 表示: 表示[1,i] 区间所有元素的和dp[i] dp[i-1] arr[i]例如示例一中: dp数组为{1,3,7}第二步: 使用前缀数…

css特性(继承性、层叠性)

1.继承性 可以继承的常见属性&#xff08;文字控制属性都可以继承&#xff09; ps:可以通过调试器查看是否能够继承 注意&#xff1a;a标签的color会继承失效&#xff1b;h系列标签的font-size会继承失效 2.层叠性 后面的样式会覆盖前面的样式 给同一个标签设置不同的样式…

酷开科技以内容为契机,酷开系统向消费者需求的深度挖掘迈进一步

酷开系统还拥有强大的内容资源和推荐算法&#xff0c;能够根据消费者的兴趣爱好为其提供个性化的推荐服务。无论是电影、电视剧、综艺节目&#xff0c;还是新闻、体育、娱乐资讯&#xff0c;酷开系统都能帮助大家快速找到感兴趣的内容&#xff0c;并且通过智能推荐算法不断优化…

vue3插槽solt 使用

背景增加组件的复用性&#xff0c;个人体验组件化还是react 方便。 Vue插槽solt如何传递具名插槽的数据给子组件&#xff1f; 一、solt 原理 知其然知其所以然 Vue的插槽&#xff08;slots&#xff09;是一种分发内容的机制&#xff0c;允许你在组件模板中定义可插入的内容…

4月粽子行业线上市场销售数据分析

随着节日庆祝常态化&#xff0c;消费者对礼物消费的态度发生变化&#xff0c;这会影响粽子的消费模式和市场需求。再加上技术进步&#xff0c;如速冻粽子和真空粽子的推广&#xff0c;也极大地推动了粽子行业的发展&#xff0c;使得产品更易于保存和运输&#xff0c;从而满足了…

Spark-RDD-持久化详解

Spark概述 Spark-RDD概述 1.持久化与序列化的关系 在Spark中&#xff0c;持久化&#xff08;Persistence&#xff09;和序列化&#xff08;Serialization&#xff09;是两个关键概念&#xff0c;它们在RDD处理过程中起着重要作用&#xff0c;并且有一定的关联&#xff1a; &a…

Windows安装mingw32/w64

1.下载 MinGW-w64 WinLibs - GCCMinGW-w64 compiler for Windows Releases niXman/mingw-builds-binaries (github.com) MinGW-w64、UCRT 和 MSVCRT 是 Windows 平台上常用的 C/C 运行库&#xff0c;它们有以下不同点&#xff1a; MinGW-w64&#xff1a;是一个基于 GCC 的…

Transformer,革命性的深度学习架构

Transformer 是一种革命性的深度学习架构,专门设计用于处理序列数据,特别是在自然语言处理(NLP)任务中表现卓越。它由 Vaswani 等人在 2017 年发表的论文《Attention is All You Need》中首次提出,打破了当时基于循环神经网络(RNN)和卷积神经网络(CNN)的序列建模常规,…

Golang | Leetcode Golang题解之第108题将有序数组转换为二叉搜索树

题目&#xff1a; 题解&#xff1a; func sortedArrayToBST(nums []int) *TreeNode {rand.Seed(time.Now().UnixNano())return helper(nums, 0, len(nums) - 1) }func helper(nums []int, left, right int) *TreeNode {if left > right {return nil}// 选择任意一个中间位置…