workflow源码解析:ThreadTask

news2024/11/16 13:24:14

1、使用程序,一个简单的加法运算程序

#include <iostream>
#include <workflow/WFTaskFactory.h>
#include <errno.h>

// 直接定义thread_task三要素
// 一个典型的后端程序由三个部分组成,并且完全独立开发。即:程序=协议+算法+任务流。

// 定义INPUT
struct AddInput
{
    int x;
    int y;
};

// 定义OUTPUT
struct AddOutput
{
    int res;
};

// 加法流程
void add_routine(const AddInput *input, AddOutput *output)
{
    output->res = input->x + input->y;
}

using AddTask = WFThreadTask<AddInput, AddOutput>;

void callback(AddTask *task)
{
	auto *input = task->get_input();
	auto *output = task->get_output();

	assert(task->get_state() == WFT_STATE_SUCCESS);

    fprintf(stderr, "%d + %d = %d\n", input->x, input->y, output->res);
}

int main()
{
    using AddFactory = WFThreadTaskFactory<AddInput, AddOutput>;
	AddTask *task = AddFactory::create_thread_task("add_task",
												add_routine,
												callback);
	AddInput *input = task->get_input();

	input->x = 1;
	input->y = 2;

	task->start();

	getchar();
	return 0;
}

2、类继承关系

WFThreadTaskFactory代码

// src/factory/WFTaskFactory.h
template<class INPUT, class OUTPUT>
class WFThreadTaskFactory
{
private:
	using T = WFThreadTask<INPUT, OUTPUT>;
    ...
public:
	static T *create_thread_task(const std::string& queue_name,
								 std::function<void (INPUT *, OUTPUT *)> routine,
								 std::function<void (T *)> callback);

    ...
};
// src/factory/WFTaskFactory.inl
template<class INPUT, class OUTPUT>
WFThreadTask<INPUT, OUTPUT> *
WFThreadTaskFactory<INPUT, OUTPUT>::create_thread_task(const std::string& queue_name,
						std::function<void (INPUT *, OUTPUT *)> routine,
						std::function<void (WFThreadTask<INPUT, OUTPUT> *)> callback)
{
	return new __WFThreadTask<INPUT, OUTPUT>(WFGlobal::get_exec_queue(queue_name),
											 WFGlobal::get_compute_executor(),
											 std::move(routine),
											 std::move(callback));
}

__WFThreadTask代码

// src/factory/WFTaskFactory.inl
template<class INPUT, class OUTPUT>
class __WFThreadTask : public WFThreadTask<INPUT, OUTPUT>
{
protected:
	virtual void execute()  //实现ExecSession的纯虚函数
	{
		this->routine(&this->input, &this->output); //执行用户程序的routine
	}

protected:
	std::function<void (INPUT *, OUTPUT *)> routine;

public:
	__WFThreadTask(ExecQueue *queue, Executor *executor,
				   std::function<void (INPUT *, OUTPUT *)>&& rt,
				   std::function<void (WFThreadTask<INPUT, OUTPUT> *)>&& cb) :
		WFThreadTask<INPUT, OUTPUT>(queue, executor, std::move(cb)),
		routine(std::move(rt))
	{
	}
};

WFThreadTask代码

// src/factory/WFTask.h
template<class INPUT, class OUTPUT>
class WFThreadTask : public ExecRequest
{
public:
	void start();
	void dismiss();

	INPUT *get_input() { return &this->input; }
	OUTPUT *get_output() { return &this->output; }

	void *user_data;

	int get_state() const { return this->state; }
	int get_error() const { return this->error; }

	void set_callback(std::function<void (WFThreadTask<INPUT, OUTPUT> *)> cb);
protected:
	virtual SubTask *done();

protected:
	INPUT input;
	OUTPUT output;
	std::function<void (WFThreadTask<INPUT, OUTPUT> *)> callback;

public:
	WFThreadTask(ExecQueue *queue, Executor *executor,
				 std::function<void (WFThreadTask<INPUT, OUTPUT> *)>&& cb) :
		ExecRequest(queue, executor),
		callback(std::move(cb))
	{
        // 初始化
	}

protected:
	virtual ~WFThreadTask() { }
};

ExecRequest代码

// src/kernel/ExecRequest.h
class ExecRequest : public SubTask, public ExecSession
{
public:
	ExecRequest(ExecQueue *queue, Executor *executor);
	ExecQueue *get_request_queue() const { return this->queue; }
	void set_request_queue(ExecQueue *queue) { this->queue = queue; }
	virtual void dispatch()  // 实现SubTask的纯虚函数,这个纯虚函数主要是任务的开始执行接口
	{
		this->executor->request(this, this->queue);
		...
	}

protected:
	int state;
	int error;

	ExecQueue *queue;
	Executor *executor;

protected:
	virtual void handle(int state, int error); // 实现ExecSession的纯虚函数
};

SubTask代码

class SubTask
{
     // 子任务被调起的时机
     virtual void dispatch() = 0;
     // 子任务执行完成的时机
     virtual SubTask *done() = 0;
     // 内部实现,决定了任务流走向
     void subtask_done();
     ...
};

ExecSession代码

/src/kernel/Executor.h
class ExecSession
{
private:
	virtual void execute() = 0;
	virtual void handle(int state, int error) = 0;

protected:
	ExecQueue *get_queue() { return this->queue; }

private:
	ExecQueue *queue;
    ...
};

继承关系图

__WFThreadTask__目前还未用到,暂不清楚

在这里插入图片描述

3、两个重要成员: ExecQueue, Executor

ExecQueue代码

/src/kernel/Executor.h
class ExecQueue
{
    ...
private:
	struct list_head task_list;
	pthread_mutex_t mutex;
};

Executor代码

/src/kernel/Executor.h
class Executor
{
public:
    // 一次要执行的接口,对于线程执行器来说,就是把一个执行任务扔进某个队列中
    int request(ExecSession *session, ExecQueue *queue);

private:
    // 执行器和系统资源,是一个包含关系
    thrdpool_t *thrdpool;
};

request() 函数把任务扔进线程池队列等待执行,线程池会从队列拿到这个任务,然后执行executor_thread_routine

// src/kernel/Executor.cc
int Executor::request(ExecSession *session, ExecQueue *queue)
{
    ExecSessionEntry *entry = new ExecSessionEntry;

	session->queue = queue;
	entry->session = session;
	entry->thrdpool = this->thrdpool;
	queue->mutex.lock();
	list_add_tail(&entry->list, &queue->session_list);
	if (queue->session_list.next == &entry->list)
	{
		struct thrdpool_task task = {Executor::executor_thread_routine, queue};
		/*
		{
			.routine	=	Executor::executor_thread_routine,
			.context	=	queue
		};
		*/
		if (thrdpool_schedule(&task, this->thrdpool) < 0)
		{
			list_del(&entry->list);
			delete entry;
			entry = NULL;
		}
	}

	queue->mutex.unlock();
	return -!entry;
}
struct ExecSessionEntry
{
	struct list_head list;
	ExecSession *session;
	thrdpool_t *thrdpool;
};
// src/kernel/Executor.cc
void Executor::executor_thread_routine(void *context)
{
	ExecQueue *queue = (ExecQueue *)context;
	ExecSessionEntry *entry;
	ExecSession *session;

	queue->mutex.lock();
	entry = list_entry(queue->session_list.next, ExecSessionEntry, list);
	list_del(&entry->list);
	session = entry->session;
	if (!list_empty(&queue->session_list))
	{
		struct thrdpool_task task = {Executor::executor_thread_routine, queue};
		/*
		{
			.routine	=	Executor::executor_thread_routine,
			.context	=	queue
		};
		*/
		__thrdpool_schedule(&task, entry, entry->thrdpool);
	}
	else
		delete entry;

	queue->mutex.unlock();
	session->execute(); //这里会执行到用户routine
	session->handle(ES_STATE_FINISHED, 0);
}

4、参考链接

https://github.com/chanchann/workflow_annotation/blob/main/src_analysis/12_thread_task.md
https://blog.csdn.net/j497205974/article/details/135554164?spm=1001.2014.3001.5502

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1390822.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

解决C语言wprintf函数无法打印中文的问题

在Visual Studio中&#xff0c;wchar_t[]字符数组用来存储UTF-16编码的字符串&#xff0c;但C语言库函数wprintf无法打印含有汉字的wchar_t字符串。 解决办法是用WriteConsoleW函数重新实现一个自己的my_wprintf函数。 #include <stdio.h> #include <Windows.h>//…

PDF文件中字体乱码的一种简单的处理方法

要解决问题先得碰到问题&#xff0c;碰到问题就迈出了解决问题的关键一步。 问题PDF文件的下载链接 这文件用Acrobat打开&#xff0c;无法搜索文本&#xff0c;复制文本出来也都是乱码。但用sumatra PDF打开就不存在这个问题&#xff01; 用Acrobat的印前检查解决。prefligh…

Python网络爬虫进阶:自动切换HTTP代理IP的应用

前言 当你决定做一个网络爬虫的时候&#xff0c;就意味着你要面对一个很大的挑战——IP池和中间件。这两个东西听起来很大上&#xff0c;但其实就是为了让你的爬虫不被封杀了。下面我就来给你讲讲如何搞定这些东西。 第一步&#xff1a;创建爬虫IP池的详细过程 首先&#xf…

电商数据分析--常见的数据采集工具及方法

数据采集|数据运营和数据分析 走进数据&#xff0c;一起学习数据处理&#xff0c;数据分析&#xff0c;数据挖掘&#xff0c;一起成长&#xff0c;相信通过一起努力&#xff0c;未来2-3年我们都会成为公司的中流砥柱。懂数据&#xff0c;会分析&#xff0c;会挖掘&#xff0c;…

mathtype2024版本下载与安装(mac版本也包含在内)

安装包补丁主要是mathtype的安装包&#xff0c;与它的补丁。 详细安装过程&#xff1a; step1&#xff1a; 使用方法是下载完成后先安装MathType-win-zh.exe文件&#xff0c;跟着步骤走直接安装就行。 step2&#xff1a; 关闭之后&#xff0c;以管理员身份运行MathType7PJ.exe…

【linux】visudo

碎碎念 visudo命令是用来修改一个叫做 /etc/sudoers 的文件的&#xff0c;用来设置哪些 用户 和 组 可以使用sudo命令。并且使用visudo而不是使用 vi /etc/sudoers 的原因在于&#xff1a;visudo自带了检查功能&#xff0c;可以判断是否存在语法问题&#xff0c;所以更加安全 …

单节点部署 Gpmall 商城系统

目录 实验中使用的技术 实验过程 实验中使用的技术 Java Redis Elasticsearch&#xff08;先不用&#xff09; Nginx MariaDB ZooKeeper Kafka 实验过程 1.Xnode1克隆虚拟机gpmall CRT连接&#xff08;root密码&#xff1a;000000&#xff09; 2修改主机名 [root…

纵行科技参加“十四五”国家重点研发计划课题“工业化建造自动识别与数据采集(AIDC)成套技术”工程试点

近期&#xff0c;“十四五”国家重点研发计划NQI课题组“产学研用”联合团队开展的“工业化建造自动识别与数据采集&#xff08;AIDC&#xff09;成套技术”工程建造场景集成应用试点&#xff08;第一阶段&#xff09;&#xff0c;在广州白云国际机场T3航站楼项目西指廊及北港湾…

uniapp使用安装sass

1.首先你要安装node-sass npm install node-sass --save-dev2.安装sass-loader npm install sass-loader --save-dev3.修改style标签&#xff0c;声明使用sass <style lang"scss" scoped>

AI工具(20240116):Copilot Pro,Fitten Code等

Copilot Pro Copilot Pro是微软推出的Copilot的付费增强版本,通过提供优先访问GPT-4等最新AI模型,大大提升用户的创造力和工作效率。该服务可与Microsoft 365订阅捆绑使用,支持在Word、Excel等Office应用内直接使用Copilot功能,帮助用户更快速地起草文档、电子邮件和演示文稿等…

【CV】使用 matplotlib 画统计图,并用 OpenCV 显示静图和动图

1. 效果 静图 动图 2.思路 准备数据使用 pyplot 画统计图图片写入流&#xff0c;流转图&#xff08;numpy&#xff09;matplotlib 颜色 RGB 转 OpenCV 颜色 BRG 4. 静图 代码过程有注释&#xff0c;很简单的实现。注意 matplotlib RGB 转 OpenCV BGR image image[:, :,…

刘知远LLM入门到实战——自然语言基础

文章目录 自然语言处理基础词表示语言模型N-gram ModelNeural Language Model: 为什么NLP等领域的模型越来越大&#xff1f; 大模型会带来哪些新的范式和挑战&#xff1f; 自然语言处理基础 让计算机理解人类语言&#xff0c;图灵测试就是基于对话的方式。 研究历史&#xff…

shell简单截取curl GET返回的body消息体

目录 需求背景&#xff1a; 示例&#xff1a; 解决方式&#xff1a; 需求背景&#xff1a; 用shell解析 curl命令GET到的消息体&#xff0c;获取body消息体里的某个字段的值,只是个简单的示例&#xff0c;可以在此基础上更改满足自己的需求 示例&#xff1a; curl一个API…

pytorch一致数据增强—独用增强

前作 [1] 介绍了一种用 pytorch 模仿 MONAI 实现多幅图&#xff08;如&#xff1a;image 与 label&#xff09;同用 random seed 保证一致变换的写法&#xff0c;核心是 MultiCompose 类和 to_multi 包装函数。不过 [1] 没考虑各图用不同 augmentation 的情况&#xff0c;如&am…

鸿蒙使用 axios

1、已安装ohpm&#xff0c;可参考上一篇 2、回到项目的根目录执行 ohpm install ohos/axios 安装成功后&#xff0c;查看项目的package 3、开放网络权限 在模块的module.json5中添加权限 "module": {"requestPermissions": [{"name": "…

【FastAPI】路径参数(二)

预设值 如果你有一个接收路径参数的路径操作&#xff0c;但你希望预先设定可能的有效参数值&#xff0c;则可以使用标准的 Python Enum 类型。 导入 Enum 并创建一个继承自 str 和 Enum 的子类。通过从 str 继承&#xff0c;API 文档将能够知道这些值必须为 string 类型并且能…

智能时代,让AI为你撰写专业应用文

大家好我是在看&#xff0c;记录普通人学习探索AI之路。 何谓应用文&#xff1f;简单来说&#xff0c;应用文是指在日常生活中以及工作中撰写的&#xff0c;旨在传递信息、处理事务的一种文体类型。其范畴广泛&#xff0c;涵盖了诸如请假条、通知书、辞职信、检查报告、欠条、…

回归预测 | Matlab实现MSADBO-CNN-LSTM基于改进蜣螂算法优化卷积神经网络-长短期记忆神经网络多特征回归预测

回归预测 | Matlab实现MSADBO-CNN-LSTM基于改进蜣螂算法优化卷积神经网络-长短期记忆神经网络多特征回归预测 目录 回归预测 | Matlab实现MSADBO-CNN-LSTM基于改进蜣螂算法优化卷积神经网络-长短期记忆神经网络多特征回归预测预测效果基本描述程序设计参考资料 预测效果 基本描…

分布式搜索引擎ElasticSearch——基础

分布式搜索引擎ElasticSearch——基础 文章目录 分布式搜索引擎ElasticSearch——基础初识elasticsearch什么是elasticsearchelasticsearch的发展正向索引和倒排索引安装elasticsearch&#xff0c;kibana部署单点es创建网络加载镜像运行 部署kibana部署DevTools 安装IK分词器在…

YOLOv5改进系列(26)——添加RFAConv注意力卷积(感受野注意力卷积运算)

【YOLOv5改进系列】前期回顾&#xff1a; YOLOv5改进系列&#xff08;0&#xff09;——重要性能指标与训练结果评价及分析 YOLOv5改进系列&#xff08;1&#xff09;——添加SE注意力机制 YOLOv5改进系列&#xff08;2&#xff09;——添加CBAM注意力机制 YOLOv5改进系列&…