1、使用程序,一个简单的加法运算程序
#include <iostream>
#include <workflow/WFTaskFactory.h>
#include <errno.h>
// 直接定义thread_task三要素
// 一个典型的后端程序由三个部分组成,并且完全独立开发。即:程序=协议+算法+任务流。
// 定义INPUT
struct AddInput
{
int x;
int y;
};
// 定义OUTPUT
struct AddOutput
{
int res;
};
// 加法流程
void add_routine(const AddInput *input, AddOutput *output)
{
output->res = input->x + input->y;
}
using AddTask = WFThreadTask<AddInput, AddOutput>;
void callback(AddTask *task)
{
auto *input = task->get_input();
auto *output = task->get_output();
assert(task->get_state() == WFT_STATE_SUCCESS);
fprintf(stderr, "%d + %d = %d\n", input->x, input->y, output->res);
}
int main()
{
using AddFactory = WFThreadTaskFactory<AddInput, AddOutput>;
AddTask *task = AddFactory::create_thread_task("add_task",
add_routine,
callback);
AddInput *input = task->get_input();
input->x = 1;
input->y = 2;
task->start();
getchar();
return 0;
}
2、类继承关系
WFThreadTaskFactory代码
// src/factory/WFTaskFactory.h
template<class INPUT, class OUTPUT>
class WFThreadTaskFactory
{
private:
using T = WFThreadTask<INPUT, OUTPUT>;
...
public:
static T *create_thread_task(const std::string& queue_name,
std::function<void (INPUT *, OUTPUT *)> routine,
std::function<void (T *)> callback);
...
};
// src/factory/WFTaskFactory.inl
template<class INPUT, class OUTPUT>
WFThreadTask<INPUT, OUTPUT> *
WFThreadTaskFactory<INPUT, OUTPUT>::create_thread_task(const std::string& queue_name,
std::function<void (INPUT *, OUTPUT *)> routine,
std::function<void (WFThreadTask<INPUT, OUTPUT> *)> callback)
{
return new __WFThreadTask<INPUT, OUTPUT>(WFGlobal::get_exec_queue(queue_name),
WFGlobal::get_compute_executor(),
std::move(routine),
std::move(callback));
}
__WFThreadTask代码
// src/factory/WFTaskFactory.inl
template<class INPUT, class OUTPUT>
class __WFThreadTask : public WFThreadTask<INPUT, OUTPUT>
{
protected:
virtual void execute() //实现ExecSession的纯虚函数
{
this->routine(&this->input, &this->output); //执行用户程序的routine
}
protected:
std::function<void (INPUT *, OUTPUT *)> routine;
public:
__WFThreadTask(ExecQueue *queue, Executor *executor,
std::function<void (INPUT *, OUTPUT *)>&& rt,
std::function<void (WFThreadTask<INPUT, OUTPUT> *)>&& cb) :
WFThreadTask<INPUT, OUTPUT>(queue, executor, std::move(cb)),
routine(std::move(rt))
{
}
};
WFThreadTask代码
// src/factory/WFTask.h
template<class INPUT, class OUTPUT>
class WFThreadTask : public ExecRequest
{
public:
void start();
void dismiss();
INPUT *get_input() { return &this->input; }
OUTPUT *get_output() { return &this->output; }
void *user_data;
int get_state() const { return this->state; }
int get_error() const { return this->error; }
void set_callback(std::function<void (WFThreadTask<INPUT, OUTPUT> *)> cb);
protected:
virtual SubTask *done();
protected:
INPUT input;
OUTPUT output;
std::function<void (WFThreadTask<INPUT, OUTPUT> *)> callback;
public:
WFThreadTask(ExecQueue *queue, Executor *executor,
std::function<void (WFThreadTask<INPUT, OUTPUT> *)>&& cb) :
ExecRequest(queue, executor),
callback(std::move(cb))
{
// 初始化
}
protected:
virtual ~WFThreadTask() { }
};
ExecRequest代码
// src/kernel/ExecRequest.h
class ExecRequest : public SubTask, public ExecSession
{
public:
ExecRequest(ExecQueue *queue, Executor *executor);
ExecQueue *get_request_queue() const { return this->queue; }
void set_request_queue(ExecQueue *queue) { this->queue = queue; }
virtual void dispatch() // 实现SubTask的纯虚函数,这个纯虚函数主要是任务的开始执行接口
{
this->executor->request(this, this->queue);
...
}
protected:
int state;
int error;
ExecQueue *queue;
Executor *executor;
protected:
virtual void handle(int state, int error); // 实现ExecSession的纯虚函数
};
SubTask代码
class SubTask
{
// 子任务被调起的时机
virtual void dispatch() = 0;
// 子任务执行完成的时机
virtual SubTask *done() = 0;
// 内部实现,决定了任务流走向
void subtask_done();
...
};
ExecSession代码
/src/kernel/Executor.h
class ExecSession
{
private:
virtual void execute() = 0;
virtual void handle(int state, int error) = 0;
protected:
ExecQueue *get_queue() { return this->queue; }
private:
ExecQueue *queue;
...
};
继承关系图
__WFThreadTask__目前还未用到,暂不清楚
3、两个重要成员: ExecQueue, Executor
ExecQueue代码
/src/kernel/Executor.h
class ExecQueue
{
...
private:
struct list_head task_list;
pthread_mutex_t mutex;
};
Executor代码
/src/kernel/Executor.h
class Executor
{
public:
// 一次要执行的接口,对于线程执行器来说,就是把一个执行任务扔进某个队列中
int request(ExecSession *session, ExecQueue *queue);
private:
// 执行器和系统资源,是一个包含关系
thrdpool_t *thrdpool;
};
request() 函数把任务扔进线程池队列等待执行,线程池会从队列拿到这个任务,然后执行executor_thread_routine
// src/kernel/Executor.cc
int Executor::request(ExecSession *session, ExecQueue *queue)
{
ExecSessionEntry *entry = new ExecSessionEntry;
session->queue = queue;
entry->session = session;
entry->thrdpool = this->thrdpool;
queue->mutex.lock();
list_add_tail(&entry->list, &queue->session_list);
if (queue->session_list.next == &entry->list)
{
struct thrdpool_task task = {Executor::executor_thread_routine, queue};
/*
{
.routine = Executor::executor_thread_routine,
.context = queue
};
*/
if (thrdpool_schedule(&task, this->thrdpool) < 0)
{
list_del(&entry->list);
delete entry;
entry = NULL;
}
}
queue->mutex.unlock();
return -!entry;
}
struct ExecSessionEntry
{
struct list_head list;
ExecSession *session;
thrdpool_t *thrdpool;
};
// src/kernel/Executor.cc
void Executor::executor_thread_routine(void *context)
{
ExecQueue *queue = (ExecQueue *)context;
ExecSessionEntry *entry;
ExecSession *session;
queue->mutex.lock();
entry = list_entry(queue->session_list.next, ExecSessionEntry, list);
list_del(&entry->list);
session = entry->session;
if (!list_empty(&queue->session_list))
{
struct thrdpool_task task = {Executor::executor_thread_routine, queue};
/*
{
.routine = Executor::executor_thread_routine,
.context = queue
};
*/
__thrdpool_schedule(&task, entry, entry->thrdpool);
}
else
delete entry;
queue->mutex.unlock();
session->execute(); //这里会执行到用户routine
session->handle(ES_STATE_FINISHED, 0);
}
4、参考链接
https://github.com/chanchann/workflow_annotation/blob/main/src_analysis/12_thread_task.md
https://blog.csdn.net/j497205974/article/details/135554164?spm=1001.2014.3001.5502