计算图的设计
Graph的结构
- Operators: 记录所有的节点
- Input operator: 指定的输入节点
- Output operator: 指定的输出节点
- Global input data: 模型的外部全局输入(用户指定的输入)
Operator的结构
- Input data: 节点的输入数据
- Output data: 节点的输出数据
- Operator params: 计算节点的参数
- Next operators: 该节点的下一个节点,数量有且大于一个
- Layer:
每个
Operator
具体计算的执行者,layer先从input data中取得本层的输入,再通过layer定义的计算过程,并得到output data中计算的过程中所需要的参数已经被提前存放到
Operator params
中
Graph中的数据流动
我们从下图中可以看出,一个Graph
中包含了两个要素,一个要素是多个operators
,另一个要素是连通operators
之间的数据通路。
也就是说,前一个operator
的输出将作为后一个operator
的输入存在,其中在输入和输出中传递的数据,是以前面课程中谈到的Tensor
类进行的。
其中,在普通的计算中,上面的op1的output_data是拷贝到op2的input_data中的,而在我们的这个推理网络中,我们是进行了一个内存的复用的。
我们可以看到,在图中,Graph在执行时在逻辑上可以分为两条路径,一条是控制流,另外一条是数据流。在数据流中,前一个operator
产生的输出传递到后续operator
作为输入。
那么Graph是如何得知一个operator
的后续operator
的?我们可以看到在前方Operator
定义中,有一个变量为Next operators
,这个变量记录了一个operator
的后继节点。在上图中,我们可以看到op1
有两个后继节点op2
和op3
,他们也是通过op1.next_oprators
得到的。
所以在图的执行中,有两个很重要的部分:
- 通过
op.layer
根据输入来进行计算,并得到当前层的输出 - 将当前层的输出顺利并且正确地传递到后继节点的输入当中。传递的路径是previous op.output to next op.input 这部分看起来是赋值,但在这个项目中已经变成了指针的拷贝会快很多。
计算图的执行顺序:
计算节点的执行是通过广度优先搜索来实现的,当然也有人说这就是一种拓扑排序的实现。
那什么是广度优先呢?
从图中我们可以看出,现在要执行的图是总共拥有7个op, 分别从op1到op7.
它们之间的前后关系如图中的箭头指向,例如op2
, op3
, op4
均为op1
的后继节点,换句话说,只有等到op1
执行结束之后,op2
, op3
, op4
才能开始执行,这三个节点的输入也都来自于op3
的输出,以下的顺序是上面这个图中的执行顺序。
- 从
graph.input_operator
的定义可以知道,op1
是开始执行的节点,因此在当前时刻将op1放入到执行队列中 - op1被从执行队列中取出执行,并得到op1的计算输出,存放到
op1.output_data
中;同时,根据op1.output_operators
定位到op1的后续三个节点,op2
,op3
和op4
, 随后将op1.output_data
拷贝到这三个后继节点的输入中 - 现在的执行队列存放了三个节点,分别为
op2
,op3
和op4
. 随后我们根据先进先出的顺序取出op2
开始执行,因为op2
没有后继节点,所以执行完毕后直接开始下一轮迭代 - 取出队列中的队头
op3
,在op3
执行完毕之后将op3.output_data
拷贝到op5.input_data
中,并将op5
入执行队列
......
随后的执行顺序如图所示,总之也是在一个节点执行完毕之后,通过current_op.output_operators
来寻找它的后继节点,并将当前节点的输出拷贝到后继节点的输入中
项目中计算图调度执行实现
项目中的计算图调度执行是对上方图例的一个还原,我们在这一节中通过分析代码的方式来看看怎么来做一个广度优先搜索(拓扑排序)。
寻找并拷贝上一级的输出到后继节点
void RuntimeGraph::ProbeNextLayer(
const std::shared_ptr<RuntimeOperator> ¤t_op,
std::deque<std::shared_ptr<RuntimeOperator>> &operator_queue,
std::vector<std::shared_ptr<Tensor<float>>> layer_output_datas) {
const auto &next_ops = current_op->output_operators;
std::vector<std::vector<std::shared_ptr<ftensor>>> next_input_datas_arr;
for (const auto &next_op : next_ops) {
const auto &next_rt_operator = next_op.second;
const auto &next_input_operands = next_rt_operator->input_operands;
// 找到后继节点
if (next_input_operands.find(current_op->name) != next_input_operands.end()) {
std::vector<std::shared_ptr<ftensor>> next_input_datas =
next_input_operands.at(current_op->name)->datas;
next_input_datas_arr.push_back(next_input_datas);
next_rt_operator->meet_num += 1;
//检查 next_rt_operator 是否需要当前操作符的输出数据作为输入(通过检查 next_input_operands 中是否包//含当前操作符的名字)。
//如果需要当前操作符的输出作为输入,那么就获取相应的输入数据(next_input_datas)。
//将 next_input_datas 存入 next_input_datas_arr,这是一个二维向量,用于存储所有下一层操作符的输入数//据。
//增加 next_rt_operator 的 meet_num,可能是用来追踪该操作符已满足的条件数目。
if (std::find(operator_queue.begin(), operator_queue.end(),
next_rt_operator) == operator_queue.end()) {
if (CheckOperatorReady(next_rt_operator)) {
operator_queue.push_back(next_rt_operator);
//代码检查 next_rt_operator 是否已经存在于 operator_queue 中:
//如果不存在于队列中,并且满足一定的就绪条件(通过 CheckOperatorReady 函数判断),则将 //next_rt_operator 添加到 operator_queue 中,以便后续处理。
//最后,调用 SetOpInputData 函数,将之前收集到的下一层操作符的输入数据与当前层的输出数据关联起来。
//如果ready了,那就把后继节点放入到队列之中
}
}
}
}
SetOpInputData(layer_output_datas, next_input_datas_arr);
}
void RuntimeGraph::ProbeNextLayer( const std::shared_ptr<RuntimeOperator> ¤t_op, std::deque<std::shared_ptr<RuntimeOperator>> &operator_queue, std::vector<std::shared_ptr<Tensor<float>>> layer_output_datas)
可以看到该函数有三个参数,分别为
current_op
,operator_queue
和layer_output_datas
,这三个参数的定义如下:
current_op
表示当前执行完毕的节点,operator_queue
就是在上一节中提到的节点执行队列,layer_output_datas
就是当前current_op
被执行后得到的对应输出。const auto &next_ops = current_op->output_operators; std::vector<std::vector<std::shared_ptr<ftensor>>> next_input_datas_arr;
得到当前节点
current_op
的后继节点,next_ops
std::vector<std::vector<std::shared_ptr<ftensor>>> next_input_datas_arr; for (const auto &next_op : next_ops) { const auto &next_rt_operator = next_op.second; // layer_output_datas 需要拷贝到next_input_operands的datas中 const auto &next_input_operands = next_rt_operator->input_operands;
这里对
next_ops
进行遍历,依次获得后继节点中的其中一个next_op
,随后我们得到next_op
的输入数据引用。我们要得到
next_op.input_operands
呢?我们就是要把current_op.output_data
拷贝到其中,完成current_op
输出到后继节点输入的拷贝。next_rt_operator->meet_num += 1; // 0 --> 1 if (std::find(operator_queue.begin(), operator_queue.end(),next_rt_operator) == operator_queue.end()) { if (CheckOperatorReady(next_rt_operator)) { // 把后继节点放入到执行队列 operator_queue.push_back(next_rt_operator); } }
可以看到其中的
meet_num
,对于一个节点next_operator
来说,如果meet_num
的数量等于它前驱的数量,说明它现在可以被放入到执行队列中。
bool RuntimeGraph::CheckOperatorReady(
const std::shared_ptr<RuntimeOperator> &op) {
CHECK(op != nullptr);
CHECK(op->meet_num <= op->input_operands.size());
if (op->meet_num == op->input_operands.size()) {
return true;
} else {
return false;
}
}
判断,如果meet_num == 输入节点数的话,那就代表之前节点的输出已经全部结束了,现在可以将他们放入到下一节点的输入里面了。
void RuntimeGraph::SetOpInputData(
std::vector<std::shared_ptr<Tensor<float>>> &src,
std::vector<std::vector<std::shared_ptr<Tensor<float>>>> &dest) {
CHECK(!src.empty() && !dest.empty()) << "Src or dest array is empty!";
for (uint32_t j = 0; j < src.size(); ++j) {
const auto &src_data = src.at(j)->data();
for (uint32_t i = 0; i < dest.size(); ++i) {
// CHECK(!dest.empty() && dest.at(i).size() == src.size());
dest.at(i).at(j)->set_data(src_data);
}
}
}
// 这是一个名为 SetOpInputData 的函数,可能是在运行时图中进行数据关联操作的一部分。
// 函数的参数包括:
// src:一个存储浮点类型张量(Tensor)共享指针的向量,表示要用于设置输入数据的源数据。
// dest:一个二维向量,其中每行表示一个操作符的输入数据,每列表示不同的源数据。
// 函数开始时,会使用断言(CHECK)来确保源数据 src 和目标数据 dest 都不为空,否则会产生错误信息。
// 然后,通过两个嵌套的循环遍历源数据 src 和目标数据 dest:
// 外部循环遍历源数据 src 中的每个元素。
// 内部循环遍历目标数据 dest 中的每一行(操作符的输入数据)。
// 在内部循环中,获取源数据 src 的具体数据(src_data)。
// 接着,将源数据 src_data 设置到目标数据中,这个过程通过 dest.at(i).at(j)->set_data(src_data) 来实现。这里 i 表示操作符的索引,j 表示源数据的索引。
总体是将
layer_output_datas
这个输出张量复制到next_input_datas_arr
这个张量数组(后继的输入)上,指针复制几乎无消耗。
广度优先搜索的执行顺序的实现
就是在咱们的Forward函数中:
我们首先来看它的两个参数,inputs
为模型的输入张量,debug
表示是否开启打印调试功能。
std::vector<std::shared_ptr<Tensor<float>>> RuntimeGraph::Forward(
const std::vector<std::shared_ptr<Tensor<float>>> &inputs, bool debug)
这里是Forward
方法中对图状态的检查,只有图状态为complete
的时候才能执行图的调度,图的complete
时间发生在:
- 图中的计算节点都初始化完毕
- 输入输入输出算子都准备好相关的空间之后
input_op
为整张图的开始执行节点,也就是模型的执行入口。
if (graph_state_ < GraphState::Complete) {
LOG(FATAL) << "Graph need be build!";
}
CHECK(graph_state_ == GraphState::Complete)
<< "Graph status error, current state is " << int(graph_state_);
std::shared_ptr<RuntimeOperator> input_op;
if (input_operators_maps_.find(input_name_) == input_operators_maps_.end()) {
LOG(FATAL) << "Can not find the input node: " << input_name_;
} else {
input_op = input_operators_maps_.at(input_name_);
}
将输入节点送入到执行队列中, 执行队列在这里的变量为operator_queue
,是一个deque
结构,方便从尾部插入,并从头部取出(完成先进先出)。
std::deque<std::shared_ptr<RuntimeOperator>> operator_queue;
operator_queue.push_back(input_op);
std::map<std::string, double> run_duration_infos;
while (!operator_queue.empty()) {
std::shared_ptr<RuntimeOperator> current_op = operator_queue.front();
operator_queue.pop_front();
if (!current_op || current_op == output_op) {
if (debug) {
LOG(INFO) << "Model Inference End";
}
break;
}
......
}
std::shared_ptr\<RuntimeOperator> current_op = operator_queue.front(); 从队列中获取一个被执行的节点,按照先进先出的顺序执行。
if (current_op == input_op) {
ProbeNextLayer(current_op, operator_queue, inputs);
}
这里分为两种情况,如果 当前节点是输入节点,就直接使用ProbeNextLayer
将输入拷贝到输入节点的下一层中(因为input节点不涉及到别的操作,所以可以直接赋值)。
std::string current_op_name = current_op->name;
if (!CheckOperatorReady(current_op)) {
if (operator_queue.empty()) {
// 当current op是最后一个节点的时候,说明它已经不能被ready 就是说既没有ready,又是最后一个节点,所以没有其他的节点,不能被meet_num+1了。
LOG(FATAL) << "Current operator is not ready!";
break;
} else {
// 如果不是最后一个节点,它还有被ready的可能性,只是可能由于什么原因放错了位置,那就放回到里面等待再meet_num+1再执行
operator_queue.push_back(current_op);
}
}
如果当前的节点(current_op
)不是输入节点(input_operator
)就对它是否准备好进行检查,检查的方式同样是使用CheckOperatorReady
检查当前节点的入度,如果入度等于0,那么当前的节点就允许被执行。
如果这个节点还没有ready
,就需要重新被放入到operator_queue
当中。
const std::vector<std::shared_ptr<RuntimeOperand>> &input_operand_datas
= current_op->input_operands_seq;
std::vector<std::shared_ptr<Tensor<float>>> layer_input_datas;
for (const auto &input_operand_data : input_operand_datas) {
for (const auto &input_data : input_operand_data->datas) {
layer_input_datas.push_back(input_data);
}
}
将当前op
中的input
移动到layer_input_datas
(全指针拷贝,损耗可以忽略不计),也就是从op->input_operands_seq
中到layer_input_datas中。
InferStatus status = current_op->layer->Forward(layer_input_datas, current_op->output_operands->datas);
在op自身ready
,且输入已经准备到layer_input_data
之后,开始执行算子,但是这节课中算子执行不讨论。
ProbeNextLayer(current_op, operator_queue, current_op->output_operands->datas);
在执行完毕后,对当前的算子current_op
的输出同步它下一级后继节点的输入中。
while (!operator_queue.empty())
当执行队列中的节点执行均执行完毕,且图中没有未执行的节点时就跳出循环。
CHECK(output_op->input_operands.size() == 1)
<< "The graph only support one path to the output node yet!";
const auto &output_op_input_operand = output_op->input_operands.begin();
const auto &output_operand = output_op_input_operand->second;
return output_operand->datas;
将output operator
的input operand
输出为最后的结果,换句话理解,输出节点的输入张量就是最后得到的结果。
最后可以看到对于resnet18的输出网络,实现执行分支再执行最下面的