CNStream Stream-Processing Multi-Stream Concurrent Pipeline Framework, Questions Organized: Overall Pipeline Flow, Data Transmission, Multi-Stream Concurrency


Contents

1 Previous CNStream Posts
1.1 The EventBus in the Pipeline
1.2 The Memory Pool in the Pipeline
1.3 Video Decoding Flow in the Pipeline
1.4 Video Encoding Flow in the Pipeline
1.5 The Reflection Mechanism in the Pipeline
1.6 The Singleton Pattern Code in the Pipeline
1.7 How to Port CNStream to NVIDIA Jetson Orin
2 Overall Flow of Building the Pipeline
2.1 new Pipeline("TACLPipeline")
2.2 BuildPipelineByJSONFile
2.2.1 graph_config.ParseByJSONFile(config_file)
2.2.2 BuildPipeline(graph_config)
3 How Data Is Passed Between Modules
4 How to Distinguish Multiple Streams: stream_id
5 Where the stream_id in std::shared_ptr<CNFrameInfo> data Comes From
References


1 Previous CNStream Posts

1.1 The EventBus in the Pipeline

See my earlier post:

EventBus的C++实现、代码分析-CSDN博客

1.2 The Memory Pool in the Pipeline

See my earlier post:

C++内存池Memory Pool的高级实现、代码详解、CMake构建工程、应用实例-CSDN博客

1.3 Video Decoding Flow in the Pipeline

See my earlier post:

aclStream流处理多路并发Pipeline框架中 视频解码 代码调用流程整理、类的层次关系整理、回调函数赋值和调用流程整理_acl 多模型 多视频流处理-CSDN博客

1.4 Video Encoding Flow in the Pipeline

See my earlier post:

aclStream流处理多路并发Pipeline框架中VEncode Module代码调用流程整理、类的层次关系整理、回调函数赋值和调用流程整理-CSDN博客

1.5 The Reflection Mechanism in the Pipeline

See my earlier post:

CNStream代码中C++反射机制的使用-CSDN博客

1.6 The Singleton Pattern Code in the Pipeline

See my earlier post:

C++单例模式代码实现与分析-CSDN博客

1.7 How to Port CNStream to NVIDIA Jetson Orin

See my earlier posts:

完整指南:CNStream流处理多路并发框架适配到NVIDIA Jetson Orin (一) 依赖库编译、第三方库编译安装_cnstream编译-CSDN博客

完整指南:CNStream流处理多路并发框架适配到NVIDIA Jetson Orin (二) 源码架构流程梳理、代码编写_cnstreamer-CSDN博客

完整指南:CNStream流处理多路并发框架适配到NVIDIA Jetson Orin (三) 代码编译、各种问题解决、代码修改-CSDN博客

完整指南:CNStream流处理多路并发框架适配到NVIDIA Jetson Orin (四) 运行、调试、各种问题解决_mpeg 并发处理-CSDN博客

2 Overall Flow of Building the Pipeline

Start directly from the main function:


int main(int argc, char** argv) {
    gflags::ParseCommandLineFlags(&argc, &argv, false);

    FLAGS_stderrthreshold = google::INFO;
    FLAGS_colorlogtostderr = true;
    FLAGS_log_dir = "logs";
    FLAGS_max_log_size = 100; // maximum log file size in MB; if set to 0 it defaults to 1
    FLAGS_stop_logging_if_full_disk = true;

    path_check(FLAGS_log_dir);

    google::InitGoogleLogging("taclstream");

    LOGI(APP) << "TACLSTREAM VERSION:" << cnstream::VersionString();

    int rt = general_init();
    if (rt != ERROR_OK) {
        LOGE(MAIN) << "general_init failed";
        exit(rt);
    }

    pause();

    LOGI(MAIN) << "TACLStream server exit after pause!";
    google::ShutdownGoogleLogging();

    return EXIT_SUCCESS;
}

Next, look at the general_init() function:


int general_init() {
    int ret = build_pipeline(FLAGS_config_fname);
    if (ret != 0) {
        LOGE(MAIN) << "build pipeline failed!";
        return ret;
    }
    LOGI(MAIN) << "build pipeline succeed.";

    std::string file_service_host;
    int file_service_port{};
    if (enable_file_service(file_service_host, file_service_port)) {
        LOGI(MAIN) << "file service start succeed. port is " << file_service_port;
    }else {
        LOGW(MAIN) << "not support file service.";
    }

    int http_server_port = file_service_port;

    // node information
    g_node_info.node_name = "tcnstream";
    g_node_info.node_ip = file_service_host;
    g_node_info.node_port = http_server_port;

    try{
        httplib_service_start(http_server_port);
        LOGI(MAIN) << "http server start succeed, server port is " << http_server_port << ".";
    }
    catch (GeneralException2& e) {
        httplib_service_close();
        LOGE(TCNSTREAM) << "init failed: " << e.err_code() << "," << e.err_str();
        return e.err_code();
    }

    // On startup, check whether locally cached task files exist
    restart_session_from_local();

    start_profiler();

    // Start the keep-alive thread
    std::thread keepliveTh([]() {
      while (true) {
        std::this_thread::sleep_for(std::chrono::seconds(30));
        post_keepalive_msg();
      }
    });
    keepliveTh.detach();

    return ERROR_OK;
}

Then look at the build_pipeline(FLAGS_config_fname) function:

int build_pipeline(string const& config) {
    s_pipeline = std::shared_ptr<Pipeline>(new Pipeline("TACLPipeline"), [](Pipeline* pipeline) { pipeline->Stop(); });
    if (s_pipeline == nullptr) {
        return ERROR_ALLOC_MEMORY;
    }

    //build pipeline
    if (!s_pipeline->BuildPipelineByJSONFile(config)) {
        LOGE(SESSION_MGR) << "Build pipeline failed.";
        return EXIT_FAILURE;
    }

    //message observer
    static MsgObserver msg_observer(s_pipeline.get(), g_source_name);
    s_pipeline->SetStreamMsgObserver(reinterpret_cast<cnstream::StreamMsgObserver*>(&msg_observer));

    s_pipeline->GetEventBus()->AddBusWatch([&](const Event & event)->EventHandleFlag{
        if(event.module_name == "TuringPipeline/decode/source" && EventType::EVENT_EOS == event.type){
            LOGI(SESSION_MGR) << "recv bus event: " << static_cast<int8_t>(event.type) << "_" << event.module_name << "_" << event.stream_id << "_" << event.message;
            {
                std::string run_msg_str;
                std::unique_lock<std::mutex> locker(s_session_locker);
                auto it = s_task_sessions.find(event.stream_id);
                if (it != s_task_sessions.end()) {
                    LOGE(SESSION_MGR) << event.stream_id << "stream interruption!!!!!!!!!!";
                    // If the source is a file, stop it directly; otherwise enter the reconnection logic
                    if(it->second->get_video_type() == EFILE)
                    {
                        it->second->stop(g_source_name);
                        run_msg_str = it->second->post_end_mark();

                        LOGI(SESSION_MGR) << "run msg is: " << run_msg_str;

                        // publish msg
                        cnstream::ActiveMQ* activemq_run_msg = dynamic_cast<cnstream::ActiveMQ*>(s_pipeline->GetModule(g_activemq_name));
                        if (activemq_run_msg != nullptr && run_msg_str != "") {
                            activemq_run_msg->PublishEvent(run_msg_str);
                        }

                        s_task_sessions.erase(it->first);
                    }
                    else if(it->second->get_s_state_() != S_USER_CLOSE)
                    {
                        it->second->set_s_state_(S_EXCEPTION_CLOSE);
                    }
                }
            }
        }
        return EventHandleFlag::EVENT_HANDLE_NULL;  // assumed default flag so every path of the watcher returns a value
    });

    //start pipeline
    if (!s_pipeline->Start()) {
        LOGE(SESSION_MGR) << "Pipeline start failed.";
        return EXIT_FAILURE;
    }
    return ERROR_OK;
}

The three functions to focus on here are:

  • new Pipeline("TACLPipeline")
  • BuildPipelineByJSONFile
  • s_pipeline->Start()

2.1 new Pipeline("TACLPipeline")

Its constructor is as follows:

Pipeline::Pipeline(const std::string& name) : name_(name) {
  // stream message handle thread
  exit_msg_loop_ = false;
  smsg_thread_ = std::thread(&Pipeline::StreamMsgHandleFunc, this);

  event_bus_.reset(new (std::nothrow) EventBus());
  LOGF_IF(CORE, nullptr == event_bus_) << "Pipeline::Pipeline() failed to alloc EventBus";
  GetEventBus()->AddBusWatch(std::bind(&Pipeline::DefaultBusWatch, this, std::placeholders::_1));

  idxManager_.reset(new (std::nothrow) IdxManager());
  LOGF_IF(CORE, nullptr == idxManager_) << "Pipeline::Pipeline() failed to alloc IdxManager";

  graph_.reset(new (std::nothrow) CNGraph<NodeContext>());
  LOGF_IF(CORE, nullptr == graph_) << "Pipeline::Pipeline() failed to alloc CNGraph";
}

The first part starts the stream-message handling thread and sets up the EventBus with a default bus watcher, the IdxManager is used to manage stream_ids, and the CNGraph at the bottom is the graph itself. The template type passed in when creating the graph is:


/**
 * @brief The node context used by pipeline.
 */
struct NodeContext {
  std::shared_ptr<Module> module;
  std::shared_ptr<Connector> connector;
  uint64_t parent_nodes_mask = 0;
  uint64_t route_mask = 0;  // for head nodes
  // for gets node instance by a module, see Module::context_;
  std::weak_ptr<CNGraph<NodeContext>::CNNode> node;
};

The Connector in this struct is what connects two adjacent modules:

class Connector : private NonCopyable {
 public:
  /**
   * @brief Connector constructor.
   * @param
   *   [conveyor_count]: the conveyor num of this connector.
   *   [conveyor_capacity]: the maximum buffer number of a conveyor.
   */
  explicit Connector(const size_t conveyor_count, size_t conveyor_capacity = 20);
  ~Connector();

  const size_t GetConveyorCount() const;
  size_t GetConveyorCapacity() const;
  bool IsConveyorFull(int conveyor_idx) const;
  bool IsConveyorEmpty(int conveyor_idx) const;
  size_t GetConveyorSize(int conveyor_idx) const;
  uint64_t GetFailTime(int conveyor_idx) const;

  CNFrameInfoPtr PopDataBufferFromConveyor(int conveyor_idx);
  bool PushDataBufferToConveyor(int conveyor_idx, CNFrameInfoPtr data);

  void Start();
  void Stop();
  bool IsStopped();
  void EmptyDataQueue();

 private:
  Conveyor* GetConveyorByIdx(int idx) const;
  Conveyor* GetConveyor(int conveyor_idx) const;

  std::vector<Conveyor*> conveyors_;
  size_t conveyor_capacity_ = 20;
  std::vector<uint64_t> fail_times_;
  std::atomic<bool> stop_{false};
};  // class Connector

Connector has a conveyors_ member, which is a vector, so a connector can hold several conveyors. Why several? Because with multiple streams, the link between two modules needs multiple conveyors: each conveyor carries the frames of one stream (when there are more streams than conveyors, streams share them by stream_index % conveyor_count, as Pipeline::TransmitData shows in Section 3; see the small illustration below). conveyor_capacity_ means each conveyor's queue can hold at most 20 buffers.
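
To make that mapping concrete, here is a tiny standalone illustration (my own sketch, not CNStream code): a fixed pool of conveyors serves any number of streams by taking the stream index modulo the conveyor count, which is the same rule Pipeline::TransmitData applies later in Section 3.

#include <cstdio>

// Standalone illustration of the stream-to-conveyor mapping (not CNStream code).
int main() {
  const int conveyor_count = 4;  // e.g. the downstream module's parallelism
  for (int stream_index = 0; stream_index < 8; ++stream_index) {
    const int conveyor_idx = stream_index % conveyor_count;  // same rule as Pipeline::TransmitData
    std::printf("stream %d -> conveyor %d\n", stream_index, conveyor_idx);
  }
  return 0;
}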


/**
 * @brief Conveyor is used to transmit data between two modules.
 *
 * Conveyor belongs to Connector.
 * Each Connector can have several conveyors, depending on the parallelism of each module.
 *
 * Conveyor has one buffer queue for transmitting data from one module to another.
 * The upstream node module will push data to buffer queue, and the downstream node will pop data from buffer queue.
 *
 * The capacity of buffer queue could be set in configuration json file (see README for more information of
 * configuration json file). If there is no element in buffer queue, the downstream node will wait to pop and
 * be blocked. On the contrary, if the queue is full, the upstream node will wait to push and be blocked.
 */
class Conveyor : private NonCopyable {
 public:
  explicit Conveyor(size_t max_size);
  ~Conveyor() = default;
  bool PushDataBuffer(CNFrameInfoPtr data);
  CNFrameInfoPtr PopDataBuffer();
  std::vector<CNFrameInfoPtr> PopAllDataBuffer();
  uint32_t GetBufferSize();
  uint64_t GetFailTime();

#ifdef UNIT_TEST
 public:  // NOLINT
#else
 private:  // NOLINT
#endif

 private:
  std::queue<CNFrameInfoPtr> dataq_;
  size_t max_size_;
  uint64_t fail_time_ = 0;
  std::mutex data_mutex_;
  std::condition_variable notempty_cond_;
  const std::chrono::milliseconds rel_time_{20};
};  // class Conveyor

It is this Conveyor class that performs the actual data transmission; a minimal sketch of its queue semantics follows below.
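
The push side of this queue is quoted later in Section 3 (Conveyor::PushDataBuffer), but the pop side is not shown in this post. Below is a minimal self-contained sketch of the semantics described in the comment block above: a bounded queue whose push reports failure when full, and whose pop waits up to 20 ms (cf. rel_time_) before giving up so the caller can re-check its stop flag. It illustrates the mechanism only and is not the CNStream implementation.

#include <chrono>
#include <condition_variable>
#include <memory>
#include <mutex>
#include <queue>

using FramePtr = std::shared_ptr<int>;  // illustrative stand-in for CNFrameInfoPtr

class BoundedQueue {
 public:
  explicit BoundedQueue(size_t max_size) : max_size_(max_size) {}

  // Mirrors Conveyor::PushDataBuffer: refuse (return false) when the queue is full.
  bool Push(FramePtr data) {
    std::unique_lock<std::mutex> lk(mutex_);
    if (q_.size() >= max_size_) return false;
    q_.push(std::move(data));
    not_empty_.notify_one();
    return true;
  }

  // Assumed pop semantics: wait up to 20 ms for data, return nullptr on timeout
  // so the caller (cf. Pipeline::TaskLoop) can re-check the connector's stop flag.
  FramePtr Pop() {
    std::unique_lock<std::mutex> lk(mutex_);
    if (!not_empty_.wait_for(lk, std::chrono::milliseconds(20), [this] { return !q_.empty(); }))
      return nullptr;
    FramePtr data = q_.front();
    q_.pop();
    return data;
  }

 private:
  std::queue<FramePtr> q_;
  const size_t max_size_;
  std::mutex mutex_;
  std::condition_variable not_empty_;
};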

That is all we need from the Pipeline constructor for now; the graph itself comes up again in the next two functions.

2.2 BuildPipelineByJSONFile

inline bool Pipeline::BuildPipelineByJSONFile(const std::string& config_file) {
  CNGraphConfig graph_config;
  if (!graph_config.ParseByJSONFile(config_file)) {
    LOGE(CORE) << "Parse graph config file failed.";
    return false;
  }
  return BuildPipeline(graph_config);
}

2.2.1 graph_config.ParseByJSONFile(config_file)

The first step here is parsing the JSON file:

bool CNConfigBase::ParseByJSONFile(const std::string& jfile) {
  printf("jfile is %s\n", jfile.c_str());
  std::ifstream ifs(jfile);
  if (!ifs.is_open()) {
    LOGE(CORE) << "Config file open failed :" << jfile;
    return false;
  }
  std::string jstr((std::istreambuf_iterator<char>(ifs)), std::istreambuf_iterator<char>());
  ifs.close();
  config_root_dir = GetPathDir(jfile);
  if (!ParseByJSONStr(jstr)) {
    return false;
  }
  return true;
}

Then look at:


bool CNGraphConfig::ParseByJSONStr(const std::string& json_str) {
  rapidjson::Document doc;
  if (doc.Parse<rapidjson::kParseCommentsFlag>(json_str.c_str()).HasParseError()) {
    LOGE(CORE) << "Parse graph configuration failed. Error code [" << std::to_string(doc.GetParseError()) << "]"
               << " Offset [" << std::to_string(doc.GetErrorOffset()) << "]. ";
    return false;
  }

  // traversing config items
  for (rapidjson::Document::ConstMemberIterator iter = doc.MemberBegin(); iter != doc.MemberEnd(); ++iter) {
    rapidjson::StringBuffer sbuf;
    rapidjson::Writer<rapidjson::StringBuffer> jwriter(sbuf);
    iter->value.Accept(jwriter);

    std::string item_name = iter->name.GetString();
    std::string item_value = sbuf.GetString();

    if (IsProfilerItem(item_name)) {
      // parse if profiler config
      if (!profiler_config.ParseByJSONStr(item_value)) {
        LOGE(CORE) << "Parse profiler config failed.";
        return false;
      }
    } else if (IsSubgraphItem(item_name)) {
      // parse if subgraph config
      CNSubgraphConfig subgraph_config;
      subgraph_config.name = item_name;
      if (!subgraph_config.ParseByJSONStr(item_value)) {
        LOGE(CORE) << "Parse subgraph config failed. Subgraph name : [" + item_name + "].";
        return false;
      }
      // correct the relative path of subgraph configuration file.
      subgraph_config.config_path = config_root_dir + subgraph_config.config_path;
      subgraph_configs.push_back(std::move(subgraph_config));
    } else {
      // parse module config and insert graph nodes
      CNModuleConfig mconf;
      mconf.config_root_dir = config_root_dir;
      mconf.name = item_name;
      if (!mconf.ParseByJSONStr(item_value)) {
        LOGE(CORE) << "Parse module config failed. Module name : [" << mconf.name << "]";
        return false;
      }

      module_configs.push_back(std::move(mconf));
    }
  }  // for json items
  return true;
}

Read this together with the config file:

{
  "profiler_config" : {
    "enable_profiling" : false,
    "enable_tracing" : false
  },

  "subgraph:app_config":{
    "config_path" : "configs/app_config.json"
  },

  "subgraph:decode" : {
    "config_path" : "configs/decode_config.json",
    "next_modules" : ["subgraph:object_detection"]
  },

  "subgraph:object_detection" : {
    "config_path" : "configs/yolov5_nv_pcb_df.json",
    "next_modules" : ["subgraph:object_tracking"]
  },

  "subgraph:object_tracking" : {
    "config_path" : "configs/object_track_sort.json",
    "next_modules" : ["subgraph:business"]
  },
  
  "subgraph:business" : {
    "config_path" : "configs/business.json",
    "next_modules" : ["subgraph:image_store","subgraph:sinker"]
  },
  
  "subgraph:image_store" : {
    "config_path" : "configs/image_store.json",
    "next_modules" : ["subgraph:activemq", "subgraph:mysql"]
  },
  
  "subgraph:activemq" : {
    "config_path" : "configs/sinker_configs/activemq.json"
  },

  "subgraph:mysql" : {
    "config_path" : "configs/sinker_configs/mysql.json"
  },
  
  "subgraph:sinker" : {
    "config_path" : "configs/sinker_configs/rtsp.json"
  },

  "subgraph:preview_decode" : {
    "config_path" : "configs/preview_decode_config.json",
    "next_modules" : ["subgraph:preview_image_store"]
  },
  
  "subgraph:preview_image_store" : {
    "config_path" : "configs/image_store.json"
  }
}

Stepping through with a debugger shows that all of these subgraph items simply end up in subgraph_configs (a small usage example follows below).
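
For reference, a minimal caller of the parsing API shown above might look like this (the file name is hypothetical; the counts assume the top-level JSON listed earlier, where every item is either the profiler config or a "subgraph:*" entry):

// Hypothetical usage sketch of CNGraphConfig parsing (file name is made up).
cnstream::CNGraphConfig graph_config;
if (graph_config.ParseByJSONFile("configs/pipeline.json")) {
  LOGI(APP) << "subgraphs: " << graph_config.subgraph_configs.size();  // 11 "subgraph:*" items
  LOGI(APP) << "modules: " << graph_config.module_configs.size();      // 0 at this level
}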

After that, BuildPipeline(graph_config) is called.

2.2.2 BuildPipeline(graph_config)


bool Pipeline::BuildPipeline(const CNGraphConfig& graph_config) {
  auto t = graph_config;
  t.name = GetName();
  if (!graph_->Init(t)) {
    LOGE(CORE) << "Init graph failed.";
    return false;
  }
  // create modules by config
  std::vector<std::shared_ptr<Module>> modules;  // used to init profiler
  if (!CreateModules(&modules)) {
    LOGE(CORE) << "Create modules failed.";
    return false;
  }

  // generate parent mask for all nodes and route mask for head nodes.
  GenerateModulesMask();

  // This call must come after GenerateModulesMask is called.
  profiler_.reset(
      new PipelineProfiler(graph_->GetConfig().profiler_config, GetName(), modules, GetSortedModuleNames()));

  // create connectors for all nodes besides head nodes.
  // This call must come after GenerateModulesMask is called,
  // so that we can determine which are the head nodes.
  return CreateConnectors();
}

Now look at the Init function first:


template <typename T>
bool CNGraph<T>::Init() {
  Clear();
  // subgraph analysis loop detect
  static thread_local std::set<std::string> subgraph_paths;
  if (nullptr == parent_graph_) {
    // root graph, init subgraph_paths
    subgraph_paths.clear();
  }

  dag_algorithm_.Reserve(config_.module_configs.size() + config_.subgraph_configs.size());
  // insert vertices
  for (const auto& module_config : config_.module_configs)
    if (!AddVertex(module_config)) return false;
  for (const auto& subgraph_config : config_.subgraph_configs) {
    if (!IsSubgraphItem(subgraph_config.name)) {
      // Check the subgraph name prefix: when subgraph_config.name was not produced by CNGraphConfig::Parsexxxxx,
      // the user may have set a wrong name.
      LOGE(CORE) << "Subgraph's name must set with an prefix [" << std::string(kSubgraphConfigPrefix)
                 << "], wrong name : " << subgraph_config.name;
      return false;
    }
    if (parent_graph_) {
      // Current graph is a subgraph of other graphs
      auto real_path = __help_functions__::GetRealPath(subgraph_config.config_path);
      if (real_path.empty()) return false;
      if (!subgraph_paths.insert(real_path).second) {
        LOGE(CORE) << GetLogPrefix() << "A graph analysis loop was detected when parsing the subgraph named ["
                   << subgraph_config.name + "].";
        return false;
      }
    }
    if (!AddVertex(subgraph_config)) return false;
  }

  if (!InitEdges()) return false;

  FindHeadsAndTails();

  // check circle
  auto topo_result = dag_algorithm_.TopoSort();
  if (topo_result.second.size()) {
    LOGE(CORE) << GetLogPrefix() + "Ring detected.";
    return false;
  }
  return true;
}

In this function,

  dag_algorithm_.Reserve(config_.module_configs.size() + config_.subgraph_configs.size());

reserves space for the graph's vertices, i.e. the number of module items plus the number of subgraph items in the config. For the top-level config shown above that is 0 modules plus 11 subgraphs, so 11 vertices.

Execution then goes straight to if (!AddVertex(subgraph_config)) return false;, whose implementation is:


template <typename T>
bool CNGraph<T>::AddVertex(const CNSubgraphConfig& config) {
  if (!__help_functions__::IsNodeNameValid(config.name)) {
    LOGE(CORE) << GetLogPrefix() << "Subgraph[" << config.name << "] name invalid. "
               << "The name of modules or subgraphs can not contain slashes or risks";
    return false;
  }
  CNGraphConfig graph_config;
  if (!graph_config.ParseByJSONFile(config.config_path)) {
    LOGE(CORE) << GetLogPrefix() << "Parse subgraph config file failed. subgraph name: " << config.name;
    return false;
  }
  graph_config.name = __help_functions__::NameIgnoreSubgraphPrefix(config.name);
  auto subgraph = std::make_shared<CNGraph<T>>(graph_config);
  subgraph->parent_graph_ = this;
  if (!subgraph->Init()) {
    LOGE(CORE) << GetLogPrefix() + "Init subgraph[" + config.name + "] failed.";
    return false;
  }

  int vertex_id = dag_algorithm_.AddVertex();
  vertex_map_to_node_name_.push_back(config.name);

  if (!subgraph_node_map_.insert(std::make_pair(config.name, std::make_tuple(vertex_id, config, subgraph))).second) {
    LOGE(CORE) << GetLogPrefix() + "Subgraph[" + config.name + "] name duplicated.";
    return false;
  }
  return true;
}

3 How Data Is Passed Between Modules

As the pipeline-construction flow in Section 2.1 already showed, the Pipeline constructor creates a CNGraph<NodeContext>: each NodeContext holds its Module and a Connector, and every Connector owns a set of Conveyor queues that carry frames between two neighboring modules.

So where is the code that actually transmits data? Each module's TransmitData calls Pipeline::TransmitData, and then:


void Pipeline::TransmitData(NodeContext* context, const std::shared_ptr<CNFrameInfo>& data) {
 //std::chrono::high_resolution_clock::time_point tnow = std::chrono::high_resolution_clock::now();
  if (data->IsInvalid()) {
    OnDataInvalid(context, data);
    return;
  }
  if (!context->parent_nodes_mask) {
    // root node
    // set mask to 1 for never touched modules, for case which has multiple source modules.
    data->SetModulesMask(all_modules_mask_ ^ context->route_mask);
  }
  if (data->IsEos()) {
    OnEos(context, data);
  } else {
    OnProcessEnd(context, data);
    if (IsStreamRemoved(data->stream_id)) return;
  }

  auto node = context->node.lock();
  auto module = context->module;
  const uint64_t cur_mask = data->MarkPassed(module.get());
  const bool passed_by_all_modules = PassedByAllModules(cur_mask);

  if (passed_by_all_modules) {
    OnPassThrough(data);
    return;
  }

 // std::chrono::high_resolution_clock::time_point tpost = std::chrono::high_resolution_clock::now();
  //std::cout << "<<<<<< 44444444444 TransmitData  up time = " << std::chrono::duration_cast<std::chrono::duration<double>>(tpost - tnow).count() * 1000 << " ms" << std::endl;

 // std::chrono::high_resolution_clock::time_point tnow1 = std::chrono::high_resolution_clock::now();
  // transmit to next nodes
  for (auto next_node : node->GetNext()) {
    if (!PassedByAllParentNodes(&next_node->data, cur_mask)) continue;
    auto next_module = next_node->data.module;
    auto connector = next_node->data.connector;
    // push data to conveyor only after data passed by all parent nodes.
    if (IsProfilingEnabled() && !data->IsEos())
      next_module->GetProfiler()->RecordProcessStart(kINPUT_PROFILER_NAME,
                                                     std::make_pair(data->stream_id, data->timestamp));
    const int conveyor_idx = data->GetStreamIndex() % connector->GetConveyorCount();

    int retry_cnt = 1;
    while (!connector->IsStopped() && connector->PushDataBufferToConveyor(conveyor_idx, data) == false) {
      std::this_thread::sleep_for(std::chrono::milliseconds(5 * retry_cnt));
      retry_cnt = std::min(retry_cnt * 2, 10);
    }  // while try push
  }  // loop next nodes
  //std::chrono::high_resolution_clock::time_point tpost1 = std::chrono::high_resolution_clock::now();
 // std::cout << "<<<<<< 555555555555 TransmitData  down time = " << std::chrono::duration_cast<std::chrono::duration<double>>(tpost1 - tnow1).count() * 1000 << " ms" << std::endl;

}

The data then goes through the following call (note that conveyor_idx was computed above as data->GetStreamIndex() % connector->GetConveyorCount(), so all frames of one stream stay on the same conveyor and keep their order):

bool Connector::PushDataBufferToConveyor(int conveyor_idx, CNFrameInfoPtr data) {
  return GetConveyor(conveyor_idx)->PushDataBuffer(data);
}

Ultimately the transmission happens in the Conveyor class, where the data is pushed into the queue:

bool Conveyor::PushDataBuffer(CNFrameInfoPtr data) {
  std::unique_lock<std::mutex> lk(data_mutex_);
  if (dataq_.size() < max_size_) {
    dataq_.push(data);
    notempty_cond_.notify_one();
    fail_time_ = 0;
    return true;
  }
  fail_time_ += 1;
  return false;
}

And each module pops data from the queue here, in Pipeline::TaskLoop, for processing:

void Pipeline::TaskLoop(NodeContext* context, uint32_t conveyor_idx) {
  auto module = context->module;
  auto connector = context->connector;
  auto node_name = module->GetName();

  // process loop
  while (1) {
    std::shared_ptr<CNFrameInfo> data = nullptr;
    // pull data from conveyor
    while (!connector->IsStopped() && data == nullptr) data = connector->PopDataBufferFromConveyor(conveyor_idx);
    if (connector->IsStopped()) break;
    if (data == nullptr) {
      std::this_thread::sleep_for(std::chrono::milliseconds(10));
      continue;
    }

    OnProcessStart(context, data);
    int ret = module->DoProcess(data);
    if (ret < 0) OnProcessFailed(context, data, ret);

    std::this_thread::yield();
  }  // while process loop
}

4 How to Distinguish Multiple Streams: stream_id

Multiple streams are distinguished by stream_id. Concretely, the call chain when adding a file source is:

ret = AddSourceForFile(source, stream_id, filename, FLAGS_src_frame_rate);
  └─ cnstream::CreateSource(source, stream_id, param);
       └─ std::make_shared<FileHandler>(module, stream_id, param);
            └─ FileHandler::FileHandler(DataSource *module, const std::string &stream_id, const FileSourceParam &param)
                   : SourceHandler(module, stream_id) {
                 impl_ = new (std::nothrow) FileHandlerImpl(module, param, this);
               }
                 └─ stream_index_ = module_->GetStreamIndex(stream_id_);
                      └─ if (container_) return container_->GetStreamIndex(stream_id);
                           └─ return idxManager_->GetStreamIndex(stream_id);

Then:

uint32_t IdxManager::GetStreamIndex(const std::string& stream_id) {
  std::lock_guard<std::mutex> guard(id_lock);
  auto search = stream_idx_map.find(stream_id);
  if (search != stream_idx_map.end()) {
    return search->second;
  }

  for (uint32_t i = 0; i < GetMaxStreamNumber(); i++) {
    if (!stream_bitset[i]) {
      stream_bitset.set(i);
      stream_idx_map[stream_id] = i;
      return i;
    }
  }
  return kInvalidStreamIdx;
}

Every Pipeline has an IdxManager member, std::unique_ptr<IdxManager> idxManager_ = nullptr;, which manages the mapping from stream_id to stream index:

/**
 * @brief ModuleId&StreamIdx manager for pipeline. Allocates and deallocates id for Pipeline modules & streams.
 */
class IdxManager {
 public:
  IdxManager() = default;
  IdxManager(const IdxManager &) = delete;
  IdxManager &operator=(const IdxManager &) = delete;
  uint32_t GetStreamIndex(const std::string &stream_id);
  void ReturnStreamIndex(const std::string &stream_id);
  size_t GetModuleIdx();
  void ReturnModuleIdx(size_t id_);

 private:
  std::mutex id_lock;
  std::map<std::string, uint32_t> stream_idx_map;
  std::bitset<kMaxStreamNum> stream_bitset;
  uint64_t module_id_mask_ = 0;
};  // class IdxManager
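
A small usage illustration of this interface (the stream ids are made up, and the slot-reuse comment reflects my reading of the "Allocates and deallocates" description above):

IdxManager idx_mgr;
uint32_t a = idx_mgr.GetStreamIndex("camera_0");  // lowest free slot -> 0
uint32_t b = idx_mgr.GetStreamIndex("camera_1");  // next free slot   -> 1
uint32_t c = idx_mgr.GetStreamIndex("camera_0");  // already mapped   -> 0 again
idx_mgr.ReturnStreamIndex("camera_0");            // slot 0 becomes available for a future stream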

5 Where the stream_id in std::shared_ptr<CNFrameInfo> data Comes From

Start from FileHandlerImpl::OnDecodeFrame:

void FileHandlerImpl::OnDecodeFrame(BufSurfWrapperPtr wrapper) {
        if (frame_count_++ % param_.interval != 0) {
            // LOGI(SOURCE) << "frames are discarded" << frame_count_;
            return;  // discard frames
        }

        std::shared_ptr<CNFrameInfo> data = this->CreateFrameInfo();
        if (!data) {
            LOGW(SOURCE) << "[FileHandlerImpl] OnDecodeFrame(): failed to create FrameInfo.";
            return;
        }

        data->timestamp = wrapper->GetPts();
        if (!wrapper->GetBufSurface()) {
            data->flags = static_cast<size_t>(CNFrameFlag::CN_FRAME_FLAG_INVALID);
            this->SendFrameInfo(data);
            return;
        }

#if 0   // Debug-only block (omitted here): it copied the decoded NV12 surface to host memory,
        // converted it to BGR via cv::cvtColor(..., cv::COLOR_YUV2BGR_NV12), and optionally
        // wrote numbered "OnDecodeFrame_%d.jpg" files for inspection.
#endif

        //std::chrono::high_resolution_clock::time_point tnow = std::chrono::high_resolution_clock::now();
        int ret = SourceRender::Process(data, std::move(wrapper), frame_id_++, param_);
        if (ret < 0) {
            LOGE(SOURCE) << "[FileHandlerImpl] OnDecodeFrame(): [" << stream_id_ << "]: Render frame failed";
            return;
        }
        //std::chrono::high_resolution_clock::time_point tpost = std::chrono::high_resolution_clock::now();
        //std::cout << "<<<<<< 0000000000000000000000000000000SourceRender::Process  time = " << std::chrono::duration_cast<std::chrono::duration<double>>(tpost - tnow).count() * 1000 << " ms" << std::endl;

        //std::chrono::high_resolution_clock::time_point tnow1 = std::chrono::high_resolution_clock::now();
        this->SendFrameInfo(data);
       // std::chrono::high_resolution_clock::time_point tpost1 = std::chrono::high_resolution_clock::now();
       // std::cout << "<<<<<< 111111111111111111this->SendFrameInfo  time = " << std::chrono::duration_cast<std::chrono::duration<double>>(tpost1 - tnow1).count() * 1000 << " ms" << std::endl;
    }

The CreateFrameInfo called there is:

std::shared_ptr<CNFrameInfo> CreateFrameInfo(bool eos = false) {
    std::shared_ptr<CNFrameInfo> data;
    int retry_cnt = 1;
    while (1) {
      data = handler_->CreateFrameInfo(eos);
      if (data != nullptr) break;
      if (CreateInterrupt()) break;
      std::this_thread::sleep_for(std::chrono::microseconds(5 * retry_cnt));
      retry_cnt = std::min(retry_cnt * 2, 20);
    }
    if (!eos) {
      auto dataframe = std::make_shared<CNDataFrame>();
      if (!dataframe) {
        return nullptr;
      }
      auto inferobjs = std::make_shared<CNInferObjs>();
      if (!inferobjs) {
        return nullptr;
      }
      data->collection.Add(kCNDataFrameTag, dataframe);
      data->collection.Add(kCNInferObjsTag, inferobjs);
    }
    return data;
  }

And ultimately it ends up in:

std::shared_ptr<CNFrameInfo> CNFrameInfo::Create(const std::string &stream_id, bool eos,
                                                 std::shared_ptr<CNFrameInfo> payload) {
  if (stream_id == "") {
    LOGE(CORE) << "CNFrameInfo::Create() stream_id is empty string.";
    return nullptr;
  }
  std::shared_ptr<CNFrameInfo> ptr(new (std::nothrow) CNFrameInfo());
  if (!ptr) {
    LOGE(CORE) << "CNFrameInfo::Create() new CNFrameInfo failed.";
    return nullptr;
  }
  ptr->stream_id = stream_id;
  ptr->payload = payload;
  if (eos) {
    ptr->flags |= static_cast<size_t>(cnstream::CNFrameFlag::CN_FRAME_FLAG_EOS);
    if (!ptr->payload) {
      std::lock_guard<std::mutex> guard(s_eos_lock_);
      s_stream_eos_map_[stream_id] = false;
    }
    return ptr;
  }

  return ptr;
}

This is how the data object ends up carrying its stream_id; the condensed sketch below ties the whole chain together.
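
Putting Sections 4 and 5 together, here is a condensed sketch (mine, not the literal CNStream sources) of how the stream_id handed to the handler when the source is added ends up inside every CNFrameInfo it emits:

// Condensed sketch, not the literal CNStream code: the handler keeps the stream_id
// it was constructed with (cf. SourceHandler(module, stream_id) above) and stamps
// it into every frame it creates.
class SourceHandlerSketch {
 public:
  explicit SourceHandlerSketch(std::string stream_id) : stream_id_(std::move(stream_id)) {}

  std::shared_ptr<CNFrameInfo> CreateFrameInfo(bool eos = false) {
    // This is where the id enters the frame (see CNFrameInfo::Create above).
    return CNFrameInfo::Create(stream_id_, eos);
  }

 private:
  std::string stream_id_;  // fixed per stream when the source was added
};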

References:

手写EventBus(观察者模式、源码阅读、反射) - 简书

CNStream流处理多路并发Pipeline框架整体介绍-CSDN博客

寒武纪CNStream用户手册 — 寒武纪CNStream用户手册 6.2.0 文档

GitHub - Cambricon/CNStream: CNStream is a streaming framework for building Cambricon machine learning pipelines http://forum.cambricon.com https://gitee.com/SolutionSDK/CNStream

CNStream/docs/release_document/latest/Cambricon-CNStream-User-Guide-CN-vlatest.pdf at master · Cambricon/CNStream · GitHub

C++内存池Memory Pool的高级实现、代码详解、CMake构建工程、应用实例-CSDN博客

aclStream流处理多路并发Pipeline框架中VEncode Module代码调用流程整理、类的层次关系整理、回调函数赋值和调用流程整理-CSDN博客

CNStream代码中C++反射机制的使用-CSDN博客

aclStream流处理多路并发Pipeline框架中 视频解码 代码调用流程整理、类的层次关系整理、回调函数赋值和调用流程整理-CSDN博客

EventBus的C++实现、代码分析-CSDN博客
