07 | Swoole 源码分析之 Channel 通道模块

news2024/11/18 6:21:55

原文首发链接:Swoole 源码分析之 Channel 通道模块
大家好,我是码农先森。

引言

通道,用于协程间通讯,支持多生产者协程和多消费者协程。底层自动实现了协程的切换和调度。

通道与 PHP 的 Array 类似,仅占用内存,没有其他额外的资源申请,所有操作均为内存操作,无 IO 消耗。

底层使用 PHP 引用计数实现,无内存拷贝。即使是传递巨大字符串或数组也不会产生额外性能消耗 channel 基于引用计数实现,是零拷贝的。

源码拆解

Channel 通道需要在协程环境中使用,我们先看下面这段代码,使用 new Channel(1) 创建一个 channel 对象,然后在第一个协程中向通道中推送数据,在第二个协程获取到通道内的数据进行消费。

use Swoole\Coroutine;
use Swoole\Coroutine\Channel;
use function Swoole\Coroutine\run;

run(function(){
    // 创建 channel 通道对象
    $channel = new Channel(1);
    Coroutine::create(function () use ($channel) {
        for($i = 0; $i < 10; $i++) {
            Coroutine::sleep(1.0);
            // 向通道内推送数据
            $channel->push(['rand' => rand(1000, 9999), 'index' => $i]);
            echo "{$i}\n";
        }
    });
    Coroutine::create(function () use ($channel) {
        while(1) {
            // 从通道中获取数据
            $data = $channel->pop(2.0);
            if ($data) {
                var_dump($data);
            } else {
                assert($channel->errCode === SWOOLE_CHANNEL_TIMEOUT);
                break;
            }
        }
    });
});

在分析源代码之前,我们可以提前看一下源码整体的调用逻辑图,以便我们有个大致的印象。

这段代码主要是在 Swoole 的协程环境中创建 Channel 对象并初始化其容量的逻辑。

// swoole-src/ext-src/swoole-channel.cc:132
static PHP_METHOD(swoole_channel_coro, __construct) {
    zend_long capacity = 1;
	
	// 解析传入的参数
    ZEND_PARSE_PARAMETERS_START_EX(ZEND_PARSE_PARAMS_THROW, 0, 1)
    Z_PARAM_OPTIONAL
    Z_PARAM_LONG(capacity)
    ZEND_PARSE_PARAMETERS_END_EX(RETURN_FALSE);

    if (capacity <= 0) {
        capacity = 1;
    }

	// 当前对象对应的 ChannelObject 结构体指针
    ChannelObject *chan_t = php_swoole_channel_coro_fetch_object(Z_OBJ_P(ZEND_THIS));
    // 为该通道对象分配新的 Channel 实例,并设置其容量为传入的值。
    chan_t->chan = new Channel(capacity);
    zend_update_property_long(swoole_channel_coro_ce, SW_Z8_OBJ_P(ZEND_THIS), ZEND_STRL("capacity"), capacity);
}

这段代码主要是在 Swoole 的协程环境中向通道中推送数据并对返回结果进行处理的逻辑。

// swoole-src/ext-src/swoole-channel.cc:149
static PHP_METHOD(swoole_channel_coro, push) {
	// 获取当前对象的 Channel 实例
    Channel *chan = php_swoole_get_channel(ZEND_THIS);
    zval *zdata;
    double timeout = -1;

	// 解析传入的参数
    ZEND_PARSE_PARAMETERS_START_EX(ZEND_PARSE_PARAMS_THROW, 1, 2)
    Z_PARAM_ZVAL(zdata)
    Z_PARAM_OPTIONAL
    Z_PARAM_DOUBLE(timeout)
    ZEND_PARSE_PARAMETERS_END_EX(RETURN_FALSE);

    Z_TRY_ADDREF_P(zdata);
    zdata = sw_zval_dup(zdata);
    // 向通道中推入数据
    if (chan->push(zdata, timeout)) {
        zend_update_property_long(
            swoole_channel_coro_ce, SW_Z8_OBJ_P(ZEND_THIS), ZEND_STRL("errCode"), Channel::ERROR_OK);
        RETURN_TRUE;
    } else {
        zend_update_property_long(
            swoole_channel_coro_ce, SW_Z8_OBJ_P(ZEND_THIS), ZEND_STRL("errCode"), chan->get_error());
        Z_TRY_DELREF_P(zdata);
        efree(zdata);
        RETURN_FALSE;
    }
}

// swoole-src/coroutine/channel.cc:105
bool Channel::push(void *data, double timeout) {
	// 获取当前协程对象 current_co
    Coroutine *current_co = Coroutine::get_current_safe();
    // 如果通道已关闭
    if (closed) {
    	// 设置错误并返回空指针
        error_ = ERROR_CLOSED;
        return false;
    }
    // 如果通道已满或生产者队列不为空,则设置超时消息,并根据传入的超时值添加定时器,等待生产者。
    if (is_full() || !producer_queue.empty()) {
        TimeoutMessage msg;
        msg.error = false;
        msg.timer = nullptr;
        if (timeout > 0) {
            msg.chan = this;
            msg.type = PRODUCER;
            msg.co = current_co;
            // 根据传入的超时值添加定时器
            msg.timer = swoole_timer_add(timeout, false, timer_callback, &msg);
        }

		// 挂起生产者协程
        yield(PRODUCER);

		// 如果设置了定时器,则在超时消息中删除定时器
        if (msg.timer) {
            swoole_timer_del(msg.timer);
        }
   
        // 如果当前协程被取消
        if (current_co->is_canceled()) {
        	// 设置错误并返回空指针
            error_ = ERROR_CANCELED;
            return nullptr;
        }
        
        // 如果发生超时
        if (msg.error) {
        	// 设置错误并返回空指针
            error_ = ERROR_TIMEOUT;
            return nullptr;
        }

        // 如果通道关闭且为空的情况
        if (closed && is_empty()) {
        	// 设置相应的错误并返回空指针。
            error_ = ERROR_CLOSED;
            return nullptr;
        }
    }
    
	// 将数据压入数据队列。
    data_queue.push(data);
    swoole_trace_log(SW_TRACE_CHANNEL, "push data to channel, count=%ld", length());
    
    // 如果消费者队列不为空,则唤醒消费者协程。
    if (!consumer_queue.empty()) {
        Coroutine *co = pop_coroutine(CONSUMER);
        // 恢复消费者协程
        co->resume();
    }
    return true;
}

这段代码主要是在 Swoole 的协程环境中从通道中取出数据并对返回结果进行处理的逻辑。

// swoole-src/ext-src/swoole-channel.cc:175
static PHP_METHOD(swoole_channel_coro, pop) {
	// 获取当前对象的 Channel 实例
    Channel *chan = php_swoole_get_channel(ZEND_THIS);
    // 设置超时变量为-1
    double timeout = -1;
	
	// 解析一个超时参数
    ZEND_PARSE_PARAMETERS_START_EX(ZEND_PARSE_PARAMS_THROW, 0, 1)
    Z_PARAM_OPTIONAL
    Z_PARAM_DOUBLE(timeout)
    ZEND_PARSE_PARAMETERS_END_EX(RETURN_FALSE);

	// 从通道中取出数据,并返回一个 zval 指针
    zval *zdata = (zval *) chan->pop(timeout);
    // 如果返回的 zval 指针不为空
    if (zdata) {
    	// 将其返回给 PHP 脚本,并释放内存
        RETVAL_ZVAL(zdata, 0, 0);
        efree(zdata);
        zend_update_property_long(
            swoole_channel_coro_ce, SW_Z8_OBJ_P(ZEND_THIS), ZEND_STRL("errCode"), Channel::ERROR_OK);
    } else {
        zend_update_property_long(
            swoole_channel_coro_ce, SW_Z8_OBJ_P(ZEND_THIS), ZEND_STRL("errCode"), chan->get_error());
        RETURN_FALSE;
    }
}

// swoole-src/coroutine/channel.cc:55
void *Channel::pop(double timeout) {
	// 获取当前协程对象 current_co
    Coroutine *current_co = Coroutine::get_current_safe();
    // 如果通道已关闭且为空
    if (closed && is_empty()) {
    	// 设置错误并返回空指针
        error_ = ERROR_CLOSED;
        return nullptr;
    }
    // 如果通道为空或者消费者队列不为空
    if (is_empty() || !consumer_queue.empty()) {
        TimeoutMessage msg;
        msg.error = false;
        msg.timer = nullptr;
        if (timeout > 0) {
            msg.chan = this;
            msg.type = CONSUMER;
            msg.co = current_co;
            // 根据传入的超时值添加定时器
            msg.timer = swoole_timer_add(timeout, false, timer_callback, &msg);
        }

		// 挂起消费者协程
        yield(CONSUMER);

		// 如果设置了定时器,则在超时消息中删除定时器
        if (msg.timer) {
            swoole_timer_del(msg.timer);
        }
   
        // 如果当前协程被取消
        if (current_co->is_canceled()) {
        	// 设置错误并返回空指针
            error_ = ERROR_CANCELED;
            return nullptr;
        }
        
        // 如果发生超时
        if (msg.error) {
        	// 设置错误并返回空指针
            error_ = ERROR_TIMEOUT;
            return nullptr;
        }

        // 如果通道关闭且为空的情况
        if (closed && is_empty()) {
        	// 设置相应的错误并返回空指针。
            error_ = ERROR_CLOSED;
            return nullptr;
        }
    }
    
    // 从数据队列中弹出数据,并返回该数据。
    void *data = data_queue.front();
    data_queue.pop();

    // 如果生产者队列不为空,则唤醒生产者协程
    if (!producer_queue.empty()) {
        Coroutine *co = pop_coroutine(PRODUCER);
        // 恢复到生产者协程
        co->resume();
    }
    return data;
}

这段代码一是针对超时回调处理的处理逻辑,并恢复相关的协程操作。二是实现了协程的挂起操作,并根据不同的类型将当前协程放入不同的队列中,以便后续根据需要恢复执行。

// swoole-src/coroutine/channel.cc:22
void Channel::timer_callback(Timer *timer, TimerNode *tnode) {
    TimeoutMessage *msg = (TimeoutMessage *) tnode->data;
    msg->error = true;
    msg->timer = nullptr;
    if (msg->type == CONSUMER) {
    	// 从消费者队列中移除该协程
        msg->chan->consumer_remove(msg->co);
    } else {
    	// 从生产者队列中移除该协程
        msg->chan->producer_remove(msg->co);
    }
    // 恢复协程
    msg->co->resume();
}

// swoole-src/coroutine/channel.cc:34
void Channel::yield(enum Opcode type) {
	// 获取当前协程
    Coroutine *co = Coroutine::get_current_safe();
    if (type == PRODUCER) {
    	// 将当前协程放入到生产者队列
        producer_queue.push_back(co);
        swoole_trace_log(SW_TRACE_CHANNEL, "producer cid=%ld", co->get_cid());
    } else {
    	// 将当前协程放入到消费者队列
        consumer_queue.push_back(co);
        swoole_trace_log(SW_TRACE_CHANNEL, "consumer cid=%ld", co->get_cid());
    }
    
    // 挂起被取消,则调用该函数
    Coroutine::CancelFunc cancel_fn = [this, type](Coroutine *co) {
        if (type == CONSUMER) {
            consumer_remove(co);
        } else {
            producer_remove(co);
        }
        co->resume();
        return true;
    };

    // 挂起当前协程
    co->yield(&cancel_fn);
}

总结

  1. Channel 通道需要在协程的环境中进行使用,通道是纯内存操作,没有 IO 消耗,非常高效。
  2. 底层使用 Channel::yield 函数实现了协程的自动切换和调度,如果通道处理超时则会自动调用 Channel::timer_callback 函数。
  3. Channel 通道是跨协程直接通信的一大利器,在实际的场景中使用起来十分的便利、高效。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1571111.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

非关系型数据库(缓存数据库)redis的基础认知与安装

目录 一.关系型数据库和非关系型数据库 关系型数据库 非关系型数据库 关系数据库与非关系型数据库的区别 ①非关系数据 关系型数据库 非关系型数据库产生背景 数据存储流向 非关系型数据库 关系数据库 二.redis的简介 1.概念 2.Redis 具有以下几个优点: 3.Redi…

Salesforce团队:科技文档的受众比你想象要多

▲ 搜索“大龙谈智能内容”关注公众号▲ 扫码见我视频号上的视频 视频时长为38分钟&#xff0c;如果你没有时间看&#xff0c;这里是我用AI做的总结&#xff1a; 视频总结 Salesforce的文档团队成员讨论了文档在销售过程中对潜在客户的重要性。 集锦 00:48 &#x1f4a1; S…

Java简单实现一个LRU(最近最少使用淘汰策略)

目录 LRU介绍&#xff1a; 一些淘汰策略&#xff1a; Java简单实现LRU&#xff1a; 测试&#xff1a; ​编辑 实现原理&#xff1a; LRU介绍&#xff1a; LRU 是 "Least Recently Used" 的缩写&#xff0c;意为"最近最少使用"。它是计算机科学中的一种…

VSCODE EIDE使用debug记录

用上vscode之后就感觉之前的keil不太爽了&#xff0c;找什么东西搜索都很麻烦&#xff0c;之前有写过eide的文章&#xff0c;想着能不能在eide里面就把debug也做了&#xff0c;发现真的可以&#xff0c;下面记录一下&#xff0c;主要是参考这个大佬的文章&#xff0c;非常感谢。…

k8s CNI Calico 网络模式总结

目录 calico架构图 IPIP模式下的架构图 calico 核心组件 Overlay 网络模式&#xff1a; Pod IP对外暴露 不对外暴露&#xff1a; 实现对外暴露的方法&#xff1a; overlay模式下的网络MTU Iptables & ipvs overlay的主要缺点&#xff1a; Full-mesh Unoverla…

Java毕业设计 基于SSM jsp商城系统 美妆系统

Java毕业设计 基于SSM jsp商城系统 美妆系统 SSM jsp 商城系统 美妆系统 功能介绍 首页 分类展示商品 搜索商品 登录 注册 邮箱激活 购物车 结算 支付 我的订单 个人信息设置 后台管理 登录 商品管理 添加修改下架商品 商品类型管理 添加修改删除分类 订单管理 确认发货 取消…

Leetcode 216.组合总和III

题目 思路 题目说只使用数字1-9&#xff0c;是k个数的和 树的宽度是1-9&#xff0c;树的深度是k 1.确定递归函数的返回值及参数&#xff1a; 返回值是void,参数这里还是先设定两个全局变量。一个是path存放符合条件单一结果。如&#xff1a;&#xff08;1&#xff0c;2&…

Web攻击越发复杂,企业如何保护云上业务

如今&#xff0c;电子政务、电子商务、网上银行、网上营业厅等依托Web应用&#xff0c;为广大用户提供灵活多样的服务。在这之中&#xff0c;流量攻击堪称是Web应用的最大敌人&#xff0c;黑客通过流量攻击获取利益、竞争对手雇佣黑客发起恶意攻击、不法分子通过流量攻击瘫痪目…

[VulnHub靶机渗透] pWnOS 2.0

&#x1f36c; 博主介绍&#x1f468;‍&#x1f393; 博主介绍&#xff1a;大家好&#xff0c;我是 hacker-routing &#xff0c;很高兴认识大家~ ✨主攻领域&#xff1a;【渗透领域】【应急响应】 【Java、PHP】 【VulnHub靶场复现】【面试分析】 &#x1f389;点赞➕评论➕收…

TiDB 实战分享丨第三方支付企业的核心数据库升级之路

本文介绍了一家第三方支付企业在面对市场竞争和监管压力的态势下&#xff0c;通过升级核心数据库来提升业务能力的实践。该企业选择 TiDB 分布式数据库&#xff0c;成功将其应用于核心业务、计费、清结算和交易查询等关键系统。TiDB 的水平扩展能力、高可用性和简化数据栈等优势…

交叉验证(Cross-Validation)

交叉验证的基本概念 交叉验证通常用于评估机器学习模型在未知数据上的性能。它将数据集分成k个不同的子集&#xff0c;然后进行k次训练和验证。在每次迭代中&#xff0c;选择一个子集作为测试集&#xff0c;其余的子集作为训练集。这样&#xff0c;每个子集都用作过测试集&…

二、计算机网络体系结构参考模型

一、分层结构 &#xff08;一&#xff09;为什么要分层&#xff1a; 发送文件/数据前要完成的工作&#xff1a; 1&#xff09;发起通信的计算机必须讲数据通信通路进行激活 2&#xff09;要告诉网络如何识别目的主机 3&#xff09;发起通信的计算机要查明目的主机是否开机、并且…

实时渲染 -- 体素化(Voxelization)

我们之前讨论的大部分问题都是关于面表示的。由于这些方法不需要显式地表示物体的内部空间&#xff0c;所以非常高效。 体建模方法表示的是实体而不是表面。使用体模型可以产生更丰富的仿真效果&#xff0c;如物体的运动学行为和半透明光照效果。 一、有限元模型 有限元模型…

升级一下电脑,CPU换I5-14600K,主板换华硕B760M

刚给自己电脑升级了一下&#xff0c;CPU从 AMD R5 5600X 换成 Intel I5-14600K&#xff0c;主板换成了华硕的 TUF GAMING B760M-PLUS WIFI D4。 因为我现有的两根内存是DDR4的&#xff0c;所有我选了个支持DDR4内存的主板。 我发现用AMD处理器时将系统从Win10升级到Win11后变…

关于Linux系统中使用Kazam录制的视频在Win系统中无法播放的问题解决办法

今天在linux系统【ubuntu】中录制了一段视频&#xff0c;想要在win系统中进行剪辑&#xff0c;但是发现无法打开&#xff0c;使用的是Kazam录制的mp4格式视频。 Kazam录制安装与使用方式&#xff1a; 安装方式——linux终端输入&#xff1a; sudo apt-get install kazam使用…

04---webpack编写可维护的构建配置

01 构建配置抽离成npm包&#xff1b; 意义&#xff1a;通用性&#xff1a; 业务开发者无需关注构建配置 统一团队构建脚本可维护性&#xff1a;构建配置合理的拆分 质量&#xff1a;冒烟测试 单元测试 持续集成构建配置管理的可选方案&#xff1a;1 通过多个配置文件管理不同…

Android模拟器Android Emulator进行快照snapshot保存时问题

在用Android Emulator进行快照保存时出现问题&#xff0c;不能保存快照&#xff0c;并提示 current state is not support snapshot。 在网上查找了一圈&#xff0c;发现没有针对这个问题的方案&#xff0c;比较接近的方案都是eclipse年代的&#xff0c;说要进行enable snaps…

刷题之Leetcode27题(超级详细)

27. 移除元素 力扣题目链接(opens new window)https://leetcode.cn/problems/remove-element/ 给你一个数组 nums 和一个值 val&#xff0c;你需要 原地 移除所有数值等于 val 的元素&#xff0c;并返回移除后数组的新长度。 不要使用额外的数组空间&#xff0c;你必须仅使用…

可以写网易云的了!

你好&#xff0c;我是云桃桃。 一个希望帮助更多朋友快速入门 WEB 前端的程序媛。 1枚程序媛&#xff0c;大专生&#xff0c;2年时间从1800到月入过万&#xff0c;工作5年买房。 分享成长心得。 259篇原创内容-gzh 后台回复“前端工具”可获取开发工具&#xff0c;持续更新中…

苍穹外卖Day04套餐管理部分总结

写给像我一样完完全全的小白的。本人代码水平一塌糊涂&#xff0c;前几天就是机械地跟着视频敲代码。对于Day04的作业本来感觉代码抓瞎一点不会写&#xff0c;尽力去理解业务逻辑后发现好像也没那么难&#xff0c;整体代码可以仿照Day03新增菜品来进行实现&#xff01; 一、功…