流媒体学习之路(WebRTC)——Pacer与GCC(5)

news2024/12/26 12:30:57

流媒体学习之路(WebRTC)——Pacer与GCC(5)

——
我正在的github给大家开发一个用于做实验的项目 —— github.com/qw225967/Bifrost

目标:可以让大家熟悉各类Qos能力、带宽估计能力,提供每个环节关键参数调节接口并实现一个json全配置,提供全面的可视化算法观察能力。

欢迎大家使用
——

文章目录

  • 流媒体学习之路(WebRTC)——Pacer与GCC(5)
  • 一、PacingController
    • 1.1 背景介绍
    • 1.2 代码
  • 二、IntervalBudget
    • 2.1 背景
    • 2.2 代码
  • 三、PacedSender
  • 四、总结


  在讲具体内容之前插一句嘴,从GCC分析(3)开始,我们将针对GCC的实现细节去分析它设计的原理,让我们理解这些类存在的意义,不再带大家去串具体的流程了。

一、PacingController

1.1 背景介绍

  Pacer(Packet Pacing)的作用是在传输数据时能平滑的发送出去,减少对网络冲击和抖动的产生,提高通信质量。在一次数据传输中,如果所有包几乎同时发送,网络就可能会遭遇到冲击,这就可能导致网络拥塞,数据包丢失等问题。为了避免这样的问题,需要通过一个定时器均匀分散发送数据包。
  特别是在音视频传输中,PACER更是非常重要的一部分。因为音视频的传输对于网络的稳定性和实时性要求非常高,任何形式的网络抖动或者丢包都会造成音视频的卡顿,延迟等问题。所以在WebRTC中使用Pacer,就是为了使音视频传输更加平滑,减少由于网络抖动造成的影响,从而达到提高实时音视频通信质量的目的。

  提到WebRTC的Pacer就需要讲述它码率控制的逻辑:
在这里插入图片描述
  从GCC输出的码率会设置给编码器以及pacer。pacer并不是完全严格设置多少就发多少,而是留有2.5倍的空间去发送。真正控制发送码率的则是输出给编码器的部分,期望控制编码器的输出码率。同时,pacer还对所有数据设置了优先级,优先级如下:

int GetPriorityForType(RtpPacketToSend::Type type) {
  // Lower number takes priority over higher.
  switch (type) {
    case RtpPacketToSend::Type::kAudio:
      // Audio is always prioritized over other packet types.
      return kFirstPriority + 1;
    case RtpPacketToSend::Type::kRetransmission:
      // Send retransmissions before new media.
      return kFirstPriority + 2;
    case RtpPacketToSend::Type::kVideo:
    case RtpPacketToSend::Type::kForwardErrorCorrection:
      // Video has "normal" priority, in the old speak.
      // Send redundancy concurrently to video. If it is delayed it might have a
      // lower chance of being useful.
      return kFirstPriority + 3;
    case RtpPacketToSend::Type::kPadding:
      // Packets that are in themselves likely useless, only sent to keep the
      // BWE high.
      return kFirstPriority + 4;
  }
}

  Pacer之所设计成这样,是因为我们向编码器设置码率之后想要保证丝滑清晰的画面,不可能完全控制输出码率,有时候画面复杂码率就大一些,画面简单码率就小一些。所以Pacer为了保证延迟预留了2.5倍的发送空间,也就是说真正控制码率的位置其实是编码器的输出

1.2 代码

  接下来我看看看pacer的核心代码——PacingController。这个类包含了优先级设置以及发送的逻辑,前面提到了优先级的内容下面只介绍发送逻辑:

void PacingController::ProcessPackets() {
  Timestamp now = CurrentTime(); // 当前时间
  TimeDelta elapsed_time = UpdateTimeAndGetElapsed(now); // 与上次process的间隔

  // 发送保活,每500ms发送一个padding包,一旦发送的数据大于拥塞窗口则不发送
  if (ShouldSendKeepalive(now)) {
    DataSize keepalive_data_sent = DataSize::Zero();
    // 产生padding包
    std::vector<std::unique_ptr<RtpPacketToSend>> keepalive_packets =
        packet_sender_->GeneratePadding(DataSize::bytes(1));
    for (auto& packet : keepalive_packets) {
      keepalive_data_sent +=
          DataSize::bytes(packet->payload_size() + packet->padding_size());
      packet_sender_->SendRtpPacket(std::move(packet), PacedPacketInfo());
    }
    OnPaddingSent(keepalive_data_sent);
  }

  // 处于暂停直接返回
  if (paused_)
    return;
  
  // 进入发送间隔开始计算
  if (elapsed_time > TimeDelta::Zero()) {
    DataRate target_rate = pacing_bitrate_;
    DataSize queue_size_data = packet_queue_.Size();
    // 队列中有数据才能发送
    if (queue_size_data > DataSize::Zero()) {
      // Assuming equal size packets and input/output rate, the average packet
      // has avg_time_left_ms left to get queue_size_bytes out of the queue, if
      // time constraint shall be met. Determine bitrate needed for that.
      // 
      packet_queue_.UpdateQueueTime(CurrentTime());
      if (drain_large_queues_) {
        // 平均发送时间 = 最大队列时长(2s)- 平均排队时间
        TimeDelta avg_time_left =
            std::max(TimeDelta::ms(1),
                     queue_time_limit - packet_queue_.AverageQueueTime());
        DataRate min_rate_needed = queue_size_data / avg_time_left;
        // 最发送码率大于目标码率,则目标码率等于最小需求码率
        if (min_rate_needed > target_rate) {
          target_rate = min_rate_needed;
          RTC_LOG(LS_VERBOSE) << "bwe:large_pacing_queue pacing_rate_kbps="
                              << target_rate.kbps();
        }
      }
    }

    // 设置媒体桶
    media_budget_.set_target_rate_kbps(target_rate.kbps());
    UpdateBudgetWithElapsedTime(elapsed_time);
  }

  bool first_packet_in_probe = false;
  bool is_probing = prober_.IsProbing();
  PacedPacketInfo pacing_info;
  absl::optional<DataSize> recommended_probe_size;
  // 正在探测则获取探测数据信息
  if (is_probing) {
    pacing_info = prober_.CurrentCluster();
    first_packet_in_probe = pacing_info.probe_cluster_bytes_sent == 0;
    recommended_probe_size = DataSize::bytes(prober_.RecommendedMinProbeSize());
  }

  DataSize data_sent = DataSize::Zero();
  // The paused state is checked in the loop since it leaves the critical
  // section allowing the paused state to be changed from other code.
  // 
  while (!paused_) {
    if (small_first_probe_packet_ && first_packet_in_probe) {
      // If first packet in probe, insert a small padding packet so we have a
      // more reliable start window for the rate estimation.
      // 产生padding包
      auto padding = packet_sender_->GeneratePadding(DataSize::bytes(1));
      // If no RTP modules sending media are registered, we may not get a
      // padding packet back.
      if (!padding.empty()) {
        // Insert with high priority so larger media packets don't preempt it.
        EnqueuePacketInternal(std::move(padding[0]), kFirstPriority);
        // We should never get more than one padding packets with a requested
        // size of 1 byte.
        RTC_DCHECK_EQ(padding.size(), 1u);
      }
      first_packet_in_probe = false;
    }

    // 获取待发送包
    auto* packet = GetPendingPacket(pacing_info);
    // 一旦产生不了数据,证明队列为空,则放入padding数据
    if (packet == nullptr) {
      // No packet available to send, check if we should send padding.
      DataSize padding_to_add = PaddingToAdd(recommended_probe_size, data_sent);
      if (padding_to_add > DataSize::Zero()) {
        std::vector<std::unique_ptr<RtpPacketToSend>> padding_packets =
            packet_sender_->GeneratePadding(padding_to_add);
        if (padding_packets.empty()) {
          // No padding packets were generated, quite send loop.
          break;
        }
        for (auto& packet : padding_packets) {
          EnqueuePacket(std::move(packet));
        }
        // Continue loop to send the padding that was just added.
        continue;
      }

      // Can't fetch new packet and no padding to send, exit send loop.
      break;
    }

    // 发送数据
    std::unique_ptr<RtpPacketToSend> rtp_packet = packet->ReleasePacket();
    RTC_DCHECK(rtp_packet);
    packet_sender_->SendRtpPacket(std::move(rtp_packet), pacing_info);

    data_sent += packet->size();
    // Send succeeded, remove it from the queue.
    OnPacketSent(packet);
    if (recommended_probe_size && data_sent > *recommended_probe_size)
      break;
  }
  
  if (is_probing) {
    probing_send_failure_ = data_sent == DataSize::Zero();
    if (!probing_send_failure_) {
      prober_.ProbeSent(CurrentTime().ms(), data_sent.bytes());
    }
  }
}

RoundRobinPacketQueue::QueuedPacket* PacingController::GetPendingPacket(
    const PacedPacketInfo& pacing_info) {
  if (packet_queue_.Empty()) {
    return nullptr;
  }

  // Since we need to release the lock in order to send, we first pop the
  // element from the priority queue but keep it in storage, so that we can
  // reinsert it if send fails.
  
  // 取出第一个包
  RoundRobinPacketQueue::QueuedPacket* packet = packet_queue_.BeginPop();
  bool audio_packet = packet->type() == RtpPacketToSend::Type::kAudio;
  bool apply_pacing = !audio_packet || pace_audio_;
  // 如果处于拥塞状态或者剩余数据为0则取消弹出
  if (apply_pacing && (Congested() || (media_budget_.bytes_remaining() == 0 &&
                                       pacing_info.probe_cluster_id ==
                                           PacedPacketInfo::kNotAProbe))) {
    
    packet_queue_.CancelPop();
    return nullptr;
  }
  return packet;
}

二、IntervalBudget

2.1 背景

  PacingController上述用到了IntervalBudget这个类,这个类用于做数据统计和预估。并且它作为一个抽象预估类,并不会真正的存数据,只是做了数据统计,每次排出数据后都按时间更新一次桶的容量,发送时则会把已发送的数据更新到桶数据中。
在这里插入图片描述

2.2 代码

  头文件:

class IntervalBudget {
 public:
  explicit IntervalBudget(int initial_target_rate_kbps);
  IntervalBudget(int initial_target_rate_kbps, bool can_build_up_underuse);
  void set_target_rate_kbps(int target_rate_kbps);

  // TODO(tschumim): Unify IncreaseBudget and UseBudget to one function.
  void IncreaseBudget(int64_t delta_time_ms);
  void UseBudget(size_t bytes);

  size_t bytes_remaining() const;
  double budget_ratio() const;
  int target_rate_kbps() const;

 private:
  int target_rate_kbps_;
  int64_t max_bytes_in_budget_;
  int64_t bytes_remaining_;
  bool can_build_up_underuse_;
};

  CPP文件:

constexpr int64_t kWindowMs = 500;
}

IntervalBudget::IntervalBudget(int initial_target_rate_kbps)
    : IntervalBudget(initial_target_rate_kbps, false) {}

IntervalBudget::IntervalBudget(int initial_target_rate_kbps,
                               bool can_build_up_underuse)
    : bytes_remaining_(0), can_build_up_underuse_(can_build_up_underuse) {
  set_target_rate_kbps(initial_target_rate_kbps);
}

void IntervalBudget::set_target_rate_kbps(int target_rate_kbps) {
  target_rate_kbps_ = target_rate_kbps;
  // 默认按500ms计算最大桶码率
  max_bytes_in_budget_ = (kWindowMs * target_rate_kbps_) / 8;
  // 计算剩余码率
  bytes_remaining_ = std::min(std::max(-max_bytes_in_budget_, bytes_remaining_),
                              max_bytes_in_budget_);
}

void IntervalBudget::IncreaseBudget(int64_t delta_time_ms) {
  // 按时换算桶的码率
  int64_t bytes = target_rate_kbps_ * delta_time_ms / 8;
  if (bytes_remaining_ < 0 || can_build_up_underuse_) {
    // We overused last interval, compensate this interval.
    // 把当前的码率加上
    bytes_remaining_ = std::min(bytes_remaining_ + bytes, max_bytes_in_budget_);
  } else {
    // If we underused last interval we can't use it this interval.
    // 一旦剩余码率为负则重新使用新计算的码率
    bytes_remaining_ = std::min(bytes, max_bytes_in_budget_);
  }
}

void IntervalBudget::UseBudget(size_t bytes) {
  // 把使用的数据进行统计
  bytes_remaining_ = std::max(bytes_remaining_ - static_cast<int>(bytes),
                              -max_bytes_in_budget_);
}

size_t IntervalBudget::bytes_remaining() const {
  return rtc::saturated_cast<size_t>(std::max<int64_t>(0, bytes_remaining_));
}

double IntervalBudget::budget_ratio() const {
  if (max_bytes_in_budget_ == 0)
    return 0.0;
  return static_cast<double>(bytes_remaining_) / max_bytes_in_budget_;
}

int IntervalBudget::target_rate_kbps() const {
  return target_rate_kbps_;
}

三、PacedSender

  上述的PacingController把具体的发送数据进行具体的计算,WebRTC把发送的逻辑和控制逻辑抽离了出来,其实PacingSender在构造时创建了PacingController并传入了this指针。因此对于PacingController来说PacingSender作为控制器在内部进行了回调。

  其他的函数我们不做具体的描述,只介绍定时函数:

int64_t PacedSender::TimeUntilNextProcess() {
  rtc::CritScope cs(&critsect_);

  // When paused we wake up every 500 ms to send a padding packet to ensure
  // we won't get stuck in the paused state due to no feedback being received.
  // 从controller中获取间隔
  TimeDelta elapsed_time = pacing_controller_.TimeElapsedSinceLastProcess();
  if (pacing_controller_.IsPaused()) {
    // 最大间隔为500ms
    return std::max(PacingController::kPausedProcessInterval - elapsed_time,
                    TimeDelta::Zero())
        .ms();
  }

  auto next_probe = pacing_controller_.TimeUntilNextProbe();
  if (next_probe) {
    return next_probe->ms();
  }

  const TimeDelta min_packet_limit = TimeDelta::ms(5);
  return std::max(min_packet_limit - elapsed_time, TimeDelta::Zero()).ms();
}

四、总结

  本文介绍了Pacer相关的内容,但我们的目的是通过Pacer去理解GCC的逻辑,在经过多个版本的迭代,Pacer与GCC的配合已经非常娴熟,同时耦合也是非常严重的:

  1. 每次Pacer的溢出发送,都需要GCC兜底(GCC的灵敏可以有效地检测到网络的排队,任何一个溢出的数据都能快速的下调码率,在遇到瓶颈带宽的时候出现了明显的锯齿状发送曲线);
    在这里插入图片描述

  2. 码率不足与拥塞探测的矛盾(编码器的输出往往会收到一定的限制不可能无线地上涨,在当今环境下很难探测到带宽瓶颈。Pacer的做法是提供Padding的数据作为补充探测,但大部分厂商为了避免流量过度消耗,就把探测的逻辑关闭了。在这方面来看,Pacer真是没有完全听GCC的话);

  也正是因为这样,WebRTC的Pacer是GCC的Pacer其他的拥塞算法来了,估计都水土不服,参考BBR被移除可知。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1353319.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

单片机外设矩阵键盘之线反转法识别原理与示例

单片机外设矩阵键盘之线反转法识别原理与示例 1.概述 这篇文章主要介绍单片机接收 4X4矩阵键盘发出的指令&#xff0c;做出对应的反馈。其中主要介绍矩阵键盘线反转方式的识别原理和实操。 2.矩阵键盘线反转识别原理 2.1.矩阵键盘硬件接线原理 矩阵键盘的硬件接线方式有多种…

MySQL第三战:CRUD,函数1以及unionunion all

前言 在当今的数字化时代&#xff0c;数据库已经成为信息管理的重要工具。其中&#xff0c;MySQL作为一种流行的关系型数据库管理系统&#xff0c;已经广泛应用于各种业务场景。在本文中&#xff0c;我们将深入探讨MySQL中的核心概念&#xff0c;包括创建&#xff08;Create&a…

感恩客户相伴23载,泛微2024持续向上!

2023年&#xff0c;国家大力推动数字经济发展&#xff0c;各行各业在加速数字化转型&#xff0c;在这一年&#xff0c;泛微保持持续增长&#xff0c;引领行业发展&#xff0c;为组织的数字化转型助力。感恩客户与伙伴朋友的支持与信任&#xff01; 01.泛微中大客户总量突破8万余…

burpsuite模块介绍之extender(扩展)

extender Burp提供了对第三方拓展插件的支持,使用户能够编写自定义插件或从插件商店中安装拓展插件。这些Burp扩展程序可以以多种方式定制Burp的行为,包括修改HTTP请求和响应、自定义UI、添加自定义扫描程序检查以及访问关键的运行时信息,如代理历史记录、目标站点地图和扫…

Ubuntu Server 22.04 连接Wifi并配置静态IP

Ubuntu Server 22.04 连接Wifi并配置静态IP 前言&#xff1a;我家最近好几台电脑&#xff0c;我都想跑着Ubuntu Server做服务器&#xff0c;但是近几年的超级本已经不自带网口了&#xff0c;所以我就考虑用Wifi来联网&#xff0c;速度也还可以&#xff0c;但是既然是跑服务&…

工作中redis相关知识总结

这里写目录标题 一、Redis数据持久化概念二、redis数据类型三、redis缓存的应用流程四、什么样的数据适合存放到redis中&#xff1f;1、什么情况下&#xff0c;redis中会没有数据&#xff1f;2、redis缓存项目在测试中的注意事项a、更新缓存b、淘汰缓存 五、什么是缓存击穿1、缓…

【力扣题解】P236-二叉树的最近公共祖先-Java题解

&#x1f468;‍&#x1f4bb;博客主页&#xff1a;花无缺 欢迎 点赞&#x1f44d; 收藏⭐ 留言&#x1f4dd; 加关注✅! 本文由 花无缺 原创 收录于专栏 【力扣题解】 文章目录 【力扣题解】P236-二叉树的最近公共祖先-Java题解&#x1f30f;题目描述&#x1f4a1;题解&#x…

Vue.js 3.4版本发布:解析速度提升2倍,双向绑定革新等新功能

引言 随着2024年的来临,Vue团队的领军人物Evan You宣布了Vue.js 3.4的发布。这个版本不仅仅是修复了一些bug,还带来了一些非常实用的新功能和性能提升。 解析速度提升2倍 这次更新中,Vue.js 3.4实现了解析速度的大幅提升。尤其是在构建模板和脚本的源代码映射时,单文件组…

Python等高线图的绘制(Matplotlib篇-11)

Python等高线图的绘制(Matplotlib篇-11)         🍹博主 侯小啾 感谢您的支持与信赖。☀️ 🌹꧔ꦿ🌹꧔ꦿ🌹꧔ꦿ🌹꧔ꦿ🌹꧔ꦿ🌹꧔ꦿ🌹꧔ꦿ🌹꧔ꦿ🌹꧔ꦿ🌹꧔ꦿ🌹꧔ꦿ🌹꧔ꦿ🌹꧔ꦿ🌹꧔ꦿ🌹꧔ꦿ🌹꧔ꦿ🌹꧔ꦿ🌹꧔ꦿ🌹꧔ꦿ🌹꧔ꦿ…

k8s 之7大CNI 网络插件

一、介绍 网络架构是Kubernetes中较为复杂、让很多用户头疼的方面之一。Kubernetes网络模型本身对某些特定的网络功能有一定要求&#xff0c;但在实现方面也具有一定的灵活性。因此&#xff0c;业界已有不少不同的网络方案&#xff0c;来满足特定的环境和要求。 CNI意为容器网络…

Java基础-----集合类(三)

文章目录 1. Arraylist2. Arraylist常用方法 今天主要学习集合类框架 1. Arraylist Collection:是List和Set的父接口&#xff0c;里面包含了一些公用的方法 List:是一个有序的、不唯一的接口 ArrayList&#xff1a;是List的一个实现类&#xff0c;底层数据结构是数组 public…

终于学会听英文歌了:A Sad Me In Your Eyes

A Sad Me In Your Eyes 来源&#xff1a; https://lyricstranslate.com/en/ln-party-sad-me-your-eyes-lyrics.html Fire can’t burn in my eyes If without your smile Snow can cover your smile If without your love When you think of me, I’ve gone too far I can’t …

八怪:再谈 MySQL 8 这两个精准的时间戳

MySQL 8.0 的 binlog 中多了 immediate_commit_timestamp 和 original_commit_timestamp 的信息&#xff0c;网上也有很多文章进行解释&#xff0c;最近也刚好遇到相关问题&#xff0c;刚好稍微学习一下。 作者&#xff1a;高鹏&#xff08;八怪&#xff09;&#xff0c;《MySQ…

手把手将ReactJS项目部署到Ubuntu

我的新书《Android App开发入门与实战》已于2020年8月由人民邮电出版社出版&#xff0c;欢迎购买。点击进入详情 1.构建项目 npm run build 生成build目录&#xff1a; 2.上传项目 将build目录上传到Ubuntu。 可以使用Xftp工具。 3.启动项目 npm install -g serve serve -s …

原生JS做别踩白块游戏

思路 创建初始一个按钮并为他添加点击监听开始创建随机方块&#xff0c;并样式_box.offsetTop speed px结合setInterval使得方块不断下移创建和删除方块的原则&#xff1a;box.offsetTop>0&#xff08;可视区上部没有方块了&#xff09;时候需要创建一行方块&#xff0c;…

Apache DolphinScheduler 社区 2023 年度工作报告

随着 2023 年的日历逐渐翻至最后一页&#xff0c;我们欣喜地回顾 Apache DolphinScheduler 社区在这一年中所取得的成就和进步。这一年&#xff0c;我们不仅在社区规模和技术发展上取得了显著成就&#xff0c;还发布了大量的技术文章和博客&#xff0c;进一步丰富了我们的知识库…

【Java进阶篇】Java中Timer实现定时调度的原理(解析)

Java中Timer实现定时调度的原理 ✔️ 引言✔️JDK 中Timer类的定义✔️拓展知识仓✔️优缺点 ✔️ 引言 Java中的Timer类是用于计划执行一项任务一次或重复固定延迟执行的简单工具。它使用一个名为TaskQueue的内部类来存储要执行的任务&#xff0c;这些任务被封装为TimerTask对…

条款16:成对使用 new 和 delete 时要采用相同形式

下面程序的行为是未定义的。至少&#xff0c;stringArray指向的100个string对象中有99个不太可能被正确地析构。 被delete的指针指向单个对象还是一个对象数组&#xff1f;内存数组通常包括数组的大小&#xff0c;delete可以知道需要调用多少个析构函数。 使用delete时使用了方…

SpringBoot 接口对枚举类型的入参以及出参的转换处理

目录 1、在项目中使用枚举类型2、不做任何处理的演示效果2.1、接口出参2.2、接口入参 3、用枚举的code作为参数和返回值3.1 代码案例3.1.1、定义枚举基础接口BaseEnum&#xff0c;每个枚举都实现该接口3.1.2、性别Sex枚举并实现接口BaseEnum3.1.3、定义BaseEnum枚举接口序列化3…

前端工程化回顾-vite 构建神器

1.构建vite 项目 pnpm create vite2.常用的配置&#xff1a; 1.公共资源路径配置&#xff1a; base: ./, 默认是/2.路径别名配置&#xff1a; resolve: {alias: {: path.resolve(__dirname, ./src),ass: path.resolve(__dirname, ./src/assets),comp: path.resolve(__dirnam…