object detection Graph
以目标检测为例,分析 MediaPipe 流水线的处理机制
一 流水线上游输入处理
1 Calculator
算子(Calculator)是 MediaPipe 框架中创建插件/算子机制的基础。
在MediaPipe中,插件是一种可扩展的计算模块,可以用于实现各种不同的计算功能。calculator_base.h 文件定义了一个基类,所有插件都需要继承这个基类,并实现其中的函数或方法。通过使用这个基类,MediaPipe可以统一管理插件的接口和功能,使得在创建复杂的多媒体处理程序时更加灵活和可扩展。插件可以像拼积木一样组合和排列,以实现不同的功能和效果。
calculator_base.h 文件通常会包含一些基本的函数和属性,例如插件的初始化、更新、清理等操作,以及插件之间的通信和数据交换等。这个基类为插件的实现提供了一个统一的框架和规范,使得开发者可以根据自己的需求和创意来创建自定义的插件,并将其集成到MediaPipe的多媒体处理程序中。
计算图里的每个 node 都是 calculator,它是计算图中逻辑计算的载体。一个 calculator 可以接受 0 个或多个 stream 或 side packet,并输出 0 个或多个 stream 或 side packet。Calculator 需要继承相同的基类并实现所需的接口,并且要在 framework 中进行注册,以便可以通过配置文件进行构建。
1.1 CalculatorBase
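calculator_base.h 头文件定义了 MediaPipe 框架中用于创建插件/算子机制的基础类。注意:下面贴出的代码实际来自 collection.h(其头文件保护宏为 MEDIAPIPE_FRAMEWORK_COLLECTION_H_),它定义的是按 tag/index 访问元素的 Collection 容器模板,而不是 CalculatorBase 本身。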
// Copyright 2019 The MediaPipe Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#ifndef MEDIAPIPE_FRAMEWORK_COLLECTION_H_
#define MEDIAPIPE_FRAMEWORK_COLLECTION_H_
#include <cstdlib>
#include <iterator>
#include <map>
#include <set>
#include <string>
#include <typeinfo>
#include <vector>
#include "absl/base/macros.h"
#include "absl/memory/memory.h"
#include "absl/strings/str_cat.h"
#include "absl/strings/string_view.h"
#include "mediapipe/framework/collection_item_id.h"
#include "mediapipe/framework/port/logging.h"
#include "mediapipe/framework/tool/tag_map.h"
#include "mediapipe/framework/tool/tag_map_helper.h"
#include "mediapipe/framework/tool/validate_name.h"
#include "mediapipe/framework/type_map.h"
namespace mediapipe {
namespace internal {
// A class to handle errors that occur in Collection. For most
// collections, these errors should be fatal. However, for a collection
// more like PacketTypeSet, the errors should be deferred and handled
// later.
//
// This class is thread compatible.
template <typename T>
struct CollectionErrorHandlerFatal {
// Called when a lookup for the given tag and index fails. The signature
// returns a T& that a caller could use in place of the missing element,
// but this handler never returns: it logs a fatal error with the tag and
// index and then aborts.
//
// The handler holds no state and never actually produces a value, so a
// single const overload returning a non-const reference is enough.
T& GetFallback(const absl::string_view tag, int index) const {
LOG(FATAL) << "Failed to get tag \"" << tag << "\" index " << index;
// Not expected to be reached if LOG(FATAL) terminates; also guarantees the
// function never falls off the end without returning a value.
std::abort();
}
};
// Selects what a Collection keeps in its backing array: the T objects
// themselves (kStoreValue) or T* pointers to them (kStorePointer).
enum class CollectionStorage {
  kStoreValue = 0,
  kStorePointer,
};
// A collection of objects of type T.
//
// If storage == kStorePointer then T* will be stored instead of T, but
// the accessor functions will still return T types. The T objects must
// be owned elsewhere and remain alive as long as the collection is used.
// To set the pointers use the GetPtr() function.
//
// The ErrorHandler object allows errors to be deferred to a later time.
//
// This class is thread compatible as long as the ErrorHandler object is also
// thread compatible.
template <typename T,
          CollectionStorage storage = CollectionStorage::kStoreValue,
          typename ErrorHandler = CollectionErrorHandlerFatal<T>>
class Collection {
 private:
  template <typename ItType>
  class DoubleDerefIterator;

 public:
  using value_type = T;
  // The iterator is over value_type, requiring a double dereference if
  // storage == kStorePointer.
  using iterator =
      typename std::conditional<storage == CollectionStorage::kStorePointer,
                                DoubleDerefIterator<value_type>,
                                value_type*>::type;
  using const_iterator =
      typename std::conditional<storage == CollectionStorage::kStorePointer,
                                DoubleDerefIterator<const value_type>,
                                const value_type*>::type;
  using difference_type = ptrdiff_t;
  using size_type = size_t;
  using pointer = value_type*;
  using reference = value_type&;

  // The type that is stored by data_;
  using stored_type =
      typename std::conditional<storage == CollectionStorage::kStorePointer,
                                value_type*, value_type>::type;

  // Collection must be initialized on construction.
  Collection() = delete;
  Collection(const Collection&) = delete;
  Collection& operator=(const Collection&) = delete;

  // Makes a Collection using the given TagMap (which should be shared
  // between collections).
  // Refer to mediapipe::tool::CreateTagMap for examples of how to construct a
  // collection from a vector of "TAG:<index>:name" strings, or from an integer
  // number of indexes, etc.
  explicit Collection(std::shared_ptr<tool::TagMap> tag_map);

  // Makes a Collection using the information in the TagAndNameInfo.
  ABSL_DEPRECATED("Use Collection(tool::TagMap)")
  explicit Collection(const tool::TagAndNameInfo& info);

  // Convenience constructor which initializes a collection to use
  // indexes and have num_entries inputs.
  ABSL_DEPRECATED("Use Collection(tool::TagMap)")
  explicit Collection(int num_entries);

  // Convenience constructor which initializes a collection to use tags
  // with the given names.
  // Note: initializer_list constructor should not be marked explicit.
  ABSL_DEPRECATED("Use Collection(tool::TagMap)")
  Collection(const std::initializer_list<std::string>& tag_names);

  // Access the data at a given CollectionItemId. This is the most efficient
  // way to access data within the collection.
  //
  // Do not assume that Index(2) == Get(collection.TagMap()->BeginId() + 2).
  value_type& Get(CollectionItemId id);
  const value_type& Get(CollectionItemId id) const;

  // Convenience functions.
  value_type& Get(absl::string_view tag, int index);
  const value_type& Get(absl::string_view tag, int index) const;

  // Equivalent to Get("", index);
  value_type& Index(int index);
  const value_type& Index(int index) const;

  // Equivalent to Get(tag, 0);
  value_type& Tag(absl::string_view tag);
  const value_type& Tag(absl::string_view tag) const;

  // These functions only exist for collections with storage ==
  // kStorePointer. GetPtr returns the stored ptr value rather than
  // the value_type. The non-const version returns a reference so that
  // the pointer can be set.
  value_type*& GetPtr(CollectionItemId id);
  // Const version returns a pointer to a const value (a const-ref to
  // a pointer wouldn't be useful in this context).
  const value_type* GetPtr(CollectionItemId id) const;

  // Returns true if the collection has a tag other than "".
  // TODO Deprecate and remove this function.
  bool UsesTags() const;

  // Returns a description of the collection.
  std::string DebugString() const;

  // Return the tag_map.
  const std::shared_ptr<tool::TagMap>& TagMap() const;

  // Iteration functions for use of the collection in a range based
  // for loop. The items are provided in sorted tag order with indexes
  // sequential within tags.
  iterator begin();
  iterator end();
  const_iterator begin() const;
  const_iterator end() const;

  // Returns the error handler object.
  const ErrorHandler& GetErrorHandler() const { return error_handler_; }

  // The remaining public functions directly call their equivalent
  // in tool::TagMap. They are guaranteed to be equivalent for any
  // Collection initialized using an equivalent tool::TagMap.

  // Returns true if the provided tag is available (not necessarily set yet).
  bool HasTag(const absl::string_view tag) const {
    return tag_map_->HasTag(tag);
  }

  // Returns the number of entries in this collection.
  int NumEntries() const { return tag_map_->NumEntries(); }

  // Returns the number of entries with the provided tag.
  int NumEntries(const absl::string_view tag) const {
    return tag_map_->NumEntries(tag);
  }

  // Get the id for the tag and index. This id is guaranteed valid for
  // any Collection which was initialized with an equivalent tool::TagMap.
  // If the tag or index are invalid then an invalid CollectionItemId
  // is returned (with id.IsValid() == false).
  //
  // The id for indexes within the same tag are guaranteed to
  // be sequential. Meaning, if tag "BLAH" has 3 indexes, then
  // ++GetId("BLAH", 1) == GetId("BLAH", 2)
  // However, be careful in using this fact, as it circumvents the
  // validity checks in GetId() (i.e. ++GetId("BLAH", 2) looks like it
  // is valid, while GetId("BLAH", 3) is not valid).
  CollectionItemId GetId(const absl::string_view tag, int index) const {
    return tag_map_->GetId(tag, index);
  }

  // Returns the names of the tags in this collection.
  std::set<std::string> GetTags() const { return tag_map_->GetTags(); }

  // Get a tag and index for the specified id. If the id is not valid,
  // then {"", -1} will be returned.
  std::pair<std::string, int> TagAndIndexFromId(CollectionItemId id) const {
    return tag_map_->TagAndIndexFromId(id);
  }

  // The CollectionItemId corresponding to the first element in the collection.
  // Looping over all elements can be done as follows.
  //   for (CollectionItemId id = collection.BeginId();
  //        id < collection.EndId(); ++id) {
  //   }
  // However, if only one collection is involved, prefer using a range
  // based for loop.
  //   for (Packet packet : Inputs()) {
  //   }
  CollectionItemId BeginId() const { return tag_map_->BeginId(); }
  // The CollectionItemId corresponding to an element immediately after
  // the last element of the collection.
  CollectionItemId EndId() const { return tag_map_->EndId(); }

  // Same as BeginId()/EndId() but for only one tag. If the tag doesn't
  // exist then an invalid CollectionItemId is returned. It is guaranteed
  // that a loop constructed in this way will successfully not be entered
  // for invalid tags.
  //   for (CollectionItemId id = collection.BeginId(tag);
  //        id < collection.EndId(tag); ++id) {
  //   }
  CollectionItemId BeginId(const absl::string_view tag) const {
    return tag_map_->BeginId(tag);
  }
  CollectionItemId EndId(const absl::string_view tag) const {
    return tag_map_->EndId(tag);
  }

  // Equal Collections contain equal mappings and equal elements.
  bool operator==(const Collection<T>& other) const {
    if (tag_map_->Mapping() != other.TagMap()->Mapping()) {
      return false;
    }
    for (CollectionItemId id = BeginId(); id < EndId(); ++id) {
      if (Get(id) != other.Get(id)) {
        return false;
      }
    }
    return true;
  }
  bool operator!=(const Collection<T>& other) const {
    return !(*this == other);
  }

 private:
  // An iterator which is identical to ItType** except that the
  // dereference operator (operator*) does a double dereference and
  // returns an ItType.
  //
  // This class is thread compatible.
  template <typename ItType>
  class DoubleDerefIterator {
   public:
    using iterator_category = std::random_access_iterator_tag;
    using value_type = ItType;
    using difference_type = std::ptrdiff_t;
    using pointer = ItType*;
    using reference = ItType&;

    DoubleDerefIterator() : ptr_(nullptr) {}

    // Dereferencing does not modify the iterator itself, so these are
    // const-qualified and usable through a const iterator object.
    reference operator*() const { return **ptr_; }
    pointer operator->() const { return *ptr_; }
    reference operator[](difference_type d) const { return **(ptr_ + d); }

    // Member operators.
    DoubleDerefIterator& operator++() {
      ++ptr_;
      return *this;
    }
    DoubleDerefIterator operator++(int) {
      DoubleDerefIterator output(ptr_);
      ++ptr_;
      return output;
    }
    DoubleDerefIterator& operator--() {
      --ptr_;
      return *this;
    }
    DoubleDerefIterator operator--(int) {
      DoubleDerefIterator output(ptr_);
      --ptr_;
      return output;
    }
    DoubleDerefIterator& operator+=(difference_type d) {
      ptr_ += d;
      return *this;
    }
    DoubleDerefIterator& operator-=(difference_type d) {
      ptr_ -= d;
      return *this;
    }

    // Non-member binary operators.
    friend bool operator==(DoubleDerefIterator lhs, DoubleDerefIterator rhs) {
      return lhs.ptr_ == rhs.ptr_;
    }
    friend bool operator!=(DoubleDerefIterator lhs, DoubleDerefIterator rhs) {
      return lhs.ptr_ != rhs.ptr_;
    }
    friend bool operator<(DoubleDerefIterator lhs, DoubleDerefIterator rhs) {
      return lhs.ptr_ < rhs.ptr_;
    }
    friend bool operator<=(DoubleDerefIterator lhs, DoubleDerefIterator rhs) {
      return lhs.ptr_ <= rhs.ptr_;
    }
    friend bool operator>(DoubleDerefIterator lhs, DoubleDerefIterator rhs) {
      return lhs.ptr_ > rhs.ptr_;
    }
    friend bool operator>=(DoubleDerefIterator lhs, DoubleDerefIterator rhs) {
      return lhs.ptr_ >= rhs.ptr_;
    }
    // The arithmetic operators construct the result explicitly: the
    // pointer-taking constructor is explicit, so returning a bare pointer
    // expression would not convert to DoubleDerefIterator.
    friend DoubleDerefIterator operator+(DoubleDerefIterator lhs,
                                         difference_type d) {
      return DoubleDerefIterator(lhs.ptr_ + d);
    }
    friend DoubleDerefIterator operator+(difference_type d,
                                         DoubleDerefIterator rhs) {
      return DoubleDerefIterator(rhs.ptr_ + d);
    }
    // Returns by value; the result is a new iterator, so there is no object
    // a reference could legitimately refer to.
    friend DoubleDerefIterator operator-(DoubleDerefIterator lhs,
                                         difference_type d) {
      return DoubleDerefIterator(lhs.ptr_ - d);
    }
    friend difference_type operator-(DoubleDerefIterator lhs,
                                     DoubleDerefIterator rhs) {
      return lhs.ptr_ - rhs.ptr_;
    }

   private:
    explicit DoubleDerefIterator(ItType* const* data) : ptr_(data) {}

    ItType* const* ptr_;

    friend class Collection;
  };

  // TagMap for the collection.
  std::shared_ptr<tool::TagMap> tag_map_;

  // Indexed by Id. Use an array directly so that the type does not
  // have to be copy constructable. The array has tag_map_->NumEntries()
  // elements.
  std::unique_ptr<stored_type[]> data_;

  // A class which allows errors to be reported flexibly. The default
  // instantiation performs a LOG(FATAL) and does not have any member
  // variables (zero size).
  ErrorHandler error_handler_;
};
// Definitions of templated functions for Collection.
// Takes shared ownership of tag_map and allocates one stored_type slot per
// entry. When the map has no entries nothing is allocated and data_ is left
// null.
template <typename T, CollectionStorage storage, typename ErrorHandler>
Collection<T, storage, ErrorHandler>::Collection(
    std::shared_ptr<tool::TagMap> tag_map)
    : tag_map_(std::move(tag_map)) {
  const int entry_count = tag_map_->NumEntries();
  if (entry_count == 0) {
    return;
  }
  data_ = absl::make_unique<stored_type[]>(entry_count);
}
// Deprecated convenience constructor: builds a TagMap from the TagAndNameInfo
// and delegates to the TagMap constructor. .value() is called on the result
// of TagMap::Create directly, without checking it first.
template <typename T, CollectionStorage storage, typename ErrorHandler>
Collection<T, storage, ErrorHandler>::Collection(
const tool::TagAndNameInfo& info)
: Collection(tool::TagMap::Create(info).value()) {}
// Deprecated convenience constructor: builds a TagMap via
// tool::CreateTagMap(num_entries) and delegates to the TagMap constructor.
template <typename T, CollectionStorage storage, typename ErrorHandler>
Collection<T, storage, ErrorHandler>::Collection(const int num_entries)
: Collection(tool::CreateTagMap(num_entries).value()) {}
// Deprecated convenience constructor: builds a TagMap from the given tag
// names via tool::CreateTagMapFromTags and delegates to the TagMap
// constructor.
template <typename T, CollectionStorage storage, typename ErrorHandler>
Collection<T, storage, ErrorHandler>::Collection(
const std::initializer_list<std::string>& tag_names)
: Collection(tool::CreateTagMapFromTags(tag_names).value()) {}
// Reports whether the collection has any tag other than the empty tag "".
template <typename T, CollectionStorage storage, typename ErrorHandler>
bool Collection<T, storage, ErrorHandler>::UsesTags() const {
  const auto& tag_mapping = tag_map_->Mapping();
  switch (tag_mapping.size()) {
    case 0:
      // No tags at all, so no named tag either.
      return false;
    case 1:
      // A single tag counts only if it is not the empty tag.
      return !tag_mapping.begin()->first.empty();
    default:
      // With two or more distinct tags at least one is not "".
      return true;
  }
}
// Returns the element for id. CHECK-fails unless BeginId() <= id < EndId().
template <typename T, CollectionStorage storage, typename ErrorHandler>
typename Collection<T, storage, ErrorHandler>::value_type&
Collection<T, storage, ErrorHandler>::Get(CollectionItemId id) {
  CHECK_LE(BeginId(), id);
  CHECK_LT(id, EndId());
  iterator first = begin();
  return first[id.value()];
}

// Const overload of Get(CollectionItemId); same range checks.
template <typename T, CollectionStorage storage, typename ErrorHandler>
const typename Collection<T, storage, ErrorHandler>::value_type&
Collection<T, storage, ErrorHandler>::Get(CollectionItemId id) const {
  CHECK_LE(BeginId(), id);
  CHECK_LT(id, EndId());
  const_iterator first = begin();
  return first[id.value()];
}
// Returns a reference to the stored pointer slot for id so the caller can set
// it. Only compiles for storage == kStorePointer; CHECK-fails unless
// BeginId() <= id < EndId().
template <typename T, CollectionStorage storage, typename ErrorHandler>
typename Collection<T, storage, ErrorHandler>::value_type*&
Collection<T, storage, ErrorHandler>::GetPtr(CollectionItemId id) {
  static_assert(storage == CollectionStorage::kStorePointer,
                "mediapipe::internal::Collection<T>::GetPtr() is only "
                "available for collections that were defined with template "
                "argument storage == CollectionStorage::kStorePointer.");
  CHECK_LE(BeginId(), id);
  CHECK_LT(id, EndId());
  value_type*& slot = data_[id.value()];
  return slot;
}

// Const overload: returns the stored pointer by value, as pointer-to-const.
template <typename T, CollectionStorage storage, typename ErrorHandler>
const typename Collection<T, storage, ErrorHandler>::value_type*
Collection<T, storage, ErrorHandler>::GetPtr(CollectionItemId id) const {
  static_assert(storage == CollectionStorage::kStorePointer,
                "mediapipe::internal::Collection<T>::GetPtr() is only "
                "available for collections that were defined with template "
                "argument storage == CollectionStorage::kStorePointer.");
  CHECK_LE(BeginId(), id);
  CHECK_LT(id, EndId());
  const value_type* stored = data_[id.value()];
  return stored;
}
// Looks up the element for (tag, index). If the pair does not map to a valid
// id, the result of the error handler's GetFallback is returned instead.
template <typename T, CollectionStorage storage, typename ErrorHandler>
typename Collection<T, storage, ErrorHandler>::value_type&
Collection<T, storage, ErrorHandler>::Get(const absl::string_view tag,
                                          int index) {
  const CollectionItemId item_id = GetId(tag, index);
  if (item_id.IsValid()) {
    return begin()[item_id.value()];
  }
  return error_handler_.GetFallback(tag, index);
}

// Const overload of Get(tag, index); same fallback behaviour.
template <typename T, CollectionStorage storage, typename ErrorHandler>
const typename Collection<T, storage, ErrorHandler>::value_type&
Collection<T, storage, ErrorHandler>::Get(const absl::string_view tag,
                                          int index) const {
  const CollectionItemId item_id = GetId(tag, index);
  if (item_id.IsValid()) {
    return begin()[item_id.value()];
  }
  return error_handler_.GetFallback(tag, index);
}
// Returns the element at `index` under the empty tag; forwards to
// Get("", index).
template <typename T, CollectionStorage storage, typename ErrorHandler>
typename Collection<T, storage, ErrorHandler>::value_type&
Collection<T, storage, ErrorHandler>::Index(int index) {
return Get("", index);
}
// Const overload of Index(int).
template <typename T, CollectionStorage storage, typename ErrorHandler>
const typename Collection<T, storage, ErrorHandler>::value_type&
Collection<T, storage, ErrorHandler>::Index(int index) const {
return Get("", index);
}
// Returns the first element (index 0) under `tag`; forwards to Get(tag, 0).
template <typename T, CollectionStorage storage, typename ErrorHandler>
typename Collection<T, storage, ErrorHandler>::value_type&
Collection<T, storage, ErrorHandler>::Tag(const absl::string_view tag) {
return Get(tag, 0);
}
// Const overload of Tag(absl::string_view).
template <typename T, CollectionStorage storage, typename ErrorHandler>
const typename Collection<T, storage, ErrorHandler>::value_type&
Collection<T, storage, ErrorHandler>::Tag(const absl::string_view tag) const {
return Get(tag, 0);
}
// Builds a description made of the element type name followed by the TagMap's
// own debug string.
template <typename T, CollectionStorage storage, typename ErrorHandler>
std::string Collection<T, storage, ErrorHandler>::DebugString() const {
  return absl::StrCat("Collection of \"", MediaPipeTypeStringOrDemangled<T>(),
                      "\" with\n", tag_map_->DebugString());
}
// Returns a const reference to the shared_ptr holding this collection's
// TagMap; no copy of the shared_ptr is made.
template <typename T, CollectionStorage storage, typename ErrorHandler>
const std::shared_ptr<tool::TagMap>&
Collection<T, storage, ErrorHandler>::TagMap() const {
return tag_map_;
}
// Iterator to the first slot of the backing array.
template <typename T, CollectionStorage storage, typename ErrorHandler>
typename Collection<T, storage, ErrorHandler>::iterator
Collection<T, storage, ErrorHandler>::begin() {
  stored_type* const first_slot = data_.get();
  return iterator(first_slot);
}

// Iterator one past the last slot (NumEntries() slots after the first).
template <typename T, CollectionStorage storage, typename ErrorHandler>
typename Collection<T, storage, ErrorHandler>::iterator
Collection<T, storage, ErrorHandler>::end() {
  stored_type* const past_last_slot = data_.get() + tag_map_->NumEntries();
  return iterator(past_last_slot);
}

// Const counterpart of begin().
template <typename T, CollectionStorage storage, typename ErrorHandler>
typename Collection<T, storage, ErrorHandler>::const_iterator
Collection<T, storage, ErrorHandler>::begin() const {
  stored_type* const first_slot = data_.get();
  return const_iterator(first_slot);
}

// Const counterpart of end().
template <typename T, CollectionStorage storage, typename ErrorHandler>
typename Collection<T, storage, ErrorHandler>::const_iterator
Collection<T, storage, ErrorHandler>::end() const {
  stored_type* const past_last_slot = data_.get() + tag_map_->NumEntries();
  return const_iterator(past_last_slot);
}
} // namespace internal
// True when the collection has `tag` and the element stored under it is not
// empty. Overload for collections whose elements are pointers (S*), so the
// element is reached with ->.
template <class S>
bool HasTagValue(const internal::Collection<S*>& c,
                 const absl::string_view tag) {
  if (!c.HasTag(tag)) {
    return false;
  }
  return !c.Tag(tag)->IsEmpty();
}

// True when the collection has `tag` and the element stored under it is not
// empty. Overload for collections whose elements are values (S).
template <class S>
bool HasTagValue(const internal::Collection<S>& c,
                 const absl::string_view tag) {
  if (!c.HasTag(tag)) {
    return false;
  }
  return !c.Tag(tag).IsEmpty();
}

// Convenience overload taking a pointer-like object that exposes Inputs();
// forwards to the collection overloads using c->Inputs().
template <class C>
bool HasTagValue(const C& c, const absl::string_view tag) {
  const auto& inputs = c->Inputs();
  return HasTagValue(inputs, tag);
}
} // namespace mediapipe
#endif // MEDIAPIPE_FRAMEWORK_COLLECTION_H_
2 mediapipe 流水线
下面将根据 Graph 的数据流走向深入源码部分,需要一定的 MediaPipe 基础知识储备。
MediaPipe 源码中大量使用 Boost、Abseil(absl)等库的 API 以及智能指针,以提升性能或增强鲁棒性。
2.1 GpuBufferToImageFrameCalculator
2.1.1 GPU
根据宏定义 MEDIAPIPE_GPU_BUFFER_USE_CV_PIXEL_BUFFER 决定转换方式:宏为真时直接由 CVPixelBuffer 创建 ImageFrame;否则在 OpenGL 上下文中绑定源纹理,并通过 glReadPixels 把像素回读到 ImageFrame。
GPU 渲染管线运行在opengl上下文GLContext openglThread中
// Converts the packet on input 0 to an ImageFrame on output 0.
//
// - ImageFrame input: the packet is forwarded unchanged.
// - GpuBuffer input (only when HAVE_GPU_BUFFER is defined): the pixels are
//   copied into a new ImageFrame, either from the CVPixelBuffer or by reading
//   the texture back inside the GL context.
// - Anything else: returns kInvalidArgument.
absl::Status GpuBufferToImageFrameCalculator::Process(CalculatorContext* cc) {
  if (cc->Inputs().Index(0).Value().ValidateAsType<ImageFrame>().ok()) {
    cc->Outputs().Index(0).AddPacket(cc->Inputs().Index(0).Value());
    return absl::OkStatus();
  }

#ifdef HAVE_GPU_BUFFER
  if (cc->Inputs().Index(0).Value().ValidateAsType<GpuBuffer>().ok()) {
    const auto& input = cc->Inputs().Index(0).Get<GpuBuffer>();
#if MEDIAPIPE_GPU_BUFFER_USE_CV_PIXEL_BUFFER
    std::unique_ptr<ImageFrame> frame =
        CreateImageFrameForCVPixelBuffer(GetCVPixelBufferRef(input));
    cc->Outputs().Index(0).Add(frame.release(), cc->InputTimestamp());
#else
    // RunInGlContext reports failures through its returned status. Propagate
    // it instead of dropping it, so that a failed GL run is not reported as
    // success with no output packet.
    // NOTE(review): assumes the void-lambda overload of RunInGlContext
    // returns absl::Status - confirm against gl_calculator_helper.h.
    const absl::Status gl_status =
        helper_.RunInGlContext([this, &input, &cc]() {
          auto src = helper_.CreateSourceTexture(input);
          std::unique_ptr<ImageFrame> frame = absl::make_unique<ImageFrame>(
              ImageFormatForGpuBufferFormat(input.format()), src.width(),
              src.height(), ImageFrame::kGlDefaultAlignmentBoundary);
          helper_.BindFramebuffer(src);
          const auto info = GlTextureInfoForGpuBufferFormat(
              input.format(), 0, helper_.GetGlVersion());
          // Read the bound framebuffer back into the ImageFrame's buffer.
          glReadPixels(0, 0, src.width(), src.height(), info.gl_format,
                       info.gl_type, frame->MutablePixelData());
          glFlush();
          cc->Outputs().Index(0).Add(frame.release(), cc->InputTimestamp());
          src.Release();
        });
    if (!gl_status.ok()) {
      return gl_status;
    }
#endif  // MEDIAPIPE_GPU_BUFFER_USE_CV_PIXEL_BUFFER
    return absl::OkStatus();
  }
#endif  // defined(HAVE_GPU_BUFFER)

  return absl::Status(absl::StatusCode::kInvalidArgument,
                      "Input packets must be ImageFrame or GpuBuffer.");
}
2.1.2 CPU
…
2.2 FlowLimiterCalculator
对流向下游的图像进行节流,以实现流量控制。它让第一帧传入的图像原样通过,然后等待图中下游的 TfLiteTensorsToDetectionsCalculator 生成对应的检测结果后,才放行下一帧图像。等待期间到达的所有图像都会被丢弃,从而把该计算器与 TfLiteTensorsToDetectionsCalculator 之间在途(in-flight)的图像数量限制为 1。这样可以防止中间节点对传入的图像和数据过度排队——过度排队会增加延迟和内存占用,这在实时移动应用中是不希望出现的。它还避免了不必要的计算:例如,如果后续的 TfLiteConverterCalculator 或 TfLiteInferenceCalculator 仍在忙于处理之前的输入,ImageTransformationCalculator 生成的变换后图像可能会在下游被丢弃。
2.2.1 FlowLimiterCalculator
flow_limiter_calculator.cc
FlowLimiterCalculator类是MediaPipe框架中用于控制数据流的一个组件。它主要负责限制输入数据的流速,以避免模型过载或运算资源不足的情况。
在源代码中,FlowLimiterCalculator类通常会定义一个数据结构来存储和管理输入数据的流速限制信息。它包含以下内容:
数据缓冲区:用于存储输入数据,以便在下一帧图像可用之前进行运算。
超时判断:下文代码并不使用墙上时钟计时,而是比较数据包的时间戳;当最新输入帧的时间戳与某个在途帧的时间戳之差超过 in_flight_timeout 时,该在途帧会被放弃(与实际流逝的时间无关)。
流速限制参数:下文代码中出现的参数是 max_in_flight(在途帧数的上限)、max_in_queue(排队等待帧数的上限)和 in_flight_timeout(在途帧的超时时间),而不是直接设定每秒处理的帧数。
状态变量:用于记录当前处理的帧数和已处理的帧数,以便在达到流速限制时停止处理新的帧。
FlowLimiterCalculator类的核心功能如下:接收输入数据:每当有新的帧图像可用时,FlowLimiterCalculator会接收并存储在数据缓冲区中。
计算时间间隔:计时器会记录当前帧与上一帧之间的时间间隔。
判断是否达到流速限制:根据计时器记录的时间间隔和设定的流速限制参数,FlowLimiterCalculator会判断是否达到最大流速。如果达到流速限制,将停止处理新的帧,直到当前处理的帧数达到已处理的帧数为止。
处理帧:如果当前帧没有被丢弃(即未达到流速限制),FlowLimiterCalculator会将该帧送入模型进行运算处理。
更新状态变量:每次处理完一帧后,FlowLimiterCalculator会更新已处理的帧数状态变量,以便在达到流速限制时正确停止处理新的帧。
// Releases queued input frames as long as the in-flight limit allows it,
// drops frames that overflow the queue, and propagates timestamp bounds.
absl::Status Process(CalculatorContext* cc) final {
  options_ = tool::RetrieveOptions(options_, cc->Inputs());

  // FINISHED stream: a packet at the current input timestamp retires every
  // in-flight frame whose timestamp is not later than it.
  const Packet finished = cc->Inputs().Tag(kFinishedTag).Value();
  if (finished.Timestamp() == cc->InputTimestamp()) {
    while (!frames_in_flight_.empty() &&
           frames_in_flight_.front() <= finished.Timestamp()) {
      frames_in_flight_.pop_front();
    }
  }

  // Queue every non-empty packet from the untagged frame input streams.
  const int frame_stream_count = cc->Inputs().NumEntries("");
  for (int stream = 0; stream < frame_stream_count; ++stream) {
    Packet incoming = cc->Inputs().Get("", stream).Value();
    if (incoming.IsEmpty()) {
      continue;
    }
    input_queues_[stream].push_back(incoming);
  }

  // Give up on in-flight frames that are older than the timeout relative to
  // the newest frame timestamp. This is driven by timestamps only, not by
  // elapsed time.
  const TimestampDiff timeout = options_.in_flight_timeout();
  const Timestamp latest_ts = cc->Inputs().Get("", 0).Value().Timestamp();
  const bool expiry_applies = timeout > 0 &&
                              latest_ts == cc->InputTimestamp() &&
                              latest_ts < Timestamp::Max();
  if (expiry_applies) {
    while (!frames_in_flight_.empty() &&
           (latest_ts - frames_in_flight_.front()) > timeout) {
      frames_in_flight_.pop_front();
    }
  }

  // Send out frames from the main queue while processing is allowed, and
  // record each one as in flight.
  auto& main_queue = input_queues_[0];
  while (ProcessingAllowed() && !main_queue.empty()) {
    Packet released = main_queue.front();
    main_queue.pop_front();
    cc->Outputs().Get("", 0).AddPacket(released);
    SendAllow(true, released.Timestamp(), cc);
    frames_in_flight_.push_back(released.Timestamp());
  }

  // Trim the queue to max_in_queue, reporting each dropped frame.
  // Dropping can follow releasing because frame packets and FINISHED packets
  // never arrive in the same Process call.
  while (main_queue.size() > options_.max_in_queue()) {
    Packet dropped = main_queue.front();
    main_queue.pop_front();
    SendAllow(false, dropped.Timestamp(), cc);
  }

  // Propagate the input timestamp bound.
  if (main_queue.empty()) {
    // Nothing is waiting: advance past the current frame input timestamp,
    // and do the same for the ALLOW output when it is connected.
    const Timestamp bound =
        cc->Inputs().Get("", 0).Value().Timestamp().NextAllowedInStream();
    SetNextTimestampBound(bound, &cc->Outputs().Get("", 0));
    if (cc->Outputs().HasTag(kAllowTag)) {
      SetNextTimestampBound(bound, &cc->Outputs().Tag(kAllowTag));
    }
  } else {
    // A frame is still waiting: the bound is that frame's timestamp.
    const Timestamp bound = main_queue.front().Timestamp();
    SetNextTimestampBound(bound, &cc->Outputs().Get("", 0));
  }

  ProcessAuxiliaryInputs(cc);
  return absl::OkStatus();
}
code fragment
just flag tag
// Emits a bool packet at timestamp `ts` on the ALLOW output saying whether
// the frame was sent (true) or dropped (false). Does nothing when the ALLOW
// output is not present.
void SendAllow(bool allow, Timestamp ts, CalculatorContext* cc) {
  if (!cc->Outputs().HasTag(kAllowTag)) {
    return;
  }
  cc->Outputs().Tag(kAllowTag).AddPacket(MakePacket<bool>(allow).At(ts));
}
调度器如何处理呢
3 输入变换
image_transformation_calculator.cc
ImageTransformationCalculator 类是一个用于图像处理的类,它主要负责应用各种图像变换。这些变换可以包括旋转、缩放、剪切、扭曲等。此类通常用于图像增强、图像恢复以及计算机视觉任务。
3.1 ImageTransformationCalculator
图像操作功能:ImageTransformationCalculator 类应该具有能够读取、写入和处理图像的功能。这可能包括对图像进行解码和编码,处理图像文件,以及在内存中操作图像数据。
变换计算功能:此类应该具有能够计算和应用各种图像变换的功能。这可能包括旋转、缩放、剪切、扭曲、平移等变换。这些变换的计算可能需要使用一些数学和计算机视觉库,如OpenCV。
可配置性:此类可能具有一些配置选项,允许用户指定变换的类型、参数以及其他选项。这使得用户可以根据自己的需求定制变换的计算和应用方式。
线程安全性:此类可能需要支持多线程操作。这可能涉及到在多个线程之间共享图像数据和变换状态,以及同步访问共享资源的问题。
错误处理和异常处理:此类可能需要具有一些错误处理和异常处理的机制,以处理例如无法读取图像文件、无法应用某些变换等情况。
性能优化:由于图像处理可能是一个计算密集型的任务,因此此类可能需要使用一些性能优化技术来提高计算效率,例如使用并行计算、缓存等技术。
3.2 process
// Applies the per-packet overrides (rotation, flips, output size) and then
// renders the input image on the GPU or CPU path. Returns OkStatus without
// producing output when the relevant input stream is empty at this timestamp.
absl::Status ImageTransformationCalculator::Process(CalculatorContext* cc) {
  // True when the input stream `tag` exists and carries a packet right now.
  const auto has_packet = [cc](const auto& tag) {
    return cc->Inputs().HasTag(tag) && !cc->Inputs().Tag(tag).IsEmpty();
  };

  // Forward an updated video header when one is supplied and the matching
  // output exists. This is done from the header packet only, so streaming
  // transformation changes will not be reflected in the header.
  if (has_packet(kVideoPrestreamTag) &&
      cc->Outputs().HasTag(kVideoPrestreamTag)) {
    mediapipe::VideoHeader header =
        cc->Inputs().Tag(kVideoPrestreamTag).Get<mediapipe::VideoHeader>();
    // Update the header's width and height if needed.
    ComputeOutputDimensions(header.width, header.height, &header.width,
                            &header.height);
    auto header_packet =
        mediapipe::MakePacket<mediapipe::VideoHeader>(header).At(
            mediapipe::Timestamp::PreStream());
    cc->Outputs().Tag(kVideoPrestreamTag).AddPacket(header_packet);
  }

  // Per-packet overrides of the configured transformation.
  if (has_packet("ROTATION_DEGREES")) {
    const int degrees = cc->Inputs().Tag("ROTATION_DEGREES").Get<int>();
    rotation_ = DegreesToRotationMode(degrees);
  }
  if (has_packet("FLIP_HORIZONTALLY")) {
    flip_horizontally_ = cc->Inputs().Tag("FLIP_HORIZONTALLY").Get<bool>();
  }
  if (has_packet("FLIP_VERTICALLY")) {
    flip_vertically_ = cc->Inputs().Tag("FLIP_VERTICALLY").Get<bool>();
  }

  // When an OUTPUT_DIMENSIONS stream is connected, a packet on it is
  // required: without one the call ends here and nothing is rendered.
  if (cc->Inputs().HasTag("OUTPUT_DIMENSIONS")) {
    if (cc->Inputs().Tag("OUTPUT_DIMENSIONS").IsEmpty()) {
      return absl::OkStatus();
    }
    const auto& image_size =
        cc->Inputs().Tag("OUTPUT_DIMENSIONS").Get<std::pair<int, int>>();
    output_width_ = image_size.first;
    output_height_ = image_size.second;
  }

  if (!use_gpu_) {
    if (cc->Inputs().Tag(kImageFrameTag).IsEmpty()) {
      return absl::OkStatus();
    }
    return RenderCpu(cc);
  }

#if !MEDIAPIPE_DISABLE_GPU
  if (cc->Inputs().Tag(kGpuBufferTag).IsEmpty()) {
    return absl::OkStatus();
  }
  return gpu_helper_.RunInGlContext(
      [this, cc]() -> absl::Status { return RenderGpu(cc); });
#endif  // !MEDIAPIPE_DISABLE_GPU

  // Reached only when the GPU path is requested but compiled out.
  return absl::OkStatus();
}
这里用到的 RunInGlContext 有一个模板重载(定义在 GlCalculatorHelper 中,此处未贴出源码):它接受一个返回 void 的可调用对象,把它包在一个 lambda 表达式中执行,并在执行后返回 absl::OkStatus()。
这个重载的主要目的是方便那些需要在 OpenGL 上下文中执行、且没有返回结果(返回类型为 void)的函数。这里使用模板而不是 std::function<void(void)> 参数,是因为返回 absl::Status 的 lambda 同样可以转换为 std::function<void(void)>,会与接受 std::function<absl::Status(void)> 的版本产生重载歧义。
3.2.1 Cpu opencv process
CPU 路径使用 OpenCV API 进行处理(以下为节选,代码在缩放步骤之后被截断)。
absl::Status ImageTransformationCalculator::RenderCpu(CalculatorContext* cc) {
cv::Mat input_mat;
mediapipe::ImageFormat::Format format;
const auto& input = cc->Inputs().Tag(kImageFrameTag).Get<ImageFrame>();
input_mat = formats::MatView(&input);
format = input.Format();
const int input_width = input_mat.cols;
const int input_height = input_mat.rows;
int output_width;
int output_height;
ComputeOutputDimensions(input_width, input_height, &output_width,
&output_height);
if (output_width_ > 0 && output_height_ > 0) {
cv::Mat scaled_mat;
if (scale_mode_ == mediapipe::ScaleMode_Mode_STRETCH) {
int scale_flag =
input_mat.cols > output_width_ && input_mat.rows > output_height_
? cv::INTER_AREA
: cv::INTER_LINEAR;
cv::resize(input_mat, scaled_mat, cv::Size(output_width_, output_height_),
0, 0, scale_flag);
} else {
const float scale =
std::min(static_cast<float>(output_width_) / input_width,
static_cast<float>(output_height_) / input_height);
const int target_width = std::round(input_width * scale);
const int target_height = std::round(input_height * scale);
int scale_flag = scale < 1.0f ? cv::INTER_AREA : cv::INTER_LINEAR;
if (scale_mode_ == mediapipe::ScaleMode_Mode_FIT) {
cv::Mat intermediate_mat;
cv::resize(input_mat, intermediate_mat,
cv::Size(target_width, target_height), 0, 0, scale_flag);
const int top = (output_height_ - target_height) / 2;
const int bottom = output_height_ - target_height - top;
const int left = (output_width_ - target_width) / 2;
const int right = output_width_ - target_width - left;
cv::copyMakeBorder(intermediate_mat, scaled_mat, top, bottom, left,
right,
options_.constant_padding() ? cv::BORDER_CONSTANT
: cv::BORDER_REPLICATE);
} else {
cv::resize(input_mat, scaled_mat, cv::Size(target_width, target_height),
0, 0, scale_flag);
output_width = target_width;
output_height = target_height;
}
}
input_mat = scaled_mat;
}
3.2.2 Gpu
该部分区分了android ios opengl 跨平台库,平台的opengl上下文已在开头 gpu_server内初始化完成,该部分构建render类 通过gpu处理二维数据 且以FBO方式 ,然后将GpuBuffer 设置到输出packet 送往下一节点
TEXTURE_EXTERNAL_OES
TEXTURE_EXTERNAL_OES 和普通纹理的主要区别在于它们的定义和用途。
普通纹理完全由 OpenGL ES 定义、分配和管理。它们是在 OpenGL ES 上下文中创建和使用的纹理。
TEXTURE_EXTERNAL_OES 是一种特殊类型的纹理,它在别处定义和分配,并以某种实现定义的方式导入 OpenGL ES。这种纹理主要用于导入 YUV 视频数据。系统中的一些外部实体定义了格式——它对应用程序不可见,颜色空间转换由驱动程序堆栈神奇地处理。具体支持哪些格式是实现定义的。这种纹理的主要优势是它们能够直接从 BufferQueue 数据进行渲染。例如,在 Android 平台上,BufferQueue 是连接图形数据生产方和消费方的队列,也就表示 OES 纹理能直接拿到某些生产方产生的图形数据进行渲染。
// GPU rendering path: picks a QuadRenderer that matches the input texture,
// draws the input into a newly created destination texture with the current
// scale mode, rotation and flips, and emits the result as a GpuBuffer on the
// kGpuBufferTag output. When MEDIAPIPE_DISABLE_GPU is set the body is
// compiled out and the function only returns OkStatus.
absl::Status ImageTransformationCalculator::RenderGpu(CalculatorContext* cc) {
#if !MEDIAPIPE_DISABLE_GPU
const auto& input = cc->Inputs().Tag(kGpuBufferTag).Get<GpuBuffer>();
const int input_width = input.width();
const int input_height = input.height();
int output_width;
int output_height;
ComputeOutputDimensions(input_width, input_height, &output_width,
&output_height);
// FILL_AND_CROP: override the output size with the input size scaled
// uniformly by the smaller of the two width/height ratios.
// NOTE(review): unlike RenderCpu, output_width_/output_height_ are used here
// without a "> 0" check - confirm they are always set in this mode.
if (scale_mode_ == mediapipe::ScaleMode_Mode_FILL_AND_CROP) {
const float scale =
std::min(static_cast<float>(output_width_) / input_width,
static_cast<float>(output_height_) / input_height);
output_width = std::round(input_width * scale);
output_height = std::round(input_height * scale);
}
// Optionally publish the letterbox padding as a 4-float array.
if (cc->Outputs().HasTag("LETTERBOX_PADDING")) {
auto padding = absl::make_unique<std::array<float, 4>>();
ComputeOutputLetterboxPadding(input_width, input_height, output_width,
output_height, padding.get());
cc->Outputs()
.Tag("LETTERBOX_PADDING")
.Add(padding.release(), cc->InputTimestamp());
}
// Select (and lazily set up) the renderer for the input texture type.
QuadRenderer* renderer = nullptr;
GlTexture src1;
#if defined(MEDIAPIPE_IOS)
// iOS bi-planar YUV input: use the two-texture YUV-to-RGB shader.
if (input.format() == GpuBufferFormat::kBiPlanar420YpCbCr8VideoRange ||
input.format() == GpuBufferFormat::kBiPlanar420YpCbCr8FullRange) {
if (!yuv_renderer_) {
yuv_renderer_ = absl::make_unique<QuadRenderer>();
MP_RETURN_IF_ERROR(
yuv_renderer_->GlSetup(::mediapipe::kYUV2TexToRGBFragmentShader,
{"video_frame_y", "video_frame_uv"}));
}
renderer = yuv_renderer_.get();
src1 = gpu_helper_.CreateSourceTexture(input, 0);
} else // NOLINT(readability/braces)
#endif // iOS
{
src1 = gpu_helper_.CreateSourceTexture(input);
// NOTE(review): this guard tests TEXTURE_EXTERNAL_OES, while the code it
// guards uses GL_TEXTURE_EXTERNAL_OES. If TEXTURE_EXTERNAL_OES is never
// defined, the external-OES branch is compiled out - confirm the intended
// macro name.
#if defined(TEXTURE_EXTERNAL_OES)
if (src1.target() == GL_TEXTURE_EXTERNAL_OES) {
if (!ext_rgb_renderer_) {
ext_rgb_renderer_ = absl::make_unique<QuadRenderer>();
MP_RETURN_IF_ERROR(ext_rgb_renderer_->GlSetup(
::mediapipe::kBasicTexturedFragmentShaderOES, {"video_frame"}));
}
renderer = ext_rgb_renderer_.get();
} else // NOLINT(readability/braces)
#endif // TEXTURE_EXTERNAL_OES
{
// Default case: renderer set up with GlSetup()'s default arguments.
if (!rgb_renderer_) {
rgb_renderer_ = absl::make_unique<QuadRenderer>();
MP_RETURN_IF_ERROR(rgb_renderer_->GlSetup());
}
renderer = rgb_renderer_.get();
}
}
RET_CHECK(renderer) << "Unsupported input texture type";
// Translate the calculator's scale mode and rotation into the values that
// GlRender takes; kStretch is passed as the second argument of
// FrameScaleModeFromProto.
mediapipe::FrameScaleMode scale_mode = mediapipe::FrameScaleModeFromProto(
scale_mode_, mediapipe::FrameScaleMode::kStretch);
mediapipe::FrameRotation rotation =
mediapipe::FrameRotationFromDegrees(RotationModeToDegrees(rotation_));
// Render into a destination texture of the output size, using the input's
// format.
auto dst = gpu_helper_.CreateDestinationTexture(output_width, output_height,
input.format());
gpu_helper_.BindFramebuffer(dst);
// Bind the source texture on texture unit 1 for the draw call.
glActiveTexture(GL_TEXTURE1);
glBindTexture(src1.target(), src1.name());
MP_RETURN_IF_ERROR(renderer->GlRender(
src1.width(), src1.height(), dst.width(), dst.height(), scale_mode,
rotation, flip_horizontally_, flip_vertically_,
/*flip_texture=*/false));
// Unbind the source texture from texture unit 1.
glActiveTexture(GL_TEXTURE1);
glBindTexture(src1.target(), 0);
// Execute GL commands, before getting result.
glFlush();
// Hand the rendered frame to the output stream at the input timestamp.
auto output = dst.template GetFrame<GpuBuffer>();
cc->Outputs().Tag(kGpuBufferTag).Add(output.release(), cc->InputTimestamp());
#endif // !MEDIAPIPE_DISABLE_GPU
return absl::OkStatus();
}