MediaPipe Pipeline Analysis, Part 1

2025/2/22

Object Detection Graph



I. Upstream Input Processing in the Pipeline

1 Calculator

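A calculator (operator) is the foundation of the plugin/operator mechanism in the MediaPipe framework.

In MediaPipe, a plugin is an extensible computation module that can implement many kinds of processing. The calculator_base.h file defines a base class; every plugin must inherit from it and implement its functions. Through this base class, MediaPipe manages plugin interfaces and behavior uniformly, which makes building complex multimedia processing programs more flexible and extensible. Plugins can be combined and arranged like building blocks to achieve different functions and effects.

calculator_base.h declares the basic lifecycle functions of a plugin: initialization (Open()), per-input processing (Process()), and cleanup (Close()), together with a static contract (GetContract()) that describes the inputs and outputs through which plugins exchange data. The base class gives plugin implementations a common framework and set of rules, so developers can write custom plugins for their own needs and integrate them into a MediaPipe processing program.

Every node in a computation graph is a calculator, which carries the graph's computation logic. A calculator can accept zero or more streams or side packets and produce zero or more streams or side packets. Calculators must inherit from the same base class, implement the required interface, and be registered with the framework so that they can be constructed from a configuration file.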
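The "inherit from a base class, implement the interface, register by name" pattern can be sketched without MediaPipe itself. The block below is only an illustration under stated assumptions: ToyCalculatorBase, ToyRegistrar, Registry, and RunPipeline are hypothetical names, and plain int values stand in for packets. In MediaPipe the registration step is done with the REGISTER_CALCULATOR macro.

```cpp
#include <cassert>
#include <functional>
#include <map>
#include <memory>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for a calculator base class (not the MediaPipe API).
class ToyCalculatorBase {
 public:
  virtual ~ToyCalculatorBase() = default;
  virtual void Open() {}               // one-time setup before the first input
  virtual int Process(int input) = 0;  // called once per input value
  virtual void Close() {}              // one-time cleanup after the last input
};

using ToyFactory = std::function<std::unique_ptr<ToyCalculatorBase>()>;

// Hypothetical registry: maps the name used in a config to a factory.
std::map<std::string, ToyFactory>& Registry() {
  static std::map<std::string, ToyFactory> registry;
  return registry;
}

// Registers a factory under `name` when a static instance is constructed.
struct ToyRegistrar {
  ToyRegistrar(const std::string& name, ToyFactory factory) {
    Registry()[name] = std::move(factory);
  }
};

class DoubleCalculator : public ToyCalculatorBase {
 public:
  int Process(int input) override { return input * 2; }
};

class AddOneCalculator : public ToyCalculatorBase {
 public:
  int Process(int input) override { return input + 1; }
};

static const ToyRegistrar kRegisterDouble(
    "DoubleCalculator", []() -> std::unique_ptr<ToyCalculatorBase> {
      return std::make_unique<DoubleCalculator>();
    });
static const ToyRegistrar kRegisterAddOne(
    "AddOneCalculator", []() -> std::unique_ptr<ToyCalculatorBase> {
      return std::make_unique<AddOneCalculator>();
    });

// Builds each node by name and feeds the value through the chain in order.
int RunPipeline(const std::vector<std::string>& node_names, int input) {
  int value = input;
  for (const std::string& name : node_names) {
    std::unique_ptr<ToyCalculatorBase> node = Registry().at(name)();
    node->Open();
    value = node->Process(value);
    node->Close();
  }
  return value;
}
```

Calling RunPipeline({"DoubleCalculator", "AddOneCalculator"}, 5) builds both nodes from their registered names and chains them, which mirrors how a graph config refers to calculators only by name.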

1.1 CalculatorBase

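The calculator_base.h header defines the base class of MediaPipe's plugin/operator mechanism. The listing below shows the internal Collection template, which backs the tag- and index-based lookups (Index(), Tag(), Get()) that calculators perform on cc->Inputs() and cc->Outputs().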

namespace mediapipe {
namespace internal {

// A class to handle errors that occur in Collection.  For most
// collections, these errors should be fatal.  However, for a collection
// more like PacketTypeSet, the errors should be deferred and handled
// later.
// This class is thread compatible.
template <typename T>
struct CollectionErrorHandlerFatal {
  // An error occurred during object lookup for the provided tag and
  // index.  The returned object reference will be provided instead.
  // Since there isn't any state and we're not returning anything, we
  // get away with only one version of this function (which is const
  // but returns a non-const reference).
  T& GetFallback(const absl::string_view tag, int index) const {
    LOG(FATAL) << "Failed to get tag \"" << tag << "\" index " << index;
    std::abort();
  }
};

enum class CollectionStorage { kStoreValue = 0, kStorePointer };

// A collection of objects of type T.
// If storage == kStorePointer then T* will be stored instead of T, but
// the accessor functions will still return T types.  The T objects must
// be owned elsewhere and remain alive as long as the collection is used.
// To set the pointers use the GetPtr() function.
// The ErrorHandler object allows errors to be deferred to a later time.
// This class is thread compatible as long as the ErrorHandler object is also
// thread compatible.
template <typename T,
          CollectionStorage storage = CollectionStorage::kStoreValue,
          typename ErrorHandler = CollectionErrorHandlerFatal<T>>
class Collection {
 private:
  template <typename ItType>
  class DoubleDerefIterator;

 public:
  using value_type = T;

  // The iterator is over value_type, requiring a double dereference if
  // storage == kStorePointer.
  using iterator =
      typename std::conditional<storage == CollectionStorage::kStorePointer,
                                DoubleDerefIterator<value_type>,
                                value_type*>::type;
  using const_iterator =
      typename std::conditional<storage == CollectionStorage::kStorePointer,
                                DoubleDerefIterator<const value_type>,
                                const value_type*>::type;
  using difference_type = ptrdiff_t;
  using size_type = size_t;
  using pointer = value_type*;
  using reference = value_type&;

  // The type that is stored by data_;
  using stored_type =
      typename std::conditional<storage == CollectionStorage::kStorePointer,
                                value_type*, value_type>::type;

  // Collection must be initialized on construction.
  Collection() = delete;
  Collection(const Collection&) = delete;
  Collection& operator=(const Collection&) = delete;
  // Makes a Collection using the given TagMap (which should be shared
  // between collections).
  // Refer to mediapipe::tool::CreateTagMap for examples of how to construct a
  // collection from a vector of "TAG:<index>:name" strings, or from an integer
  // number of indexes, etc.
  explicit Collection(std::shared_ptr<tool::TagMap> tag_map);
  // Makes a Collection using the information in the TagAndNameInfo.
  ABSL_DEPRECATED("Use Collection(tool::TagMap)")
  explicit Collection(const tool::TagAndNameInfo& info);
  // Convenience constructor which initializes a collection to use
  // indexes and have num_entries inputs.
  ABSL_DEPRECATED("Use Collection(tool::TagMap)")
  explicit Collection(int num_entries);
  // Convenience constructor which initializes a collection to use tags
  // with the given names.
  // Note: initializer_list constructor should not be marked explicit.
  ABSL_DEPRECATED("Use Collection(tool::TagMap)")
  Collection(const std::initializer_list<std::string>& tag_names);

  // Access the data at a given CollectionItemId.  This is the most efficient
  // way to access data within the collection.
  // Do not assume that Index(2) == Get(collection.TagMap()->BeginId() + 2).
  value_type& Get(CollectionItemId id);
  const value_type& Get(CollectionItemId id) const;

  // Convenience functions.
  value_type& Get(absl::string_view tag, int index);
  const value_type& Get(absl::string_view tag, int index) const;

  // Equivalent to Get("", index);
  value_type& Index(int index);
  const value_type& Index(int index) const;

  // Equivalent to Get(tag, 0);
  value_type& Tag(absl::string_view tag);
  const value_type& Tag(absl::string_view tag) const;

  // These functions only exist for collections with storage ==
  // kStorePointer.  GetPtr returns the stored ptr value rather than
  // the value_type.  The non-const version returns a reference so that
  // the pointer can be set.
  value_type*& GetPtr(CollectionItemId id);
  // Const version returns a pointer to a const value (a const-ref to
  // a pointer wouldn't be useful in this context).
  const value_type* GetPtr(CollectionItemId id) const;

  // Returns true if the collection has a tag other than "".
  // TODO Deprecate and remove this function.
  bool UsesTags() const;

  // Returns a description of the collection.
  std::string DebugString() const;

  // Return the tag_map.
  const std::shared_ptr<tool::TagMap>& TagMap() const;

  // Iteration functions for use of the collection in a range based
  // for loop.  The items are provided in sorted tag order with indexes
  // sequential within tags.
  iterator begin();
  iterator end();
  const_iterator begin() const;
  const_iterator end() const;

  // Returns the error handler object.
  const ErrorHandler& GetErrorHandler() const { return error_handler_; }

  // The remaining public functions directly call their equivalent
  // in tool::TagMap.  They are guaranteed to be equivalent for any
  // Collection initialized using an equivalent tool::TagMap.

  // Returns true if the provided tag is available (not necessarily set yet).
  bool HasTag(const absl::string_view tag) const {
    return tag_map_->HasTag(tag);
  }

  // Returns the number of entries in this collection.
  int NumEntries() const { return tag_map_->NumEntries(); }

  // Returns the number of entries with the provided tag.
  int NumEntries(const absl::string_view tag) const {
    return tag_map_->NumEntries(tag);
  }

  // Get the id for the tag and index.  This id is guaranteed valid for
  // any Collection which was initialized with an equivalent tool::TagMap.
  // If the tag or index are invalid then an invalid CollectionItemId
  // is returned (with id.IsValid() == false).
  // The id for indexes within the same tag are guaranteed to
  // be sequential.  Meaning, if tag "BLAH" has 3 indexes, then
  // ++GetId("BLAH", 1) == GetId("BLAH", 2)
  // However, be careful in using this fact, as it circumvents the
  // validity checks in GetId() (i.e. ++GetId("BLAH", 2) looks like it
  // is valid, while GetId("BLAH", 3) is not valid).
  CollectionItemId GetId(const absl::string_view tag, int index) const {
    return tag_map_->GetId(tag, index);
  }

  // Returns the names of the tags in this collection.
  std::set<std::string> GetTags() const { return tag_map_->GetTags(); }

  // Get a tag and index for the specified id.  If the id is not valid,
  // then {"", -1} will be returned.
  std::pair<std::string, int> TagAndIndexFromId(CollectionItemId id) const {
    return tag_map_->TagAndIndexFromId(id);
  }

  // The CollectionItemId corresponding to the first element in the collection.
  // Looping over all elements can be done as follows.
  //   for (CollectionItemId id = collection.BeginId();
  //        id < collection.EndId(); ++id) {
  //   }
  // However, if only one collection is involved, prefer using a range
  // based for loop.
  //   for (Packet packet : Inputs()) {
  //   }
  CollectionItemId BeginId() const { return tag_map_->BeginId(); }
  // The CollectionItemId corresponding to an element immediately after
  // the last element of the collection.
  CollectionItemId EndId() const { return tag_map_->EndId(); }

  // Same as BeginId()/EndId() but for only one tag.  If the tag doesn't
  // exist then an invalid CollectionItemId is returned.  It is guaranteed
  // that a loop constructed in this way will successfully not be entered
  // for invalid tags.
  //   for (CollectionItemId id = collection.BeginId(tag);
  //        id < collection.EndId(tag); ++id) {
  //   }
  CollectionItemId BeginId(const absl::string_view tag) const {
    return tag_map_->BeginId(tag);
  }
  CollectionItemId EndId(const absl::string_view tag) const {
    return tag_map_->EndId(tag);
  }

  // Equal Collections contain equal mappings and equal elements.
  bool operator==(const Collection<T>& other) const {
    if (tag_map_->Mapping() != other.TagMap()->Mapping()) {
      return false;
    }
    for (CollectionItemId id = BeginId(); id < EndId(); ++id) {
      if (Get(id) != other.Get(id)) {
        return false;
      }
    }
    return true;
  }
  bool operator!=(const Collection<T>& other) const {
    return !(*this == other);
  }

  // An iterator which is identical to ItType** except that the
  // dereference operator (operator*) does a double dereference and
  // returns an ItType.
  // This class is thread compatible.
  template <typename ItType>
  class DoubleDerefIterator {
   public:
    using iterator_category = std::random_access_iterator_tag;
    using value_type = ItType;
    using difference_type = std::ptrdiff_t;
    using pointer = ItType*;
    using reference = ItType&;

    DoubleDerefIterator() : ptr_(nullptr) {}

    reference operator*() { return **ptr_; }

    pointer operator->() { return *ptr_; }

    reference operator[](difference_type d) { return **(ptr_ + d); }

    // Member operators.
    DoubleDerefIterator& operator++() {
      ++ptr_;
      return *this;
    }
    DoubleDerefIterator operator++(int) {
      DoubleDerefIterator output(ptr_);
      ++ptr_;
      return output;
    }
    DoubleDerefIterator& operator--() {
      --ptr_;
      return *this;
    }
    DoubleDerefIterator operator--(int) {
      DoubleDerefIterator output(ptr_);
      --ptr_;
      return output;
    }
    DoubleDerefIterator& operator+=(difference_type d) {
      ptr_ += d;
      return *this;
    }
    DoubleDerefIterator& operator-=(difference_type d) {
      ptr_ -= d;
      return *this;
    }

    // Non-member binary operators.
    friend bool operator==(DoubleDerefIterator lhs, DoubleDerefIterator rhs) {
      return lhs.ptr_ == rhs.ptr_;
    }
    friend bool operator!=(DoubleDerefIterator lhs, DoubleDerefIterator rhs) {
      return lhs.ptr_ != rhs.ptr_;
    }
    friend bool operator<(DoubleDerefIterator lhs, DoubleDerefIterator rhs) {
      return lhs.ptr_ < rhs.ptr_;
    }
    friend bool operator<=(DoubleDerefIterator lhs, DoubleDerefIterator rhs) {
      return lhs.ptr_ <= rhs.ptr_;
    }
    friend bool operator>(DoubleDerefIterator lhs, DoubleDerefIterator rhs) {
      return lhs.ptr_ > rhs.ptr_;
    }
    friend bool operator>=(DoubleDerefIterator lhs, DoubleDerefIterator rhs) {
      return lhs.ptr_ >= rhs.ptr_;
    }

    // The constructor is explicit, so the results are constructed explicitly.
    friend DoubleDerefIterator operator+(DoubleDerefIterator lhs,
                                         difference_type d) {
      return DoubleDerefIterator(lhs.ptr_ + d);
    }
    friend DoubleDerefIterator operator+(difference_type d,
                                         DoubleDerefIterator rhs) {
      return DoubleDerefIterator(rhs.ptr_ + d);
    }
    // Returns by value: a reference to the temporary result would dangle.
    friend DoubleDerefIterator operator-(DoubleDerefIterator lhs,
                                         difference_type d) {
      return DoubleDerefIterator(lhs.ptr_ - d);
    }
    friend difference_type operator-(DoubleDerefIterator lhs,
                                     DoubleDerefIterator rhs) {
      return lhs.ptr_ - rhs.ptr_;
    }

   private:
    explicit DoubleDerefIterator(ItType* const* data) : ptr_(data) {}

    ItType* const* ptr_;

    friend class Collection;
  };

 private:
  // TagMap for the collection.
  std::shared_ptr<tool::TagMap> tag_map_;

  // Indexed by Id.  Use an array directly so that the type does not
  // have to be copy constructable.  The array has tag_map_->NumEntries()
  // elements.
  std::unique_ptr<stored_type[]> data_;

  // A class which allows errors to be reported flexibly.  The default
  // instantiation performs a LOG(FATAL) and does not have any member
  // variables (zero size).
  ErrorHandler error_handler_;
};

// Definitions of templated functions for Collection.

template <typename T, CollectionStorage storage, typename ErrorHandler>
Collection<T, storage, ErrorHandler>::Collection(
    std::shared_ptr<tool::TagMap> tag_map)
    : tag_map_(std::move(tag_map)) {
  if (tag_map_->NumEntries() != 0) {
    data_ = absl::make_unique<stored_type[]>(tag_map_->NumEntries());
  }
}

template <typename T, CollectionStorage storage, typename ErrorHandler>
Collection<T, storage, ErrorHandler>::Collection(
    const tool::TagAndNameInfo& info)
    : Collection(tool::TagMap::Create(info).value()) {}

template <typename T, CollectionStorage storage, typename ErrorHandler>
Collection<T, storage, ErrorHandler>::Collection(const int num_entries)
    : Collection(tool::CreateTagMap(num_entries).value()) {}

template <typename T, CollectionStorage storage, typename ErrorHandler>
Collection<T, storage, ErrorHandler>::Collection(
    const std::initializer_list<std::string>& tag_names)
    : Collection(tool::CreateTagMapFromTags(tag_names).value()) {}

template <typename T, CollectionStorage storage, typename ErrorHandler>
bool Collection<T, storage, ErrorHandler>::UsesTags() const {
  auto& mapping = tag_map_->Mapping();
  if (mapping.size() > 1) {
    // At least one tag is not "".
    return true;
  }
  if (mapping.empty()) {
    // The mapping is empty, it doesn't use tags.
    return false;
  }
  // If the one tag present is non-empty then we are using tags.
  return !mapping.begin()->first.empty();
}

template <typename T, CollectionStorage storage, typename ErrorHandler>
typename Collection<T, storage, ErrorHandler>::value_type&
Collection<T, storage, ErrorHandler>::Get(CollectionItemId id) {
  CHECK_LE(BeginId(), id);
  CHECK_LT(id, EndId());
  return begin()[id.value()];
}

template <typename T, CollectionStorage storage, typename ErrorHandler>
const typename Collection<T, storage, ErrorHandler>::value_type&
Collection<T, storage, ErrorHandler>::Get(CollectionItemId id) const {
  CHECK_LE(BeginId(), id);
  CHECK_LT(id, EndId());
  return begin()[id.value()];
}

template <typename T, CollectionStorage storage, typename ErrorHandler>
typename Collection<T, storage, ErrorHandler>::value_type*&
Collection<T, storage, ErrorHandler>::GetPtr(CollectionItemId id) {
  static_assert(storage == CollectionStorage::kStorePointer,
                "mediapipe::internal::Collection<T>::GetPtr() is only "
                "available for collections that were defined with template "
                "argument storage == CollectionStorage::kStorePointer.");
  CHECK_LE(BeginId(), id);
  CHECK_LT(id, EndId());
  return data_[id.value()];
}

template <typename T, CollectionStorage storage, typename ErrorHandler>
const typename Collection<T, storage, ErrorHandler>::value_type*
Collection<T, storage, ErrorHandler>::GetPtr(CollectionItemId id) const {
  static_assert(storage == CollectionStorage::kStorePointer,
                "mediapipe::internal::Collection<T>::GetPtr() is only "
                "available for collections that were defined with template "
                "argument storage == CollectionStorage::kStorePointer.");
  CHECK_LE(BeginId(), id);
  CHECK_LT(id, EndId());
  return data_[id.value()];
}

template <typename T, CollectionStorage storage, typename ErrorHandler>
typename Collection<T, storage, ErrorHandler>::value_type&
Collection<T, storage, ErrorHandler>::Get(const absl::string_view tag,
                                          int index) {
  CollectionItemId id = GetId(tag, index);
  if (!id.IsValid()) {
    return error_handler_.GetFallback(tag, index);
  }
  return begin()[id.value()];
}

template <typename T, CollectionStorage storage, typename ErrorHandler>
const typename Collection<T, storage, ErrorHandler>::value_type&
Collection<T, storage, ErrorHandler>::Get(const absl::string_view tag,
                                          int index) const {
  CollectionItemId id = GetId(tag, index);
  if (!id.IsValid()) {
    return error_handler_.GetFallback(tag, index);
  }
  return begin()[id.value()];
}

template <typename T, CollectionStorage storage, typename ErrorHandler>
typename Collection<T, storage, ErrorHandler>::value_type&
Collection<T, storage, ErrorHandler>::Index(int index) {
  return Get("", index);
}

template <typename T, CollectionStorage storage, typename ErrorHandler>
const typename Collection<T, storage, ErrorHandler>::value_type&
Collection<T, storage, ErrorHandler>::Index(int index) const {
  return Get("", index);
}

template <typename T, CollectionStorage storage, typename ErrorHandler>
typename Collection<T, storage, ErrorHandler>::value_type&
Collection<T, storage, ErrorHandler>::Tag(const absl::string_view tag) {
  return Get(tag, 0);
}

template <typename T, CollectionStorage storage, typename ErrorHandler>
const typename Collection<T, storage, ErrorHandler>::value_type&
Collection<T, storage, ErrorHandler>::Tag(const absl::string_view tag) const {
  return Get(tag, 0);
}

template <typename T, CollectionStorage storage, typename ErrorHandler>
std::string Collection<T, storage, ErrorHandler>::DebugString() const {
  std::string output =
      absl::StrCat("Collection of \"", MediaPipeTypeStringOrDemangled<T>(),
                   "\" with\n", tag_map_->DebugString());
  return output;
}

template <typename T, CollectionStorage storage, typename ErrorHandler>
const std::shared_ptr<tool::TagMap>&
Collection<T, storage, ErrorHandler>::TagMap() const {
  return tag_map_;
}

template <typename T, CollectionStorage storage, typename ErrorHandler>
typename Collection<T, storage, ErrorHandler>::iterator
Collection<T, storage, ErrorHandler>::begin() {
  return iterator(data_.get());
}

template <typename T, CollectionStorage storage, typename ErrorHandler>
typename Collection<T, storage, ErrorHandler>::iterator
Collection<T, storage, ErrorHandler>::end() {
  return iterator(data_.get() + tag_map_->NumEntries());
}

template <typename T, CollectionStorage storage, typename ErrorHandler>
typename Collection<T, storage, ErrorHandler>::const_iterator
Collection<T, storage, ErrorHandler>::begin() const {
  return const_iterator(data_.get());
}

template <typename T, CollectionStorage storage, typename ErrorHandler>
typename Collection<T, storage, ErrorHandler>::const_iterator
Collection<T, storage, ErrorHandler>::end() const {
  return const_iterator(data_.get() + tag_map_->NumEntries());
}

}  // namespace internal

// Returns c.HasTag(tag) && !Tag(tag)->IsEmpty() (just for convenience).
// This version is used with Calculator.
template <class S>
bool HasTagValue(const internal::Collection<S*>& c,
                 const absl::string_view tag) {
  return c.HasTag(tag) && !c.Tag(tag)->IsEmpty();
}

// Returns c.HasTag(tag) && !Tag(tag).IsEmpty() (just for convenience).
// This version is used with CalculatorBase.
template <class S>
bool HasTagValue(const internal::Collection<S>& c,
                 const absl::string_view tag) {
  return c.HasTag(tag) && !c.Tag(tag).IsEmpty();
}

// Returns c.HasTag(tag) && !Tag(tag).IsEmpty() (just for convenience).
// This version is used with Calculator or CalculatorBase.
template <class C>
bool HasTagValue(const C& c, const absl::string_view tag) {
  return HasTagValue(c->Inputs(), tag);
}

}  // namespace mediapipe
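The comments in the listing state that items are laid out in sorted tag order, with sequential ids inside each tag. A minimal sketch of that id scheme follows, assuming a much simpler design than the real one: ToyCollection is a hypothetical class that folds the tag map and the storage together, returns -1 for an invalid id instead of a CollectionItemId, and throws on a bad lookup instead of calling LOG(FATAL).

```cpp
#include <cassert>
#include <map>
#include <string>
#include <utility>
#include <vector>

// Hypothetical, simplified stand-in for tool::TagMap plus Collection<T>.
template <typename T>
class ToyCollection {
 public:
  // `counts` maps each tag to its number of entries; "" is the untagged group.
  explicit ToyCollection(const std::map<std::string, int>& counts) {
    int next_id = 0;
    // std::map iterates in sorted key order, which gives sorted tag order.
    for (const auto& entry : counts) {
      ranges_[entry.first] = std::make_pair(next_id, entry.second);
      next_id += entry.second;
    }
    data_.resize(next_id);
  }

  bool HasTag(const std::string& tag) const { return ranges_.count(tag) > 0; }
  int NumEntries() const { return static_cast<int>(data_.size()); }

  // Returns the flat id for (tag, index), or -1 when either is invalid.
  int GetId(const std::string& tag, int index) const {
    auto it = ranges_.find(tag);
    if (it == ranges_.end() || index < 0 || index >= it->second.second) {
      return -1;
    }
    return it->second.first + index;
  }

  // Throws std::out_of_range for an invalid (tag, index) pair.
  T& Get(const std::string& tag, int index) {
    return data_.at(static_cast<std::size_t>(GetId(tag, index)));
  }
  T& Index(int index) { return Get("", index); }         // same as Get("", index)
  T& Tag(const std::string& tag) { return Get(tag, 0); }  // same as Get(tag, 0)

 private:
  std::map<std::string, std::pair<int, int>> ranges_;  // tag -> {first id, count}
  std::vector<T> data_;
};
```

With tags "", "AUDIO", and "VIDEO", the untagged entries receive the lowest ids, followed by AUDIO and then VIDEO, so Index(0) and Tag("VIDEO") resolve to different slots of the same flat array.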


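2 The MediaPipe Pipeline

The following sections follow the graph's data flow into the source code; some basic knowledge of MediaPipe is assumed.

The MediaPipe source makes heavy use of library APIs such as Abseil (absl), together with smart pointers, to improve performance and robustness.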

2.1 GpuBufferToImageFrameCalculator
2.1.1 GPU

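The macro MEDIAPIPE_GPU_BUFFER_USE_CV_PIXEL_BUFFER decides which path is taken: a CVPixelBuffer-based conversion, or a readback from the GPU through OpenGL.
The GPU rendering pipeline runs inside the OpenGL context (GlContext), on its OpenGL thread.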

absl::Status GpuBufferToImageFrameCalculator::Process(CalculatorContext* cc) {
  if (cc->Inputs().Index(0).Value().ValidateAsType<ImageFrame>().ok()) {
    return absl::OkStatus();
  }

#ifdef HAVE_GPU_BUFFER
  if (cc->Inputs().Index(0).Value().ValidateAsType<GpuBuffer>().ok()) {
    const auto& input = cc->Inputs().Index(0).Get<GpuBuffer>();
#if MEDIAPIPE_GPU_BUFFER_USE_CV_PIXEL_BUFFER
    // CVPixelBuffer path: no OpenGL readback in this branch.
    std::unique_ptr<ImageFrame> frame = /* ... */;
    cc->Outputs().Index(0).Add(frame.release(), cc->InputTimestamp());
#else
    // OpenGL path: read the texture back into an ImageFrame in the GL context.
    helper_.RunInGlContext([this, &input, &cc]() {
      auto src = helper_.CreateSourceTexture(input);
      std::unique_ptr<ImageFrame> frame = absl::make_unique<ImageFrame>(
          ImageFormatForGpuBufferFormat(input.format()), src.width(),
          src.height(), ImageFrame::kGlDefaultAlignmentBoundary);
      const auto info = GlTextureInfoForGpuBufferFormat(input.format(), 0,
                                                        /* ... */);
      glReadPixels(0, 0, src.width(), src.height(), info.gl_format,
                   info.gl_type, frame->MutablePixelData());
      cc->Outputs().Index(0).Add(frame.release(), cc->InputTimestamp());
    });
#endif  // MEDIAPIPE_GPU_BUFFER_USE_CV_PIXEL_BUFFER
    return absl::OkStatus();
  }
#endif  // defined(HAVE_GPU_BUFFER)

  return absl::Status(absl::StatusCode::kInvalidArgument,
                      "Input packets must be ImageFrame or GpuBuffer.");
}
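The glReadPixels call above writes straight into frame->MutablePixelData(), so the row layout of the ImageFrame has to match the layout OpenGL writes. A minimal sketch of the row-stride arithmetic follows, assuming a 4-byte row boundary, which is the OpenGL default for GL_PACK_ALIGNMENT. That ImageFrame::kGlDefaultAlignmentBoundary has this value is an assumption here, and AlignUp and RowStrideBytes are hypothetical helper names.

```cpp
#include <cassert>
#include <cstddef>

// Rounds `value` up to the next multiple of `boundary` (boundary must be > 0).
constexpr std::size_t AlignUp(std::size_t value, std::size_t boundary) {
  return (value + boundary - 1) / boundary * boundary;
}

// Bytes occupied by one image row when rows are padded to `boundary` bytes.
constexpr std::size_t RowStrideBytes(int width, int channels,
                                     int bytes_per_channel,
                                     std::size_t boundary) {
  return AlignUp(static_cast<std::size_t>(width) * channels * bytes_per_channel,
                 boundary);
}
```

For a 5-pixel-wide RGB image with one byte per channel, a row holds 15 bytes of pixel data but occupies 16 bytes at a 4-byte boundary; if the destination buffer used a different padding, each row read back would be shifted relative to the previous one.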

2.1.2 CPU

2.2 FlowLimiterCalculator

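FlowLimiterCalculator throttles the images flowing downstream for flow control. It passes the very first incoming image through unaltered, then waits for the downstream TfLiteTensorsToDetectionsCalculator to finish generating the corresponding detections before it passes another image. All images that arrive while it is waiting are dropped, which limits the number of in-flight images between this calculator and TfLiteTensorsToDetectionsCalculator to 1. This prevents the nodes in between from queuing up incoming images and data excessively, which would increase latency and memory usage, both undesirable in real-time mobile applications. It also eliminates unnecessary computation: for example, an image already transformed by ImageTransformationCalculator could otherwise be dropped downstream when the subsequent TfLiteConverterCalculator or TfLiteInferenceCalculator is still busy processing previous inputs.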
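The pass-one-then-drop behavior described above can be modeled in a few lines. This is a sketch under simplifying assumptions, not the calculator itself: ToyFlowLimiter is a hypothetical class, timestamps are plain integers, and there is no input queue, so any frame that arrives while the limit is reached is dropped immediately.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <deque>

// Hypothetical, simplified model of a flow limiter with no input queue.
class ToyFlowLimiter {
 public:
  explicit ToyFlowLimiter(std::size_t max_in_flight)
      : max_in_flight_(max_in_flight) {}

  // A new frame arrives. Returns true if it is forwarded downstream and
  // false if it is dropped because too many frames are still in flight.
  bool OnFrame(std::int64_t timestamp) {
    if (in_flight_.size() >= max_in_flight_) {
      return false;
    }
    in_flight_.push_back(timestamp);
    return true;
  }

  // Downstream reports that every frame up to `timestamp` is finished.
  void OnFinished(std::int64_t timestamp) {
    while (!in_flight_.empty() && in_flight_.front() <= timestamp) {
      in_flight_.pop_front();
    }
  }

  std::size_t InFlight() const { return in_flight_.size(); }

 private:
  std::size_t max_in_flight_;
  std::deque<std::int64_t> in_flight_;  // timestamps, oldest first
};
```

With max_in_flight set to 1, frame 0 is forwarded, frames 1 and 2 are dropped while frame 0 is still being processed, and the next frame after OnFinished(0) is forwarded again.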

2.2.1 FlowLimiterCalculator




  // Releases input packets allowed by the max_in_flight constraint.
  absl::Status Process(CalculatorContext* cc) final {
    options_ = tool::RetrieveOptions(options_, cc->Inputs());

    // Process the FINISHED input stream.
    Packet finished_packet = cc->Inputs().Tag(kFinishedTag).Value();
    if (finished_packet.Timestamp() == cc->InputTimestamp()) {
      while (!frames_in_flight_.empty() &&
             frames_in_flight_.front() <= finished_packet.Timestamp()) {
        frames_in_flight_.pop_front();
      }
    }

    // Process the frame input streams.
    for (int i = 0; i < cc->Inputs().NumEntries(""); ++i) {
      Packet packet = cc->Inputs().Get("", i).Value();
      if (!packet.IsEmpty()) {
        input_queues_[i].push_back(packet);
      }
    }

    // Abandon expired frames in flight.  Note that old frames are abandoned
    // when much newer frame timestamps arrive regardless of elapsed time.
    TimestampDiff timeout = options_.in_flight_timeout();
    Timestamp latest_ts = cc->Inputs().Get("", 0).Value().Timestamp();
    if (timeout > 0 && latest_ts == cc->InputTimestamp() &&
        latest_ts < Timestamp::Max()) {
      while (!frames_in_flight_.empty() &&
             (latest_ts - frames_in_flight_.front()) > timeout) {
        frames_in_flight_.pop_front();
      }
    }

    // Release allowed frames from the main input queue.
    auto& input_queue = input_queues_[0];
    while (ProcessingAllowed() && !input_queue.empty()) {
      Packet packet = input_queue.front();
      input_queue.pop_front();
      cc->Outputs().Get("", 0).AddPacket(packet);
      SendAllow(true, packet.Timestamp(), cc);
      frames_in_flight_.push_back(packet.Timestamp());
    }

    // Limit the number of queued frames.
    // Note that frames can be dropped after frames are released because
    // frame-packets and FINISH-packets never arrive in the same Process call.
    while (input_queue.size() > options_.max_in_queue()) {
      Packet packet = input_queue.front();
      input_queue.pop_front();
      SendAllow(false, packet.Timestamp(), cc);
    }

    // Propagate the input timestamp bound.
    if (!input_queue.empty()) {
      Timestamp bound = input_queue.front().Timestamp();
      SetNextTimestampBound(bound, &cc->Outputs().Get("", 0));
    } else {
      Timestamp bound =
          cc->Inputs().Get("", 0).Value().Timestamp().NextAllowedInStream();
      SetNextTimestampBound(bound, &cc->Outputs().Get("", 0));
      if (cc->Outputs().HasTag(kAllowTag)) {
        SetNextTimestampBound(bound, &cc->Outputs().Tag(kAllowTag));
      }
    }

    return absl::OkStatus();
  }

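Code fragment: SendAllow only emits a boolean flag on the output stream tagged kAllowTag.

  // Outputs a packet indicating whether a frame was sent or dropped.
  void SendAllow(bool allow, Timestamp ts, CalculatorContext* cc) {
    if (cc->Outputs().HasTag(kAllowTag)) {
      cc->Outputs().Tag(kAllowTag).AddPacket(MakePacket<bool>(allow).At(ts));
    }
  }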


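3 Input Transformation

ImageTransformationCalculator is a calculator for image processing that applies image transformations such as scaling, rotation, and flipping. It is typically used to prepare images for computer vision tasks.

3.1 ImageTransformationCalculator

Image handling: the calculator works on image data in memory. It reads an ImageFrame (CPU path) or a GpuBuffer (GPU path) from its input stream, transforms it, and writes the result to its output stream.

3.2 Process
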
absl::Status ImageTransformationCalculator::Process(CalculatorContext* cc) {
  // First update the video header if it is given, based on the rotation and
  // dimensions specified as side packets or options. This will only be done
  // once, so streaming transformation changes will not be reflected in
  // the header.
  if (cc->Inputs().HasTag(kVideoPrestreamTag) &&
      !cc->Inputs().Tag(kVideoPrestreamTag).IsEmpty() &&
      cc->Outputs().HasTag(kVideoPrestreamTag)) {
    mediapipe::VideoHeader header = /* ... */;
    // Update the header's width and height if needed.
    ComputeOutputDimensions(header.width, header.height, &header.width,
                            &header.height);
  }

  // Override values if specified so.
  if (cc->Inputs().HasTag("ROTATION_DEGREES") &&
      !cc->Inputs().Tag("ROTATION_DEGREES").IsEmpty()) {
    rotation_ = /* ... */;
  }
  if (cc->Inputs().HasTag("FLIP_HORIZONTALLY") &&
      !cc->Inputs().Tag("FLIP_HORIZONTALLY").IsEmpty()) {
    flip_horizontally_ = cc->Inputs().Tag("FLIP_HORIZONTALLY").Get<bool>();
  }
  if (cc->Inputs().HasTag("FLIP_VERTICALLY") &&
      !cc->Inputs().Tag("FLIP_VERTICALLY").IsEmpty()) {
    flip_vertically_ = cc->Inputs().Tag("FLIP_VERTICALLY").Get<bool>();
  }
  if (cc->Inputs().HasTag("OUTPUT_DIMENSIONS")) {
    if (cc->Inputs().Tag("OUTPUT_DIMENSIONS").IsEmpty()) {
      return absl::OkStatus();
    } else {
      const auto& image_size =
          cc->Inputs().Tag("OUTPUT_DIMENSIONS").Get<std::pair<int, int>>();
      output_width_ = image_size.first;
      output_height_ = image_size.second;
    }
  }

  if (use_gpu_) {
    if (cc->Inputs().Tag(kGpuBufferTag).IsEmpty()) {
      return absl::OkStatus();
    }
    return gpu_helper_.RunInGlContext(
        [this, cc]() -> absl::Status { return RenderGpu(cc); });
  } else {
    if (cc->Inputs().Tag(kImageFrameTag).IsEmpty()) {
      return absl::OkStatus();
    }
    return RenderCpu(cc);
  }
  return absl::OkStatus();
}



3.2.1 CPU path (OpenCV)

The CPU path is implemented with the OpenCV API.

absl::Status ImageTransformationCalculator::RenderCpu(CalculatorContext* cc) {
  cv::Mat input_mat;
  mediapipe::ImageFormat::Format format;

  const auto& input = cc->Inputs().Tag(kImageFrameTag).Get<ImageFrame>();
  input_mat = formats::MatView(&input);
  format = input.Format();

  const int input_width = input_mat.cols;
  const int input_height = input_mat.rows;
  int output_width;
  int output_height;
  ComputeOutputDimensions(input_width, input_height, &output_width,
                          &output_height);

  if (output_width_ > 0 && output_height_ > 0) {
    cv::Mat scaled_mat;
    if (scale_mode_ == mediapipe::ScaleMode_Mode_STRETCH) {
      // Downscaling in both dimensions uses INTER_AREA, otherwise INTER_LINEAR.
      int scale_flag =
          input_mat.cols > output_width_ && input_mat.rows > output_height_
              ? cv::INTER_AREA
              : cv::INTER_LINEAR;
      cv::resize(input_mat, scaled_mat, cv::Size(output_width_, output_height_),
                 0, 0, scale_flag);
    } else {
      // Keep the aspect ratio: scale by the smaller of the two ratios.
      const float scale =
          std::min(static_cast<float>(output_width_) / input_width,
                   static_cast<float>(output_height_) / input_height);
      const int target_width = std::round(input_width * scale);
      const int target_height = std::round(input_height * scale);
      int scale_flag = scale < 1.0f ? cv::INTER_AREA : cv::INTER_LINEAR;
      if (scale_mode_ == mediapipe::ScaleMode_Mode_FIT) {
        // FIT: resize, then pad the borders up to the requested output size.
        cv::Mat intermediate_mat;
        cv::resize(input_mat, intermediate_mat,
                   cv::Size(target_width, target_height), 0, 0, scale_flag);
        const int top = (output_height_ - target_height) / 2;
        const int bottom = output_height_ - target_height - top;
        const int left = (output_width_ - target_width) / 2;
        const int right = output_width_ - target_width - left;
        cv::copyMakeBorder(intermediate_mat, scaled_mat, top, bottom, left,
                           right,
                           options_.constant_padding() ? cv::BORDER_CONSTANT
                                                       : cv::BORDER_REPLICATE);
      } else {
        cv::resize(input_mat, scaled_mat, cv::Size(target_width, target_height),
                   0, 0, scale_flag);
        output_width = target_width;
        output_height = target_height;
      }
    }
    input_mat = scaled_mat;
  }
  /* ... */
}

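The FIT branch above is plain arithmetic once OpenCV is taken out of the picture. A minimal sketch of the same computation follows; FitLayout and ComputeFitLayout are hypothetical names introduced for illustration, while the formulas for scale, target size, and the four borders follow the listing.

```cpp
#include <algorithm>
#include <cassert>
#include <cmath>

// Hypothetical result type: resized image size plus the border on each side.
struct FitLayout {
  int target_width;
  int target_height;
  int top;
  int bottom;
  int left;
  int right;
};

// Scales the input to fit inside the output while keeping the aspect ratio,
// then splits the leftover space between the opposite borders.
FitLayout ComputeFitLayout(int input_width, int input_height, int output_width,
                           int output_height) {
  const float scale =
      std::min(static_cast<float>(output_width) / input_width,
               static_cast<float>(output_height) / input_height);
  FitLayout layout;
  layout.target_width = static_cast<int>(std::round(input_width * scale));
  layout.target_height = static_cast<int>(std::round(input_height * scale));
  layout.top = (output_height - layout.target_height) / 2;
  // The bottom and right borders absorb the odd pixel when the gap is odd.
  layout.bottom = output_height - layout.target_height - layout.top;
  layout.left = (output_width - layout.target_width) / 2;
  layout.right = output_width - layout.target_width - layout.left;
  return layout;
}
```

For a 640x480 input and a 320x320 output, the image is resized to 320x240 and padded with 40 rows at the top and 40 at the bottom. For 1280x720 into 300x300 the resized image is 300x169, and the 131 leftover rows are split as 65 on top and 66 at the bottom.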
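3.2.2 GPU path

This part distinguishes between the platforms (Android, iOS) behind the cross-platform OpenGL layer. The platform's OpenGL context has already been initialized at startup inside gpu_server. This part builds a renderer object and processes the 2D data on the GPU by rendering into an FBO, then places the resulting GpuBuffer in the output packet and sends it to the next node.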


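The main differences between TEXTURE_EXTERNAL_OES and an ordinary texture lie in how they are defined and what they are used for.

Ordinary textures are defined, allocated, and managed entirely by OpenGL ES. They are created and used within the OpenGL ES context.
A TEXTURE_EXTERNAL_OES texture is a special kind of texture that is defined and allocated elsewhere and imported into OpenGL ES in an implementation-defined way. It is mainly used to import YUV video data. Some external entity in the system defines the format, which is invisible to the application, and the color space conversion is handled transparently by the driver stack. Which formats are supported is implementation-defined. The main advantage of these textures is that they can be rendered directly from BufferQueue data. On Android, for example, a BufferQueue is the queue that connects producers and consumers of graphics data, so an OES texture can take the graphics data produced by certain producers and render it directly.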
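On the shader side, the difference shows up as a different sampler type plus an extension directive. The sketch below holds two GLSL ES 1.00 fragment shaders as C++ string constants. These are illustrative sources, not the shaders shipped with MediaPipe: the varying name sample_coordinate and the helper FragmentShaderFor are assumptions, and the uniform name video_frame is taken from the RenderGpu listing.

```cpp
#include <cassert>
#include <string>

// Fragment shader for an ordinary 2D texture.
const char kFragmentShader2D[] = R"(
precision mediump float;
varying vec2 sample_coordinate;
uniform sampler2D video_frame;
void main() {
  gl_FragColor = texture2D(video_frame, sample_coordinate);
}
)";

// Fragment shader for an external texture: it needs the
// GL_OES_EGL_image_external extension and the samplerExternalOES type.
const char kFragmentShaderOES[] = R"(
#extension GL_OES_EGL_image_external : require
precision mediump float;
varying vec2 sample_coordinate;
uniform samplerExternalOES video_frame;
void main() {
  gl_FragColor = texture2D(video_frame, sample_coordinate);
}
)";

// Hypothetical helper: picks the shader source for the texture kind.
const char* FragmentShaderFor(bool is_external_texture) {
  return is_external_texture ? kFragmentShaderOES : kFragmentShader2D;
}
```

The sampling call is the same in both shaders; the YUV-to-RGB conversion for the external texture happens inside the driver, which is why the application never sees the underlying format.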

absl::Status ImageTransformationCalculator::RenderGpu(CalculatorContext* cc) {
  const auto& input = cc->Inputs().Tag(kGpuBufferTag).Get<GpuBuffer>();
  const int input_width = input.width();
  const int input_height = input.height();

  int output_width;
  int output_height;
  ComputeOutputDimensions(input_width, input_height, &output_width,
                          &output_height);

  if (scale_mode_ == mediapipe::ScaleMode_Mode_FILL_AND_CROP) {
    const float scale =
        std::min(static_cast<float>(output_width_) / input_width,
                 static_cast<float>(output_height_) / input_height);
    output_width = std::round(input_width * scale);
    output_height = std::round(input_height * scale);
  }

  if (cc->Outputs().HasTag("LETTERBOX_PADDING")) {
    auto padding = absl::make_unique<std::array<float, 4>>();
    ComputeOutputLetterboxPadding(input_width, input_height, output_width,
                                  output_height, padding.get());
    cc->Outputs()
        .Tag("LETTERBOX_PADDING")
        .Add(padding.release(), cc->InputTimestamp());
  }

  QuadRenderer* renderer = nullptr;
  GlTexture src1;

#if defined(MEDIAPIPE_IOS)
  // iOS: bi-planar YUV input is rendered with a dedicated YUV renderer.
  if (input.format() == GpuBufferFormat::kBiPlanar420YpCbCr8VideoRange ||
      input.format() == GpuBufferFormat::kBiPlanar420YpCbCr8FullRange) {
    if (!yuv_renderer_) {
      yuv_renderer_ = absl::make_unique<QuadRenderer>();
      MP_RETURN_IF_ERROR(yuv_renderer_->GlSetup(
          /* ... */, {"video_frame_y", "video_frame_uv"}));
    }
    renderer = yuv_renderer_.get();
    src1 = gpu_helper_.CreateSourceTexture(input, 0);
  } else  // NOLINT(readability/braces)
#endif    // iOS
  {
    src1 = gpu_helper_.CreateSourceTexture(input);
#if defined(TEXTURE_EXTERNAL_OES)
    // External (OES) textures need the OES variant of the fragment shader.
    if (src1.target() == GL_TEXTURE_EXTERNAL_OES) {
      if (!ext_rgb_renderer_) {
        ext_rgb_renderer_ = absl::make_unique<QuadRenderer>();
        MP_RETURN_IF_ERROR(ext_rgb_renderer_->GlSetup(
            ::mediapipe::kBasicTexturedFragmentShaderOES, {"video_frame"}));
      }
      renderer = ext_rgb_renderer_.get();
    } else  // NOLINT(readability/braces)
#endif      // TEXTURE_EXTERNAL_OES
    {
      if (!rgb_renderer_) {
        rgb_renderer_ = absl::make_unique<QuadRenderer>();
        /* ... */
      }
      renderer = rgb_renderer_.get();
    }
  }
  RET_CHECK(renderer) << "Unsupported input texture type";

  mediapipe::FrameScaleMode scale_mode = mediapipe::FrameScaleModeFromProto(
      scale_mode_, mediapipe::FrameScaleMode::kStretch);
  mediapipe::FrameRotation rotation = /* ... */;

  auto dst = gpu_helper_.CreateDestinationTexture(output_width, output_height,
                                                  /* ... */);

  /* ... */
  MP_RETURN_IF_ERROR(renderer->GlRender(
      src1.width(), src1.height(), dst.width(), dst.height(), scale_mode,
      rotation, flip_horizontally_, flip_vertically_,
      /* ... */));

  glBindTexture(src1.target(), 0);

  // Execute GL commands, before getting result.
  glFlush();

  auto output = dst.template GetFrame<GpuBuffer>();
  cc->Outputs().Tag(kGpuBufferTag).Add(output.release(), cc->InputTimestamp());

  return absl::OkStatus();
}





