WorkFlow GO-Task 源码分析

news2024/10/21 1:13:07

WorkFlow GO-Task 源码分析

前言

任何好的框架的设计都是围绕着一个核心思想去展开,sylar的一切皆协程、muduo的one loop per thread等。一切皆是任务流就是workflow的精髓。(PS,目前作者功力尚浅,许多设计细节还未能悟透其用意,目前也只能尽力将我的理解呈现出来,有错误非常欢迎指出。

也是尝试着阅读过许多开源优秀的代码,这里记录一下我个人在阅读一份源码时的习惯:适可而止的自低向上。因为我在阅读一份完全不了解的源码时,迫不及待的想去知道每个每个模块、每个函数的实现细节,我也曾尝试以自顶向下去阅读一份源码,但是无法克制自己钻牛角尖的心,并且在经验尚浅,完全不了解设计背景的境况下,自顶向下去阅读一份源码,某一个函数的实现你只能去猜,由于经验尚浅,你大概率猜的也是错误的。所以,兜兜转转,我还是遵循我个人的习惯,自低向上去阅读一份源码。当然,应该:适可而止的自低向上,一些你完全知道起什么作用的模块其实就不必去深究了,比如:链表、红黑树、编码器等。深入细节的同时,也不要忘了我们的初心:框架的设计思想。

网络框架(包括库)的模块设计其实有很多相似的地方,比如都会有的:线程池、对epoll的封装、对io接口的封装、对tcpserver以及tcpclient的封装等。在阅读网络并发相关的源码时可以以这些方面入手。

在深入阅读workflow的源码之后,特别是在kernel文件夹下对一些基础模块的封装中感受到了对c++的克制使用。因为kernel下基础模块的实现大多都是以c语言为主。这点大家要有一个心理准备。

这里建议读者在阅读workflow,go-task源码时,以如下顺序阅读:

ExecQueue -> ExecSession -> Executor-> ExecRequest -> SubTask -> __ExecManager -> __WFGoTask -> WFGoTask -> SeriesWork

正文

下面直接以workflow给的gotask的示例作为本文的切入点:

用法

go-task的用法示例如下:

#include <stdio.h>
#include <utility>
#include "workflow/WFTaskFactory.h"
#include "workflow/WFFacilities.h"

void add(int a, int b, int& res) {
    res = a + b;
}

int main(void) {
    WFFacilities::WaitGroup wait_group(1);
    int a = 1;
    int b = 1;
    int res;

    WFGoTask *task = WFTaskFactory::create_go_task("test", add, a, b, std::ref(res));   // cb1
    task->set_callback([&](WFGoTask *task) {    // cb2
        printf("%d + %d = %d\n", a, b, res);
        wait_group.done();
    });
 
    task->start();
    wait_group.wait();
    return 0;
}

如果你有一定网络编程的基础,应该很容易看懂这段小daemo。我们可以这段代码猜测:

第一行声明了一个WaitGroup变量,从后面的代码可以知道wait_group的作用是:阻塞主线程等待计算完成。在创建wait_group后,将计算过程add函数封装在一个回调函数(cb1)当中,cb1作为一个参数再来构造一个任务–WFGoTask,然后调用WFGoTask::set_callback函数又设置了一个回调函数(cb2),从代码上可以看到,该cb2的作用是:打印计算结果并通知主线程计算完毕。

所以经过上面的分析,我们可以知道:

  1. WaitGroup的实现一定是基于条件变量/信号量。

  2. 作为WFGoTask构造参数cb1,一定某一时刻被线程池里面的某个线程给调用了,并且该线程在调用add函数返回之后,一定是直接或者间接调用了一下cb2。

源码简析

示例代码中create_go_task的第一个参数其实是kernel目录下的ExecQueue队列对应的队列名。ExecQueue具体的用法以及作用稍后讲解,只需知道它是一个队列即可。

create_go_task实现很简单,它里面就是依赖一个全局的单例__ExecManager,通过这个单例拿到队列名对应的队列指针以及Executor对象。然后将队列和Executor对象作为__WFGoTask的构造参数,创建出了继承自WFGoTask的__WFGoTask对象。

这里备注一下:__ExecManager单例管理从队列名到队列指针的映射。并且在__ExecManager初始化时,会创建一个Executor对象。

目前为止,出现了几个新的类:ExecQueue、Executor、__WFGoTask。

对于ExecQueue从kernel目录下可以看到它的源码,单纯就是一个链表,使用的还是linux原生链表。它的每一个节点都是ExecSessionEntry类型,如下定义:

struct ExecSessionEntry {
	struct list_head list;
	ExecSession *session;
	thrdpool_t *thrdpool;
};

单独看ExecQueue、ExecSession、ExecSessionEntry的源码一定会蒙(我就是),所以这里直接讲解Executor的实现,前面的三个类就是被它所使用。

void Executor::executor_thread_routine(void *context) {
	ExecQueue *queue = (ExecQueue *)context;
	struct ExecSessionEntry *entry;
	ExecSession *session;
	int empty;

	entry = list_entry(queue->session_list.next, struct ExecSessionEntry, list);
	pthread_mutex_lock(&queue->mutex);
	list_del(&entry->list);
	empty = list_empty(&queue->session_list);
	pthread_mutex_unlock(&queue->mutex);

	session = entry->session;
	if (!empty) {
		struct thrdpool_task task = {
			.routine	=	Executor::executor_thread_routine,
			.context	=	queue
		};
		__thrdpool_schedule(&task, entry, entry->thrdpool);
	}
	else
		free(entry);

	session->execute();
	session->handle(ES_STATE_FINISHED, 0);
}

流程如下:

  1. 从队列中取ExecSessionEntry。

  2. 队列非空的话,将ExecSessionEntry中的session包装成thrdpool_task,并且将ExecSessionEntry的地址复用成线程池的__thrdpool_task_entry(PS:线程池在拿到__thrdpool_task_entry时用完后会自动free掉)。

  3. 队列为非空的话,直接free掉ExecSessionEntry。

  4. 最后执行ExecSession的execute、handle。

这里的execute函数其实暗示着会调用cb1,handle其实就暗示里面会调用cb2。这下前后不就连起来了?(恍然大悟!)别着急,我们继续去剖析源码。

细心的读者应该会发现这句代码没被放在锁里面:

entry = list_entry(queue->session_list.next, struct ExecSessionEntry, list);

为什么可以不放在锁里面?如果线程2,在线程1执行完list_del之前,拿到了同一个entry,这样不会有野指针的问题吗?

这里放出我的猜测:Executor::executor_thread_routine本身就已经保证了一个时刻只会有一个线程访问队列头部。这个函数的执行逻辑是这样的:当前Executor::executor_thread_routine的回调是靠上一个Executor::executor_thread_routine回调访问完链表头部之后触发的,也即下一个队列头部访问的回调还得靠上一个回调来封装。这里其实有点并行任务串行化的味道了。

struct thrdpool_task task = {
    .routine	=	Executor::executor_thread_routine,
    .context	=	queue
};
__thrdpool_schedule(&task, entry, entry->thrdpool);

最后是ExecQueue队列的start点,如下:

int Executor::request(ExecSession *session, ExecQueue *queue) {
	struct ExecSessionEntry *entry;

	session->queue = queue;
	entry = (struct ExecSessionEntry *)malloc(sizeof (struct ExecSessionEntry));
	if (entry) {
		entry->session = session;
		entry->thrdpool = this->thrdpool;
		pthread_mutex_lock(&queue->mutex);
		list_add_tail(&entry->list, &queue->session_list);
		if (queue->session_list.next == &entry->list) {
			struct thrdpool_task task = {
				.routine	=	Executor::executor_thread_routine,
				.context	=	queue
			};
			if (thrdpool_schedule(&task, this->thrdpool) < 0) {
				list_del(&entry->list);
				free(entry);
				entry = NULL;
			}
		}

		pthread_mutex_unlock(&queue->mutex);
	}

	return -!entry;
}

从源码中可以看到,就是使用malloc分配一块内存,将session封装成ExecSessionEntry,然后将其添加到队列尾部,如果队列原来为空(意味着ExecQueue没有开始执行),就启动第一个Executor::executor_thread_routine,这样它会自动链式触发执行队列当中的每一个任务的回调。

这里malloc分配的ExecSessionEntry由两个地方去释放:

  1. 这里malloc分配的ExecSessionEntry会被复用为线程池的__thrdpool_task_entry,最后被线程池调用free释放掉。

  2. 在函数Executor::executor_thread_routine中,由ExecQueue最后一个任务调用free释放。

从这里可以看到,workflow针对内存的释放也是极其晦涩(反正我在阅读源码时就是这样感觉)。为了性能,根本没使用智能指针,完全靠malloc和free。内存池也没有,这点我是无法理解的。

经过上面的分析我们了解了ExecSession、ExecQueue、Executor的作用,接下来我们分析一下,__WFGoTask是怎么使用这些类的。

从本段开头了解到ExecQueue、Executor是作为__WFGoTask的构造参数,所以下面我们以__WFGoTask为主去看看它是怎么实现的

class __WFGoTask : public WFGoTask {
    // ...
protected:
	virtual void execute() {
		this->go();
	}

protected:
	std::function<void ()> go;

public:
	__WFGoTask(ExecQueue *queue, Executor *executor,
			   std::function<void ()>&& func) :
		WFGoTask(queue, executor),
		go(std::move(func)) { /* ... */ }
};

使用了virtual关键字声明的execute函数!,并且调用了go也即cb1!(衔接起来了!)

继续看它基类的实现:

class WFGoTask : public ExecRequest {
public:
	void start() {
		assert(!series_of(this));
		Workflow::start_series_work(this, nullptr);
	}

public:
	void *user_data;

public:
	void set_callback(std::function<void (WFGoTask *)> cb) {
		this->callback = std::move(cb);
	}

protected:
	virtual SubTask *done() {
		SeriesWork *series = series_of(this);

		if (this->callback)
			this->callback(this);

		delete this;
		return series->pop();
	}

protected:
	std::function<void (WFGoTask *)> callback;

public:
	WFGoTask(ExecQueue *queue, Executor *executor) :
		ExecRequest(queue, executor) { /* ... */ }
};

WFGoTask::start()正是示例当中调用的start函数,set_callback正是设置的cb2回调。我可以明确的说,start_series_work会创建一个SeriesWork对象,并且将SeriesWork对象的指针赋值给WFGoTask祖父类SubTask的user_data成员,并且SeriesWork其实也是一个队列,它是串行队列,队列当中的任务是有先后执行顺序的。这里串行队列的设计是为特定的有先后依赖顺序的计算场景所设计的。

深入查看ExecRequest的实现:

class ExecRequest : public SubTask, public ExecSession {
public:
	ExecRequest(ExecQueue *queue, Executor *executor) { /* ... */ }

public:
	virtual void dispatch() {
		if (this->executor->request(this, this->queue) < 0)
			this->handle(ES_STATE_ERROR, errno);
	}

protected:
	ExecQueue *queue;
	Executor *executor;

protected:
	virtual void handle(int state, int error) {
		this->state = state;
		this->error = error;
		this->subtask_done();
	}
};

SubTask类和ExecSession类非常简单,由于篇幅有限这只列出我们关心的函数。

SubTask有三个关键函数:

虚函数:dispatch、done

普通成员函数:subtask_done。

SubTask::dispatch 最终被重写为:ExecRequest::dispatch

SubTask::done 最终被重写为:WFGoTask::done

其中subtask_done实现如下:

void SubTask::subtask_done() {
	SubTask *cur = this;

	while (1) {
		cur = cur->done();
		if (cur) {
			cur->dispatch();
		}
        /* ... */

		break;
	}
}

done的实现落实到了WFGoTask::done上,作用是销毁当前的task对象并且返回串行队列当中的下一个task,然后由subtask_done调用ExecRequest::dispatch将task挂到ExecQueue的链表上等待线程池的消费。

ExecSession有两个我们比较关心的纯虚函数:execute、handle。这两函数一路继承体系下来最终分别被重写为__WFGoTask::execute和ExecRequest::handle。

所以在Executor::executor_thread_routine函数中调用的execute、handle函数最终被重写为:__WFGoTask::execute、ExecRequest::handle()。

最后总结一下go-task执行的流程:

  1. 构造一个go-task对象 && 调用start函数。

  2. start函数会new一个first为go-task,last为nullptr的SeriesWork对象 && 调用first的dispatch也即ExecRequest::dispatch。

  3. executor的request函数,将go-task挂到ExecQueue链表的尾部上,由线程池去消费。当然,如果ExecQueue原来是为空的,就创建第一个Executor::executor_thread_routine。

  4. Executor::executor_thread_routine会链式触发让线程池处理ExecQueue每一个任务。

  5. 调用任务的__WFGoTask::execute。

  6. 调用任务的ExecRequest::handle。

  7. 调用SubTask::subtask_done && (如果存在的话)调用SeriesWork对象的下一个task的dispatch(PS,可能不是ExecRequest::dispatch这个重载函数)

  8. 调用WFGoTask::done。删除当前task对象并且返回串行队列的下一个串行任务。

最后要还要提醒的一句是:Executor::executor_thread_routine在向ExecQueue的链表取任务时是保证非并发的,但是在执行任务的execute时,是有可能是并发执行的! 有人可能会注意到那为什么在向链表取任务时要加锁?因为这把锁可能防止Executor::executor_thread_routine和Executor::request之间的竞争问题,而Executor::executor_thread_routine和Executor::executor_thread_routine之间并不存在竞争问题。


本章完结

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2219686.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

无人机之放电速率篇

无人机的放电速率是指电池在一定时间内放出其储存电能的能力&#xff0c;这一参数对无人机的飞行时间、性能以及安全性都有重要影响。 一、放电速率的表示方法 放电速率通常用C数来表示。C数越大&#xff0c;表示放电速率越快。例如&#xff0c;一个2C的电池可以在1/2小时内放…

《知道做到》

整体看内容的信息密度较低。绿灯思维、积极心态、反复练习值得借鉴。 引言 行动是老子&#xff0c;知识是儿子&#xff0c;创造是孙子&#xff01;行是知之始&#xff0c;知是行之成。 前言 工作中最让你失望的事情是什么&#xff1f; 一个人行为的改变总是先从内心想法的转…

MySQL 【日期】函数大全(六)

目录 1、TIME_FORMAT() 按照指定的格式格式化时间。 2、TIME_TO_SEC() 将指定的时间值转为秒数。 3、TIMEDIFF() 返回两个时间之间的差值。 4、TIMESTAMP() 累加所有参数并将结果作为日期时间值返回。 5、TIMESTAMPADD() 将指定的时间间隔加到一个日期时间值上并返回结果。…

数据库->库的操作

目录 一、查看数据库 1.显示所有的数据库 二、创建数据库 1.创建数据库 2.查看警告信息 3.创建一个名为database的数据库 三、字符集编码和校验(排序)规则 1.查看数据库⽀持的字符集编码 2.查看数据库⽀持的排序规则 3.一条完整创建库的语句 4. 不同的字串集与排序规…

keepalived(高可用)+nginx(负载均衡)+web

环境 注意&#xff1a; (1) 做高可用负载均衡至少需要四台服务器&#xff1a;两台独立的高可用负载均衡器&#xff0c;两台web服务器做集群 (2) vip&#xff08;虚拟ip&#xff09;不能和物理ip冲突 (3) vip&#xff08;虚拟ip&#xff09;最好设置成和内网ip同一网段&#xf…

传感器驱动系列之PAW3212DB鼠标光电传感器

目录 一、PAW3212DB鼠标光电传感器简介 1.1 主要特点 1.2 引脚定义 1.3 传感器组装 1.4 应用场景 1.5 传感器使用注意 1.5.1 供电选择 1.5.2 SPI读写设置 1.5.3 MOTION引脚 1.6 寄存器说明 1.6.1 Product_ID1寄存器 1.6.2 MOTION_Status寄存器 1.6.3 Delta_X寄存器…

【论文笔记】X-Former: Unifying Contrastive and Reconstruction Learning for MLLMs

&#x1f34e;个人主页&#xff1a;小嗷犬的个人主页 &#x1f34a;个人网站&#xff1a;小嗷犬的技术小站 &#x1f96d;个人信条&#xff1a;为天地立心&#xff0c;为生民立命&#xff0c;为往圣继绝学&#xff0c;为万世开太平。 基本信息 标题: X-Former: Unifying Contr…

为您的 WordPress 网站打造完美广告布局 A5广告单元格插件

一个为 WordPress 网站量身定制的强大工具,它将彻底改变您展示广告的方式 灵活多变的布局设计 A5 广告单元格插件的核心优势在于其无与伦比的灵活性。无论您是想要创建整齐的网格布局,还是希望打造独特的不规则设计,这款插件都能满足您的需求。 自定义网格数量&#xff1a;从 2…

C# 条形码、二维码标签打印程序

1、条码标答打印主界面 2、打印设置 3、生成QR代码 private void GetBarcode_T(string lr) { QRCodeEncoder qrCodeEncoder = new QRCodeEncoder();//创建一个对象 qrCodeEncoder.QRCodeEncodeMode = QRCodeEncoder.ENCODE_MODE.BYTE; //设置编码测量…

Mamba学习笔记(2)—序列数据处理基础

文章目录 (1) RNN&#xff08;Recurrent Neural Networks&#xff09;基本原理代码定义 (2) SLTM (Long Short-Term Memory)基本原理代码定义 (3) GRU (Gated Recurrent Unit)基本原理代码定义 (4) Transformer&#xff08;☆☆☆Attention Is All You Need☆☆☆&#xff09;0…

量子门电路开销——T门、clifford门、toffoli门、fredkin门

在量子计算中&#xff0c;T门的成本比Clifford门高出很多倍的原因与量子计算中纠错的实现、物理门操作的复杂性以及容错量子计算架构中的成本评估有关。以下是几个关键原因&#xff0c;解释了为什么 T 门的成本在量子计算中远远高于 Clifford 门&#xff1a; 1. T 门和 Cliffo…

递归、搜索与回溯(二)——递归练习与快速幂

文章目录 递归、搜索与回溯——递归两两交换链表中的节点Pow(x, n) 递归、搜索与回溯——递归 该文仍然是解决递归问题&#xff0c;值得注意的是快速幂算法。接下来会系统学习二叉树深搜题目&#xff0c;慢慢走向搜索与回溯。 两两交换链表中的节点 原题链接&#xff1a;24. 两…

AI识谱——将乐曲转化为五线谱

导言&#xff1a; 会乐曲的小伙伴在听到一首好听的乐曲的时候&#xff0c;肯定想过将这首歌曲转换为谱子给弹出来。除了上网找乐谱、请大神帮忙扒谱或者自己扒谱外&#xff0c;小伙伴也可以尝试一下本文介绍的AI识谱流程&#xff0c;让我们开始吧&#xff01; 注意了&#xf…

2024 Python3.10 系统入门+进阶(十七):面向对象基础

目录 一、面向对象概述1.1 面向对象简介1.2 对象和类1.3 定义属性和行为1.3.1 用数据描述对象的状态1.3.2 行为就是动作 1.4 隐藏细节并创建公共接口1.5 组合1.6 继承1.6.1 继承提供抽象1.6.2 多重继承 二、封装2.1 Python类定义2.2 创建类的成员2.2.1 创建实例方法并访问2.2.2…

PythonExcel批量pingIP地址

问题&#xff1a; 作为一个电气工程师&#xff08;PLC&#xff09;&#xff0c;当设备掉线的时候&#xff0c;需要用ping工具来检查网线物理层是否可靠连接&#xff0c;当项目体量过大时&#xff0c;就不能一个手动输入命令了。 解决方案一&#xff1a; 使用CMD命令 for /L %…

机器学习在聚合物及其复合材料中的应用与实践

在当前的工业和科研领域&#xff0c;聚合物及其复合材料因其卓越的物理和化学性能而受到广泛关注。这些材料在航空航天、汽车制造、能源开发和生物医学等多个行业中发挥着至关重要的作用。随着材料科学的发展&#xff0c;传统的实验和理论分析方法已逐渐无法满足新材料研发的需…

【力扣打卡系列】滑动窗口与双指针(无重复字符的最长子串)

坚持按题型打卡&刷&梳理力扣算法题系列&#xff0c;语言为go&#xff0c;Day7 无重复字符的最长子串 题目描述解题思路 不含重复字符——》考虑使用哈希表来存储记录为了提高效率也可以用数组&#xff0c;hash : [128]bool{} &#xff08;因为存的是字符的ASCLL码&…

【Unity踩坑】无法关闭Unity(Application.Shutdown.CleanupEngine)

安装了Unity 6正式版&#xff0c;在关闭Unity 项目时&#xff0c;会出现下面的提示&#xff0c;一直无法关闭。 一直显示 Application.Shutdown.CleanupEngine。 查了一下。这是一个历史性问题了&#xff0c;看来依然没有解决。 参考&#xff1a;Application.Shutdown.Cleanu…

web API基础

作用和分类 作用: 就是使用 JS 去操作 html 和浏览器 分类&#xff1a; DOM (文档对象模型)、 BOM &#xff08;浏览器对象模型&#xff09; 什么是DOM DOM (Document Object Model) 译为文档对象模型&#xff0c;是 HTML 和 XML 文档的编程接口。 HTML DOM 定义了访问和操作 …

权限(补充)

在上一篇Linux权限&#xff08;想了解的可以点击看看哦&#xff09;中已经见识了一部分权限&#xff0c;但是少了很重要的一部分&#xff1a; 那就是用户之间的转换&#xff0c;文件读写的关系&#xff0c;这里就简单的介绍一些&#xff1b; 我们在Linux权限知道了目录权限的关…