目录
1 CNStream之前博客汇总
1.1 Pipeline中的EventBus
1.2 Pipeline中的内存池
1.3 Pipeline中的视频解码流程分析
1.4 Pipeline中的视频编码流程分析
1.5 Pipeline中的反射机制
1.6 Pipeline中的单例模式代码
1.7 怎么将CNStream适配到NVIDIA Jetson Orin
2 构建Pipeline整体流程
2.1 new Pipeline("TACLPipeline")
2.2 BuildPipelineByJSONFile
2.2.1 graph_config.ParseByJSONFile(config_file)
2.2.2 BuildPipeline(graph_config);
3 不同module之间怎么传递数据
4 怎么区分多路不同流----用stream_id
5 std::shared_ptr<CNFrameInfo> data里面的stream_id是哪里来的
参考文献:
1 CNStream之前博客汇总
1.1 Pipeline中的EventBus
见我以前的博客:
EventBus的C++实现、代码分析-CSDN博客
1.2 Pipeline中的内存池
见我以前的博客:
C++内存池Memory Pool的高级实现、代码详解、CMake构建工程、应用实例-CSDN博客
1.3 Pipeline中的视频解码流程分析
见我以前的博客:
aclStream流处理多路并发Pipeline框架中 视频解码 代码调用流程整理、类的层次关系整理、回调函数赋值和调用流程整理_acl 多模型 多视频流处理-CSDN博客
1.4 Pipeline中的视频编码流程分析
见我以前的博客:
aclStream流处理多路并发Pipeline框架中VEncode Module代码调用流程整理、类的层次关系整理、回调函数赋值和调用流程整理-CSDN博客
1.5 Pipeline中的反射机制
见我以前的博客:
CNStream代码中C++反射机制的使用-CSDN博客
1.6 Pipeline中的单例模式代码
见我以前的博客:
C++单例模式代码实现与分析-CSDN博客
1.7 怎么将CNStream适配到NVIDIA Jetson Orin
见我以前的博客:
完整指南:CNStream流处理多路并发框架适配到NVIDIA Jetson Orin (一) 依赖库编译、第三方库编译安装_cnstream编译-CSDN博客
完整指南:CNStream流处理多路并发框架适配到NVIDIA Jetson Orin (二) 源码架构流程梳理、代码编写_cnstreamer-CSDN博客
完整指南:CNStream流处理多路并发框架适配到NVIDIA Jetson Orin (三) 代码编译、各种问题解决、代码修改-CSDN博客
完整指南:CNStream流处理多路并发框架适配到NVIDIA Jetson Orin (四) 运行、调试、各种问题解决_mpeg 并发处理-CSDN博客
2 构建Pipeline整体流程
直接从main函数开始看
// Program entry point: parses command-line flags, configures logging, runs
// general_init(), then blocks in pause() until it returns.
int main(int argc, char** argv) {
gflags::ParseCommandLineFlags(&argc, &argv, false);
// Logging flags are assigned before google::InitGoogleLogging() is called below.
FLAGS_stderrthreshold = google::INFO;
FLAGS_colorlogtostderr = true;
FLAGS_log_dir = "logs";
FLAGS_max_log_size = 100; // maximum log size in MB (original note: a value of 0 falls back to 1)
FLAGS_stop_logging_if_full_disk = true;
path_check(FLAGS_log_dir);
google::InitGoogleLogging("taclstream");
LOGI(APP) << "TACLSTREAM VERSION:" << cnstream::VersionString();
// Bring up the pipeline and services; on failure the process exits with the returned code.
int rt = general_init();
if (rt != ERROR_OK) {
LOGE(MAIN) << "general_init failed";
exit(rt);
}
// The calling thread waits here; the statements below only run once pause() returns.
pause();
LOGI(MAIN) << "TACLStream server exit after pause!";
google::ShutdownGoogleLogging();
return EXIT_SUCCESS;
}
然后看general_init()函数
/**
 * @brief Builds the pipeline, then brings up the file service, the HTTP server,
 *        cached sessions, the profiler and the keep-alive thread, in that order.
 *
 * @return ERROR_OK on success; the build_pipeline() result if the pipeline cannot be
 *         built; the exception's error code if starting the HTTP server throws.
 */
int general_init() {
  const int build_rc = build_pipeline(FLAGS_config_fname);
  if (build_rc != 0) {
    LOGE(MAIN) << "build pipeline failed!";
    return build_rc;
  }
  LOGI(MAIN) << "build pipeline succeed.";

  // The file service reports the host/port it is bound to through these out-arguments.
  std::string service_host;
  int service_port = 0;
  const bool file_service_ok = enable_file_service(service_host, service_port);
  if (!file_service_ok) {
    LOGW(MAIN) << "not support file service.";
  } else {
    LOGI(MAIN) << "file service start succeed. port is " << service_port;
  }

  // The HTTP server reuses the port reported by the file service.
  int http_port = service_port;

  // Node information
  g_node_info.node_name = "tcnstream";
  g_node_info.node_ip = service_host;
  g_node_info.node_port = http_port;

  try {
    httplib_service_start(http_port);
    LOGI(MAIN) << "http server start succeed, server port is " << http_port << ".";
  } catch (GeneralException2& e) {
    httplib_service_close();
    LOGE(TCNSTREAM) << "init failed: " << e.err_code() << "," << e.err_str();
    return e.err_code();
  }

  // On startup, check whether task files were cached locally.
  restart_session_from_local();
  start_profiler();

  // Start the keep-alive thread: one keep-alive message every 30 seconds, forever.
  std::thread keepalive_worker([]() {
    for (;;) {
      std::this_thread::sleep_for(std::chrono::seconds(30));
      post_keepalive_msg();
    }
  });
  keepalive_worker.detach();
  return ERROR_OK;
}
然后看build_pipeline(FLAGS_config_fname);函数
/**
 * @brief Creates the global pipeline from a JSON configuration file, installs the
 *        stream-message observer and an EOS bus watcher, and starts the pipeline.
 *
 * @param config Path of the pipeline JSON configuration file.
 * @return ERROR_OK on success, ERROR_ALLOC_MEMORY if the pipeline pointer is null,
 *         EXIT_FAILURE if building or starting the pipeline fails.
 */
int build_pipeline(string const& config) {
  // The custom deleter stops the pipeline when the last owner goes away.
  // NOTE(review): the deleter only calls Stop() and never deletes the object — confirm this is intended.
  s_pipeline = std::shared_ptr<Pipeline>(new Pipeline("TACLPipeline"), [](Pipeline* pipeline) { pipeline->Stop(); });
  if (s_pipeline == nullptr) {
    return ERROR_ALLOC_MEMORY;
  }

  // build pipeline
  if (!s_pipeline->BuildPipelineByJSONFile(config)) {
    LOGE(SESSION_MGR) << "Build pipeline failed.";
    return EXIT_FAILURE;
  }

  // message observer (static: it must outlive this function because the pipeline keeps the pointer)
  // NOTE(review): reinterpret_cast is only safe if MsgObserver's StreamMsgObserver base sits at
  // offset 0; if MsgObserver derives from StreamMsgObserver a plain implicit conversion is preferable.
  static MsgObserver msg_observer(s_pipeline.get(), g_source_name);
  s_pipeline->SetStreamMsgObserver(reinterpret_cast<cnstream::StreamMsgObserver*>(&msg_observer));

  // Bus watcher reacting to EOS events posted by the decode source module.
  // NOTE(review): the pipeline is named "TACLPipeline" while the module name compared below starts
  // with "TuringPipeline" — verify that this string matches the real module name.
  s_pipeline->GetEventBus()->AddBusWatch([&](const Event& event) -> EventHandleFlag {
    const bool is_source_eos =
        event.module_name == "TuringPipeline/decode/source" && EventType::EVENT_EOS == event.type;
    if (!is_source_eos) {
      // Not our event: report "not handled" so the remaining watchers still see it.
      return EventHandleFlag::EVENT_HANDLE_NULL;
    }

    // Cast to int (not int8_t) so the event type is logged as a number instead of a raw character.
    LOGI(SESSION_MGR) << "recv bus event: " << static_cast<int>(event.type) << "_" << event.module_name << "_"
                      << event.stream_id << "_" << event.message;

    std::string run_msg_str;
    std::unique_lock<std::mutex> locker(s_session_locker);
    auto it = s_task_sessions.find(event.stream_id);
    if (it != s_task_sessions.end()) {
      LOGE(SESSION_MGR) << event.stream_id << "stream interruption!!!!!!!!!!";
      // A file source is simply stopped; any other source enters the reconnect mechanism.
      if (it->second->get_video_type() == EFILE) {
        it->second->stop(g_source_name);
        run_msg_str = it->second->post_end_mark();
        LOGI(SESSION_MGR) << "run msg is: " << run_msg_str;
        // publish msg
        cnstream::ActiveMQ* activemq_run_msg =
            dynamic_cast<cnstream::ActiveMQ*>(s_pipeline->GetModule(g_activemq_name));
        if (activemq_run_msg != nullptr && !run_msg_str.empty()) {
          activemq_run_msg->PublishEvent(run_msg_str);
        }
        // Erase through the iterator: no second lookup and no reference into the erased node.
        s_task_sessions.erase(it);
      } else if (it->second->get_s_state_() != S_USER_CLOSE) {
        it->second->set_s_state_(S_EXCEPTION_CLOSE);
      }
    }
    // The lambda is declared to return EventHandleFlag, so every path must return a value
    // (flowing off the end was undefined behaviour). SYNCED does not intercept later watchers.
    return EventHandleFlag::EVENT_HANDLE_SYNCED;
  });

  // start pipeline
  if (!s_pipeline->Start()) {
    LOGE(SESSION_MGR) << "Pipeline start failed.";
    return EXIT_FAILURE;
  }
  return ERROR_OK;
}
这里面重点就看3个函数
- new Pipeline("TACLPipeline")、
- BuildPipelineByJSONFile、
- s_pipeline->Start()。
2.1 new Pipeline("TACLPipeline")
其中构造函数如下
// Constructs a pipeline called `name`: starts the stream-message thread, then allocates
// the event bus, the index manager and a default-constructed graph.
Pipeline::Pipeline(const std::string& name) : name_(name) {
// stream message handle thread
// NOTE(review): the thread is started before event_bus_/idxManager_/graph_ are allocated
// below — confirm StreamMsgHandleFunc does not use them immediately.
exit_msg_loop_ = false;
smsg_thread_ = std::thread(&Pipeline::StreamMsgHandleFunc, this);
// Allocations use nothrow new, so a failure shows up as a null pointer tested by LOGF_IF.
event_bus_.reset(new (std::nothrow) EventBus());
LOGF_IF(CORE, nullptr == event_bus_) << "Pipeline::Pipeline() failed to alloc EventBus";
// Register the pipeline's own DefaultBusWatch as a watcher on the event bus.
GetEventBus()->AddBusWatch(std::bind(&Pipeline::DefaultBusWatch, this, std::placeholders::_1));
idxManager_.reset(new (std::nothrow) IdxManager());
LOGF_IF(CORE, nullptr == idxManager_) << "Pipeline::Pipeline() failed to alloc IdxManager";
// The graph's per-node payload type is NodeContext.
graph_.reset(new (std::nothrow) CNGraph<NodeContext>());
LOGF_IF(CORE, nullptr == graph_) << "Pipeline::Pipeline() failed to alloc CNGraph";
}
构造函数里首先启动了流消息处理线程(StreamMsgHandleFunc),接着创建EventBus并注册默认的事件监听函数DefaultBusWatch,然后IdxManager是用来管理stream_id的,最下面的CNGraph就是图,这里创建图的时候传入的模板类型是NodeContext:
/**
* @brief The node context used by pipeline.
*
* One instance is stored as the payload of each graph node (CNGraph<NodeContext>).
*/
struct NodeContext {
// The module attached to this node.
std::shared_ptr<Module> module;
// Connector whose conveyors feed data into this node's module.
std::shared_ptr<Connector> connector;
// Parent-node mask; Pipeline::TransmitData treats a zero value as "root node".
uint64_t parent_nodes_mask = 0;
uint64_t route_mask = 0; // for head nodes
// for gets node instance by a module, see Module::context_;
// Held as weak_ptr, so users must lock() it before use.
std::weak_ptr<CNGraph<NodeContext>::CNNode> node;
};
这里面的Connector就是用来连接不同module的,
// Holds a group of Conveyor queues addressed by index, plus a stop flag.
// The pipeline pushes frames into a conveyor and a module's task loop pops them.
class Connector : private NonCopyable {
public:
/**
* @brief Connector constructor.
* @param
* [conveyor_count]: the conveyor num of this connector.
* [conveyor_capacity]: the maximum buffer number of a conveyor.
*/
explicit Connector(const size_t conveyor_count, size_t conveyor_capacity = 20);
~Connector();
// Size and capacity queries.
const size_t GetConveyorCount() const;
size_t GetConveyorCapacity() const;
// Per-conveyor state queries; conveyor_idx selects the conveyor.
bool IsConveyorFull(int conveyor_idx) const;
bool IsConveyorEmpty(int conveyor_idx) const;
size_t GetConveyorSize(int conveyor_idx) const;
uint64_t GetFailTime(int conveyor_idx) const;
// Data transfer through the conveyor selected by conveyor_idx.
CNFrameInfoPtr PopDataBufferFromConveyor(int conveyor_idx);
bool PushDataBufferToConveyor(int conveyor_idx, CNFrameInfoPtr data);
// Lifecycle control; IsStopped() is polled by the push and pop loops in Pipeline.
void Start();
void Stop();
bool IsStopped();
void EmptyDataQueue();
private:
// NOTE(review): two lookup helpers with the same signature shape — confirm how they differ.
Conveyor* GetConveyorByIdx(int idx) const;
Conveyor* GetConveyor(int conveyor_idx) const;
// NOTE(review): raw pointers — presumably created in the constructor and released in ~Connector(); verify.
std::vector<Conveyor*> conveyors_;
size_t conveyor_capacity_ = 20;
std::vector<uint64_t> fail_times_;
std::atomic<bool> stop_{false};
}; // class Connector
Connector里面有个conveyors_成员,这个conveyors_可以看到是个vector,那就说明有多个,为什么有多个,是因为比如我们有很多路流,那么两个module之间连接时要有多个conveyor,每个conveyor负责一路流的传输,然后这里的conveyor_capacity_就是指队列的长度不能超过20,
/**
* @brief Conveyor is used to transmit data between two modules.
*
* Conveyor belongs to Connector.
* Each Connector could have several conveyors which depends on the parallelism of each module.
*
* Conveyor has one buffer queue for transmitting data from one module to another.
* The upstream node module will push data to buffer queue, and the downstream node will pop data from buffer queue.
*
* The capacity of buffer queue could be set in configuration json file (see README for more information of
* configuration json file). If there is no element in buffer queue, the downstream node will wait to pop and
* be blocked. On contrary, if the queue is full, the upstream node will wait to push and be blocked.
*
* NOTE(review): PushDataBuffer() returns bool; callers are expected to retry when it returns false.
*/
class Conveyor : private NonCopyable {
public:
// max_size: upper bound on the number of queued frames.
explicit Conveyor(size_t max_size);
~Conveyor() = default;
bool PushDataBuffer(CNFrameInfoPtr data);
CNFrameInfoPtr PopDataBuffer();
std::vector<CNFrameInfoPtr> PopAllDataBuffer();
uint32_t GetBufferSize();
uint64_t GetFailTime();
#ifdef UNIT_TEST
public: // NOLINT
#else
private: // NOLINT
#endif
// NOTE(review): this unconditional `private:` overrides the UNIT_TEST switch above,
// so the members below are private in both configurations — confirm that is intended.
private:
std::queue<CNFrameInfoPtr> dataq_;
size_t max_size_;
uint64_t fail_time_ = 0;
// data_mutex_ guards the queue; notempty_cond_ is signalled when an element is pushed.
std::mutex data_mutex_;
std::condition_variable notempty_cond_;
// 20 ms duration; presumably the pop wait timeout — TODO confirm in PopDataBuffer().
const std::chrono::milliseconds rel_time_{20};
}; // class Conveyor
这个Conveyor类就是负责具体的传输数据的。
关于Pipeline的构造函数就先看这么多,图的东西在后面的两个函数中再看。
2.2 BuildPipelineByJSONFile
inline bool Pipeline::BuildPipelineByJSONFile(const std::string& config_file) {
CNGraphConfig graph_config;
if (!graph_config.ParseByJSONFile(config_file)) {
LOGE(CORE) << "Parse graph config file failed.";
return false;
}
return BuildPipeline(graph_config);
}
2.2.1 graph_config.ParseByJSONFile(config_file)
这里首先是解析json文件,
/**
 * @brief Reads a whole JSON file into memory and hands its text to ParseByJSONStr().
 *
 * Also records the directory of the file in config_root_dir before parsing.
 *
 * @param jfile Path of the JSON file.
 * @return true when the file could be opened and its content was parsed successfully.
 */
bool CNConfigBase::ParseByJSONFile(const std::string& jfile) {
  printf("jfile is %s\n", jfile.c_str());

  std::ifstream input(jfile);
  if (!input.is_open()) {
    LOGE(CORE) << "Config file open failed :" << jfile;
    return false;
  }

  // Slurp the complete file content.
  using CharIter = std::istreambuf_iterator<char>;
  std::string content((CharIter(input)), CharIter());
  input.close();

  config_root_dir = GetPathDir(jfile);
  return ParseByJSONStr(content);
}
然后看
/**
 * @brief Parses a graph configuration from JSON text.
 *
 * Every top-level member is classified by its name: the profiler item is parsed into
 * profiler_config, subgraph items are appended to subgraph_configs and every other
 * item is treated as a module and appended to module_configs.
 *
 * @param json_str JSON text; comments are accepted (kParseCommentsFlag).
 * @return false if the text is not valid JSON, if its root is not an object, or if any
 *         item fails to parse; true otherwise.
 */
bool CNGraphConfig::ParseByJSONStr(const std::string& json_str) {
  rapidjson::Document doc;
  if (doc.Parse<rapidjson::kParseCommentsFlag>(json_str.c_str()).HasParseError()) {
    LOGE(CORE) << "Parse graph configuration failed. Error code [" << std::to_string(doc.GetParseError()) << "]"
               << " Offset [" << std::to_string(doc.GetErrorOffset()) << "]. ";
    return false;
  }

  // MemberBegin()/MemberEnd() are only valid on objects; a syntactically valid document
  // such as "[]" or "1" must be rejected here instead of tripping a RapidJSON assertion.
  if (!doc.IsObject()) {
    LOGE(CORE) << "Parse graph configuration failed. The root of the configuration must be a JSON object.";
    return false;
  }

  // traversing config items
  for (rapidjson::Document::ConstMemberIterator iter = doc.MemberBegin(); iter != doc.MemberEnd(); ++iter) {
    // Serialize the member value back to text so the item parsers can take a string.
    rapidjson::StringBuffer sbuf;
    rapidjson::Writer<rapidjson::StringBuffer> jwriter(sbuf);
    iter->value.Accept(jwriter);

    std::string item_name = iter->name.GetString();
    std::string item_value = sbuf.GetString();

    if (IsProfilerItem(item_name)) {
      // parse if profiler config
      if (!profiler_config.ParseByJSONStr(item_value)) {
        LOGE(CORE) << "Parse profiler config failed.";
        return false;
      }
    } else if (IsSubgraphItem(item_name)) {
      // parse if subgraph config
      CNSubgraphConfig subgraph_config;
      subgraph_config.name = item_name;
      if (!subgraph_config.ParseByJSONStr(item_value)) {
        LOGE(CORE) << "Parse subgraph config failed. Subgraph name : [" + item_name + "].";
        return false;
      }
      // correct the relative path of subgraph configuration file.
      subgraph_config.config_path = config_root_dir + subgraph_config.config_path;
      subgraph_configs.push_back(std::move(subgraph_config));
    } else {
      // parse module config and insert graph nodes
      CNModuleConfig mconf;
      mconf.config_root_dir = config_root_dir;
      mconf.name = item_name;
      if (!mconf.ParseByJSONStr(item_value)) {
        LOGE(CORE) << "Parse module config failed. Module name : [" << mconf.name << "]";
        return false;
      }
      module_configs.push_back(std::move(mconf));
    }
  }  // for json items
  return true;
}
结合着config文件看
{
"profiler_config" : {
"enable_profiling" : false,
"enable_tracing" : false
},
"subgraph:app_config":{
"config_path" : "configs/app_config.json"
},
"subgraph:decode" : {
"config_path" : "configs/decode_config.json",
"next_modules" : ["subgraph:object_detection"]
},
"subgraph:object_detection" : {
"config_path" : "configs/yolov5_nv_pcb_df.json",
"next_modules" : ["subgraph:object_tracking"]
},
"subgraph:object_tracking" : {
"config_path" : "configs/object_track_sort.json",
"next_modules" : ["subgraph:business"]
},
"subgraph:business" : {
"config_path" : "configs/business.json",
"next_modules" : ["subgraph:image_store","subgraph:sinker"]
},
"subgraph:image_store" : {
"config_path" : "configs/image_store.json",
"next_modules" : ["subgraph:activemq", "subgraph:mysql"]
},
"subgraph:activemq" : {
"config_path" : "configs/sinker_configs/activemq.json"
},
"subgraph:mysql" : {
"config_path" : "configs/sinker_configs/mysql.json"
},
"subgraph:sinker" : {
"config_path" : "configs/sinker_configs/rtsp.json"
},
"subgraph:preview_decode" : {
"config_path" : "configs/preview_decode_config.json",
"next_modules" : ["subgraph:preview_image_store"]
},
"subgraph:preview_image_store" : {
"config_path" : "configs/image_store.json"
}
}
然后调试看到,它就是把所有的subgraph配置项都保存到了subgraph_configs里面。
然后就开始调用BuildPipeline(graph_config);了
2.2.2 BuildPipeline(graph_config);
bool Pipeline::BuildPipeline(const CNGraphConfig& graph_config) {
auto t = graph_config;
t.name = GetName();
if (!graph_->Init(t)) {
LOGE(CORE) << "Init graph failed.";
return false;
}
// create modules by config
std::vector<std::shared_ptr<Module>> modules; // used to init profiler
if (!CreateModules(&modules)) {
LOGE(CORE) << "Create modules failed.";
return false;
}
// generate parant mask for all nodes and route mask for head nodes.
GenerateModulesMask();
// This call must after GenerateModulesMask called,
profiler_.reset(
new PipelineProfiler(graph_->GetConfig().profiler_config, GetName(), modules, GetSortedModuleNames()));
// create connectors for all nodes beside head nodes.
// This call must after GenerateModulesMask called,
// then we can determine witch are the head nodes.
return CreateConnectors();
}
然后先看init函数
/**
 * @brief Builds the graph from config_: adds one vertex per module and per subgraph,
 *        creates the edges, finds head and tail nodes and rejects graphs containing a ring.
 *
 * Subgraphs are initialised recursively through AddVertex(). A thread-local set holds the
 * real paths of the subgraph files currently being expanded, so that a configuration file
 * that (indirectly) includes itself is reported instead of recursing forever.
 *
 * @return true on success; false on an invalid name, a detected include loop, a failure
 *         while adding vertices or edges, or a ring in the resulting graph.
 */
template <typename T>
bool CNGraph<T>::Init() {
  Clear();

  // subgraph analysis loop detect
  static thread_local std::set<std::string> subgraph_paths;
  if (nullptr == parent_graph_) {
    // root graph, init subgraph_paths
    subgraph_paths.clear();
  }

  // One vertex per module plus one per subgraph.
  dag_algorithm_.Reserve(config_.module_configs.size() + config_.subgraph_configs.size());

  // insert vertices
  for (const auto& module_config : config_.module_configs)
    if (!AddVertex(module_config)) return false;

  for (const auto& subgraph_config : config_.subgraph_configs) {
    if (!IsSubgraphItem(subgraph_config.name)) {
      // check subgraph name prefix, when subgraph_config.name is not parsed by CNGraphConfig::Parsexxxxx,
      // maybe will set an wrong name by user.
      LOGE(CORE) << "Subgraph's name must set with an prefix [" << std::string(kSubgraphConfigPrefix)
                 << "], wrong name : " << subgraph_config.name;
      return false;
    }

    if (parent_graph_) {
      // Current graph is a subgraph of other graphs
      auto real_path = __help_functions__::GetRealPath(subgraph_config.config_path);
      if (real_path.empty()) return false;
      if (!subgraph_paths.insert(real_path).second) {
        LOGE(CORE) << GetLogPrefix() << "A graph analysis loop was detected when parsing the subgraph named ["
                   << subgraph_config.name + "].";
        return false;
      }
      const bool added = AddVertex(subgraph_config);
      // The path is only "in progress" while the subgraph is being expanded. Removing it
      // afterwards lets a different branch reuse the same configuration file without
      // being misreported as a loop.
      subgraph_paths.erase(real_path);
      if (!added) return false;
    } else {
      if (!AddVertex(subgraph_config)) return false;
    }
  }

  if (!InitEdges()) return false;
  FindHeadsAndTails();

  // check circle: a non-empty second element of the topo-sort result is reported as a ring.
  auto topo_result = dag_algorithm_.TopoSort();
  if (topo_result.second.size()) {
    LOGE(CORE) << GetLogPrefix() + "Ring detected.";
    return false;
  }
  return true;
}
其中
dag_algorithm_.Reserve(config_.module_configs.size() + config_.subgraph_configs.size());
这里是图的顶点的个数,也就是config里面所有的module和subgraph这两个配置项加起来的个数。
然后直接到了if (!AddVertex(subgraph_config)) return false;
/**
 * @brief Adds a subgraph as one vertex of this graph.
 *
 * The subgraph's configuration file is parsed, a child graph is built from it and
 * initialised recursively, and only then is a vertex registered for it.
 *
 * @param config Subgraph description (name and configuration file path).
 * @return false if the name is invalid, the file cannot be parsed, the child graph fails
 *         to initialise or the name is already registered; true otherwise.
 */
template <typename T>
bool CNGraph<T>::AddVertex(const CNSubgraphConfig& config) {
  const bool name_ok = __help_functions__::IsNodeNameValid(config.name);
  if (!name_ok) {
    LOGE(CORE) << GetLogPrefix() << "Subgraph[" << config.name << "] name invalid. "
               << "The name of modules or subgraphs can not contain slashes or risks";
    return false;
  }

  // Load the child graph's own configuration.
  CNGraphConfig child_config;
  const bool parsed = child_config.ParseByJSONFile(config.config_path);
  if (!parsed) {
    LOGE(CORE) << GetLogPrefix() << "Parse subgraph config file failed. subgraph name: " << config.name;
    return false;
  }
  child_config.name = __help_functions__::NameIgnoreSubgraphPrefix(config.name);

  // Build and recursively initialise the child graph, linked back to this graph.
  auto child_graph = std::make_shared<CNGraph<T>>(child_config);
  child_graph->parent_graph_ = this;
  const bool child_ready = child_graph->Init();
  if (!child_ready) {
    LOGE(CORE) << GetLogPrefix() + "Init subgraph[" + config.name + "] failed.";
    return false;
  }

  // Register the vertex and remember which node name it stands for.
  int new_vertex = dag_algorithm_.AddVertex();
  vertex_map_to_node_name_.push_back(config.name);
  auto insertion =
      subgraph_node_map_.insert(std::make_pair(config.name, std::make_tuple(new_vertex, config, child_graph)));
  if (!insertion.second) {
    LOGE(CORE) << GetLogPrefix() + "Subgraph[" + config.name + "] name duplicated.";
    return false;
  }
  return true;
}
3 不同module之间怎么传递数据
从前面的构建Pipeline的整体流程也可以看出来,在Pipeline的构造函数中最下面
// Constructs a pipeline called `name`: starts the stream-message thread, then allocates
// the event bus, the index manager and a default-constructed graph.
Pipeline::Pipeline(const std::string& name) : name_(name) {
// stream message handle thread
// NOTE(review): the thread is started before event_bus_/idxManager_/graph_ are allocated
// below — confirm StreamMsgHandleFunc does not use them immediately.
exit_msg_loop_ = false;
smsg_thread_ = std::thread(&Pipeline::StreamMsgHandleFunc, this);
// Allocations use nothrow new, so a failure shows up as a null pointer tested by LOGF_IF.
event_bus_.reset(new (std::nothrow) EventBus());
LOGF_IF(CORE, nullptr == event_bus_) << "Pipeline::Pipeline() failed to alloc EventBus";
// Register the pipeline's own DefaultBusWatch as a watcher on the event bus.
GetEventBus()->AddBusWatch(std::bind(&Pipeline::DefaultBusWatch, this, std::placeholders::_1));
idxManager_.reset(new (std::nothrow) IdxManager());
LOGF_IF(CORE, nullptr == idxManager_) << "Pipeline::Pipeline() failed to alloc IdxManager";
// The graph's per-node payload type is NodeContext.
graph_.reset(new (std::nothrow) CNGraph<NodeContext>());
LOGF_IF(CORE, nullptr == graph_) << "Pipeline::Pipeline() failed to alloc CNGraph";
}
构造函数里首先启动了流消息处理线程(StreamMsgHandleFunc),接着创建EventBus并注册默认的事件监听函数DefaultBusWatch,然后IdxManager是用来管理stream_id的,最下面的CNGraph就是图,这里创建图的时候传入的模板类型是NodeContext:
/**
* @brief The node context used by pipeline.
*
* One instance is stored as the payload of each graph node (CNGraph<NodeContext>).
*/
struct NodeContext {
// The module attached to this node.
std::shared_ptr<Module> module;
// Connector whose conveyors feed data into this node's module.
std::shared_ptr<Connector> connector;
// Parent-node mask; Pipeline::TransmitData treats a zero value as "root node".
uint64_t parent_nodes_mask = 0;
uint64_t route_mask = 0; // for head nodes
// for gets node instance by a module, see Module::context_;
// Held as weak_ptr, so users must lock() it before use.
std::weak_ptr<CNGraph<NodeContext>::CNNode> node;
};
这里面的Connector就是用来连接不同module的,
// Holds a group of Conveyor queues addressed by index, plus a stop flag.
// The pipeline pushes frames into a conveyor and a module's task loop pops them.
class Connector : private NonCopyable {
public:
/**
* @brief Connector constructor.
* @param
* [conveyor_count]: the conveyor num of this connector.
* [conveyor_capacity]: the maximum buffer number of a conveyor.
*/
explicit Connector(const size_t conveyor_count, size_t conveyor_capacity = 20);
~Connector();
// Size and capacity queries.
const size_t GetConveyorCount() const;
size_t GetConveyorCapacity() const;
// Per-conveyor state queries; conveyor_idx selects the conveyor.
bool IsConveyorFull(int conveyor_idx) const;
bool IsConveyorEmpty(int conveyor_idx) const;
size_t GetConveyorSize(int conveyor_idx) const;
uint64_t GetFailTime(int conveyor_idx) const;
// Data transfer through the conveyor selected by conveyor_idx.
CNFrameInfoPtr PopDataBufferFromConveyor(int conveyor_idx);
bool PushDataBufferToConveyor(int conveyor_idx, CNFrameInfoPtr data);
// Lifecycle control; IsStopped() is polled by the push and pop loops in Pipeline.
void Start();
void Stop();
bool IsStopped();
void EmptyDataQueue();
private:
// NOTE(review): two lookup helpers with the same signature shape — confirm how they differ.
Conveyor* GetConveyorByIdx(int idx) const;
Conveyor* GetConveyor(int conveyor_idx) const;
// NOTE(review): raw pointers — presumably created in the constructor and released in ~Connector(); verify.
std::vector<Conveyor*> conveyors_;
size_t conveyor_capacity_ = 20;
std::vector<uint64_t> fail_times_;
std::atomic<bool> stop_{false};
}; // class Connector
Connector里面有个conveyors_成员,这个conveyors_可以看到是个vector,那就说明有多个,为什么有多个,是因为比如我们有很多路流,那么两个module之间连接时要有多个conveyor,每个conveyor负责一路流的传输,然后这里的conveyor_capacity_就是指队列的长度不能超过20,
/**
* @brief Conveyor is used to transmit data between two modules.
*
* Conveyor belongs to Connector.
* Each Connector could have several conveyors which depends on the parallelism of each module.
*
* Conveyor has one buffer queue for transmitting data from one module to another.
* The upstream node module will push data to buffer queue, and the downstream node will pop data from buffer queue.
*
* The capacity of buffer queue could be set in configuration json file (see README for more information of
* configuration json file). If there is no element in buffer queue, the downstream node will wait to pop and
* be blocked. On contrary, if the queue is full, the upstream node will wait to push and be blocked.
*
* NOTE(review): PushDataBuffer() returns bool; callers are expected to retry when it returns false.
*/
class Conveyor : private NonCopyable {
public:
// max_size: upper bound on the number of queued frames.
explicit Conveyor(size_t max_size);
~Conveyor() = default;
bool PushDataBuffer(CNFrameInfoPtr data);
CNFrameInfoPtr PopDataBuffer();
std::vector<CNFrameInfoPtr> PopAllDataBuffer();
uint32_t GetBufferSize();
uint64_t GetFailTime();
#ifdef UNIT_TEST
public: // NOLINT
#else
private: // NOLINT
#endif
// NOTE(review): this unconditional `private:` overrides the UNIT_TEST switch above,
// so the members below are private in both configurations — confirm that is intended.
private:
std::queue<CNFrameInfoPtr> dataq_;
size_t max_size_;
uint64_t fail_time_ = 0;
// data_mutex_ guards the queue; notempty_cond_ is signalled when an element is pushed.
std::mutex data_mutex_;
std::condition_variable notempty_cond_;
// 20 ms duration; presumably the pop wait timeout — TODO confirm in PopDataBuffer().
const std::chrono::milliseconds rel_time_{20};
}; // class Conveyor
这个Conveyor类就是负责具体的传输数据的。
那么实际传输数据的代码在哪里呢?每个module类里面的TransmitData会调用Pipeline里面的TransmitData函数,代码如下:
// Forwards `data`, which has just left the module of `context`, to the following nodes.
// Invalid frames are handed to OnDataInvalid(); frames that have passed every module are
// handed to OnPassThrough(); everything else is pushed into the connector of each next
// node whose parent nodes have all processed the frame.
void Pipeline::TransmitData(NodeContext* context, const std::shared_ptr<CNFrameInfo>& data) {
//std::chrono::high_resolution_clock::time_point tnow = std::chrono::high_resolution_clock::now();
if (data->IsInvalid()) {
OnDataInvalid(context, data);
return;
}
if (!context->parent_nodes_mask) {
// root node
// set mask to 1 for never touched modules, for case which has multiple source modules.
data->SetModulesMask(all_modules_mask_ ^ context->route_mask);
}
if (data->IsEos()) {
OnEos(context, data);
} else {
OnProcessEnd(context, data);
// Stop forwarding frames of a stream that has been removed in the meantime.
if (IsStreamRemoved(data->stream_id)) return;
}
// NOTE(review): the weak_ptr lock result is not checked for null before node->GetNext() below.
auto node = context->node.lock();
auto module = context->module;
// Mark this module as passed and get the updated mask.
const uint64_t cur_mask = data->MarkPassed(module.get());
const bool passed_by_all_modules = PassedByAllModules(cur_mask);
if (passed_by_all_modules) {
OnPassThrough(data);
return;
}
// std::chrono::high_resolution_clock::time_point tpost = std::chrono::high_resolution_clock::now();
//std::cout << "<<<<<< 44444444444 TransmitData up time = " << std::chrono::duration_cast<std::chrono::duration<double>>(tpost - tnow).count() * 1000 << " ms" << std::endl;
// std::chrono::high_resolution_clock::time_point tnow1 = std::chrono::high_resolution_clock::now();
// transmit to next nodes
for (auto next_node : node->GetNext()) {
if (!PassedByAllParentNodes(&next_node->data, cur_mask)) continue;
auto next_module = next_node->data.module;
auto connector = next_node->data.connector;
// push data to conveyor only after data passed by all parent nodes.
if (IsProfilingEnabled() && !data->IsEos())
next_module->GetProfiler()->RecordProcessStart(kINPUT_PROFILER_NAME,
std::make_pair(data->stream_id, data->timestamp));
// The conveyor is chosen from the stream index, so one stream always maps to the same conveyor.
// NOTE(review): assumes GetConveyorCount() is never 0 (modulo by zero otherwise).
const int conveyor_idx = data->GetStreamIndex() % connector->GetConveyorCount();
int retry_cnt = 1;
// Retry until the push succeeds or the connector is stopped; the sleep starts at 5 ms
// and grows with retry_cnt, which doubles up to a cap of 10 (i.e. at most 50 ms).
while (!connector->IsStopped() && connector->PushDataBufferToConveyor(conveyor_idx, data) == false) {
std::this_thread::sleep_for(std::chrono::milliseconds(5 * retry_cnt));
retry_cnt = std::min(retry_cnt * 2, 10);
} // while try push
} // loop next nodes
//std::chrono::high_resolution_clock::time_point tpost1 = std::chrono::high_resolution_clock::now();
// std::cout << "<<<<<< 555555555555 TransmitData down time = " << std::chrono::duration_cast<std::chrono::duration<double>>(tpost1 - tnow1).count() * 1000 << " ms" << std::endl;
}
然后在
bool Connector::PushDataBufferToConveyor(int conveyor_idx, CNFrameInfoPtr data) {
return GetConveyor(conveyor_idx)->PushDataBuffer(data);
}
然后最终是通过Conveyor类进行传输的,在这里push到队列中
/**
 * @brief Tries to enqueue a frame without waiting.
 *
 * @param data Frame to enqueue.
 * @return true if the frame was queued (the consecutive-failure counter is reset);
 *         false if the queue already holds max_size_ frames (the counter is incremented).
 */
bool Conveyor::PushDataBuffer(CNFrameInfoPtr data) {
  std::unique_lock<std::mutex> guard(data_mutex_);

  const bool queue_full = !(dataq_.size() < max_size_);
  if (queue_full) {
    fail_time_ += 1;
    return false;
  }

  dataq_.push(data);
  notempty_cond_.notify_one();  // wake one consumer waiting for data
  fail_time_ = 0;
  return true;
}
每个module在这里从队列中pop出数据进行处理
/**
 * @brief Worker loop of one module for one conveyor: pops frames and processes them
 *        until the connector is stopped.
 *
 * @param context Node context providing the module and its input connector.
 * @param conveyor_idx Index of the conveyor this loop consumes.
 */
void Pipeline::TaskLoop(NodeContext* context, uint32_t conveyor_idx) {
  auto module = context->module;
  auto connector = context->connector;
  auto node_name = module->GetName();

  // process loop
  for (;;) {
    std::shared_ptr<CNFrameInfo> frame = nullptr;

    // pull data from conveyor: keep polling until a frame arrives or the connector stops
    while (!connector->IsStopped() && frame == nullptr) {
      frame = connector->PopDataBufferFromConveyor(conveyor_idx);
    }

    if (connector->IsStopped()) {
      break;
    }

    if (frame == nullptr) {
      std::this_thread::sleep_for(std::chrono::milliseconds(10));
      continue;
    }

    OnProcessStart(context, frame);
    const int status = module->DoProcess(frame);
    if (status < 0) {
      OnProcessFailed(context, frame, status);
    }

    std::this_thread::yield();
  }  // process loop
}
4 怎么区分多路不同流----用stream_id
根据stream_id区分多路流,具体是
ret = AddSourceForFile(source, stream_id, filename, FLAGS_src_frame_rate);
cnstream::CreateSource(source, stream_id, param);
std::make_shared<FileHandler>(module, stream_id, param);
/**
 * @brief Creates a file source handler for one stream.
 *
 * @param module    Data source module passed on to the base class and the implementation.
 * @param stream_id Identifier of the stream, passed on to SourceHandler.
 * @param param     File source parameters passed on to the implementation object.
 */
FileHandler::FileHandler(DataSource *module, const std::string &stream_id, const FileSourceParam &param)
    : SourceHandler(module, stream_id) {
  // nothrow new: impl_ is null if the allocation fails.
  impl_ = new (std::nothrow) FileHandlerImpl(module, param, this);
}
stream_index_ = module_->GetStreamIndex(stream_id_);
if (container_) return container_->GetStreamIndex(stream_id);
return idxManager_->GetStreamIndex(stream_id);
然后
uint32_t IdxManager::GetStreamIndex(const std::string& stream_id) {
std::lock_guard<std::mutex> guard(id_lock);
auto search = stream_idx_map.find(stream_id);
if (search != stream_idx_map.end()) {
return search->second;
}
for (uint32_t i = 0; i < GetMaxStreamNumber(); i++) {
if (!stream_bitset[i]) {
stream_bitset.set(i);
stream_idx_map[stream_id] = i;
return i;
}
}
return kInvalidStreamIdx;
}
每个Pipeline都有一个IdxManager类成员std::unique_ptr<IdxManager> idxManager_ = nullptr;,用来管理stream_id的
/**
* @brief ModuleId&StreamIdx manager for pipeline. Allocates and deallocates id for Pipeline modules & streams.
*
* Not copyable: copy construction and copy assignment are deleted.
*/
class IdxManager {
public:
IdxManager() = default;
IdxManager(const IdxManager &) = delete;
IdxManager &operator=(const IdxManager &) = delete;
// Returns the index mapped to stream_id, assigning a free one on first request.
uint32_t GetStreamIndex(const std::string &stream_id);
// Counterpart of GetStreamIndex(); presumably releases the index of stream_id — verify in the implementation.
void ReturnStreamIndex(const std::string &stream_id);
// Module id allocation and release.
size_t GetModuleIdx();
void ReturnModuleIdx(size_t id_);
private:
// id_lock is taken by GetStreamIndex() before touching the stream map and bitset.
std::mutex id_lock;
// stream_id -> assigned stream index.
std::map<std::string, uint32_t> stream_idx_map;
// One bit per stream index; a set bit means the index is in use.
std::bitset<kMaxStreamNum> stream_bitset;
// NOTE(review): presumably one bit per allocated module id — confirm in GetModuleIdx().
uint64_t module_id_mask_ = 0;
}; // class IdxManager
5 std::shared_ptr<CNFrameInfo> data里面的stream_id是哪里来的
然后
/**
 * @brief Decoder callback: wraps one decoded surface into a CNFrameInfo and sends it on.
 *
 * Only every param_.interval-th frame is kept. A frame without a buffer surface is sent
 * with the INVALID flag set; otherwise the frame is rendered by SourceRender::Process()
 * and then sent.
 *
 * @param wrapper Decoded surface wrapper; it is moved into SourceRender::Process().
 */
void FileHandlerImpl::OnDecodeFrame(BufSurfWrapperPtr wrapper) {
  // NOTE(review): assumes param_.interval is never 0 (modulo by zero otherwise).
  if (frame_count_++ % param_.interval != 0) {
    // LOGI(SOURCE) << "frames are discarded" << frame_count_;
    return;  // discard frames
  }

  std::shared_ptr<CNFrameInfo> data = this->CreateFrameInfo();
  if (!data) {
    LOGW(SOURCE) << "[FileHandlerImpl] OnDecodeFrame(): failed to create FrameInfo.";
    return;
  }

  data->timestamp = wrapper->GetPts();
  if (!wrapper->GetBufSurface()) {
    // No surface available: forward the frame marked as invalid.
    data->flags = static_cast<size_t>(CNFrameFlag::CN_FRAME_FLAG_INVALID);
    this->SendFrameInfo(data);
    return;
  }

#if 0  // Debug code (disabled): dumps decoded frames as JPEG files. Remember to revise/remove.
  static int debug_count = 0;
  BufSurface* src_buf = wrapper->GetBufSurface();
  std::vector<uint8_t> indata(src_buf->surface_list[0].data_size);  // debug code, remember to remove
  MemcpyHD(indata.data(), BUF_MEMORY_NORMAL, src_buf->surface_list[0].data_ptr, BUF_MEMORY_NORMAL, src_buf->surface_list[0].data_size);  // debug code, remember to remove
  cv::Mat yuv_mat(src_buf->surface_list[0].height * 3 / 2, src_buf->surface_list[0].width, CV_8UC1, indata.data());  // debug code, remember to revise
  // std::vector<uint8_t> indata(1920*1080*1.5);  // debug code, remember to remove
  // MemcpyHD(indata.data(), BUF_MEMORY_NORMAL, src_buf->surface_list[0].data_ptr, BUF_MEMORY_NORMAL, 1920*1080*1.5);  // debug code, remember to remove
  // cv::Mat yuv_mat(1080 * 3 / 2, 1920, CV_8UC1, indata.data());  // debug code, remember to revise
  cv::Mat bgr_mat;
  cv::cvtColor(yuv_mat, bgr_mat, cv::COLOR_YUV2BGR_NV12);  // debug code, remember to revise; this is the original conversion
  //cv::cvtColor(yuv_mat, bgr_mat, cv::COLOR_YUV2BGR_I420);  // debug code; this conversion is wrong
  //cv::imwrite("OnDecodeFrame.jpg", bgr_mat);
  debug_count = debug_count + 1;
  // "OnDecodeFrame_<n>.jpg" needs more than 20 bytes once <n> has two digits, so use a larger
  // buffer and a bounded snprintf instead of sprintf.
  char decode_name[64] = {};
  if ((debug_count < 3000) && (debug_count % 2 == 0)) {
    snprintf(decode_name, sizeof(decode_name), "OnDecodeFrame_%d.jpg", debug_count);
    //cv::imwrite(decode_name, bgr_mat);
  }
#endif

  //std::chrono::high_resolution_clock::time_point tnow = std::chrono::high_resolution_clock::now();
  int ret = SourceRender::Process(data, std::move(wrapper), frame_id_++, param_);
  if (ret < 0) {
    LOGE(SOURCE) << "[FileHandlerImpl] OnDecodeFrame(): [" << stream_id_ << "]: Render frame failed";
    return;
  }
  //std::chrono::high_resolution_clock::time_point tpost = std::chrono::high_resolution_clock::now();
  //std::cout << "<<<<<< 0000000000000000000000000000000SourceRender::Process time = " << std::chrono::duration_cast<std::chrono::duration<double>>(tpost - tnow).count() * 1000 << " ms" << std::endl;
  //std::chrono::high_resolution_clock::time_point tnow1 = std::chrono::high_resolution_clock::now();
  this->SendFrameInfo(data);
  // std::chrono::high_resolution_clock::time_point tpost1 = std::chrono::high_resolution_clock::now();
  // std::cout << "<<<<<< 111111111111111111this->SendFrameInfo time = " << std::chrono::duration_cast<std::chrono::duration<double>>(tpost1 - tnow1).count() * 1000 << " ms" << std::endl;
}
然后
/**
 * @brief Creates a frame through the handler, retrying until it succeeds or creation
 *        is interrupted.
 *
 * For non-EOS frames an empty CNDataFrame and an empty CNInferObjs are attached to the
 * frame's collection.
 *
 * @param eos true to request an end-of-stream frame (no containers are attached).
 * @return The new frame, or nullptr when CreateInterrupt() ended the retry loop before
 *         a frame could be obtained.
 */
std::shared_ptr<CNFrameInfo> CreateFrameInfo(bool eos = false) {
  std::shared_ptr<CNFrameInfo> data;
  int retry_cnt = 1;
  while (1) {
    data = handler_->CreateFrameInfo(eos);
    if (data != nullptr) break;
    if (CreateInterrupt()) break;
    // Back off before retrying: 5 us times a factor that doubles up to a cap of 20.
    std::this_thread::sleep_for(std::chrono::microseconds(5 * retry_cnt));
    retry_cnt = std::min(retry_cnt * 2, 20);
  }

  // The loop can also end through CreateInterrupt() with no frame at all; return early
  // instead of dereferencing a null pointer below.
  if (!data) {
    return nullptr;
  }

  if (!eos) {
    // std::make_shared never returns null (it throws on allocation failure), so no null checks are needed.
    auto dataframe = std::make_shared<CNDataFrame>();
    auto inferobjs = std::make_shared<CNInferObjs>();
    data->collection.Add(kCNDataFrameTag, dataframe);
    data->collection.Add(kCNInferObjsTag, inferobjs);
  }
  return data;
}
最终是
std::shared_ptr<CNFrameInfo> CNFrameInfo::Create(const std::string &stream_id, bool eos,
std::shared_ptr<CNFrameInfo> payload) {
if (stream_id == "") {
LOGE(CORE) << "CNFrameInfo::Create() stream_id is empty string.";
return nullptr;
}
std::shared_ptr<CNFrameInfo> ptr(new (std::nothrow) CNFrameInfo());
if (!ptr) {
LOGE(CORE) << "CNFrameInfo::Create() new CNFrameInfo failed.";
return nullptr;
}
ptr->stream_id = stream_id;
ptr->payload = payload;
if (eos) {
ptr->flags |= static_cast<size_t>(cnstream::CNFrameFlag::CN_FRAME_FLAG_EOS);
if (!ptr->payload) {
std::lock_guard<std::mutex> guard(s_eos_lock_);
s_stream_eos_map_[stream_id] = false;
}
return ptr;
}
return ptr;
}
这样data里面就有stream_id了。
参考文献:
手写EventBus(观察者模式、源码阅读、反射) - 简书
CNStream流处理多路并发Pipeline框架整体介绍-CSDN博客
寒武纪CNStream用户手册 — 寒武纪CNStream用户手册 6.2.0 文档
GitHub - Cambricon/CNStream: CNStream is a streaming framework for building Cambricon machine learning pipelines http://forum.cambricon.com https://gitee.com/SolutionSDK/CNStream
CNStream/docs/release_document/latest/Cambricon-CNStream-User-Guide-CN-vlatest.pdf at master · Cambricon/CNStream · GitHub
C++内存池Memory Pool的高级实现、代码详解、CMake构建工程、应用实例-CSDN博客
aclStream流处理多路并发Pipeline框架中VEncode Module代码调用流程整理、类的层次关系整理、回调函数赋值和调用流程整理-CSDN博客
CNStream代码中C++反射机制的使用-CSDN博客
aclStream流处理多路并发Pipeline框架中 视频解码 代码调用流程整理、类的层次关系整理、回调函数赋值和调用流程整理-CSDN博客
EventBus的C++实现、代码分析-CSDN博客