深入浅出mediasoup—拥塞控制

news2025/1/15 23:32:49

拥塞控制对于不同网络条件下保证音视频传输质量非常重要。mediasoup 移植了 WebRTC 的 GCC 模块,嵌入到服务器,使得 mediasoup 具备了和 WebRTC 客户端一样的拥塞控制能力。为了使 GCC 能够与 mediasoup 框架良好交互,mediasoup 做了很多适配工作,包括如何驱动 GCC 以及如何使用 GCC 输出,本文主要分析这部分的设计与实现。

1. 方案分析

在《深入浅出WebRTC-GCC》中有讲过,GCC 实现非常复杂,但 GCC 是通过 RtpTransportControllerSend 嵌入到 WebRTC 整体框架中的,因此,集成 GCC 只需要与 RtpTransportControllerSend 交互即可,这就大大降低了集成的复杂度。

mediasoup 源码中 RtpTransportControllerSend 接口如下。RtpTransportControllerSend 接口可以分为 3 类:设置、报文和定时器。码率、网络、ALR、平滑增益等属于设置类接口;发送报文、接收 RR、接收 TransportFeedback 等属于报文类接口;最后再加上一个定时器驱动的 Process 接口。


// 设置分配的全局发送码率限制,内会设置带宽探测和平滑发送模块参数
void SetAllocatedSendBitrateLimits(int min_send_bitrate_bps,
								 int max_padding_bitrate_bps,
								 int max_total_bitrate_bps) override;

// 设置码率偏好[min, start, max],内部会设置带宽评估器参数
void SetClientBitratePreferences(const TargetRateConstraints& constraints);

// 设置平滑增益
void SetPacingFactor(float pacing_factor) override;

// 网络连接和断开
void OnNetworkAvailability(bool network_available) override;

// 是否设置周期性 ALR 探测
void EnablePeriodicAlrProbing(bool enable) override;

// 报文发送到网络后回调
void OnSentPacket(const rtc::SentPacket& sent_packet, size_t size) override;

// 这个接口可以不用关注(默认没有变化)
void OnTransportOverheadChanged(
  size_t transport_overhead_per_packet) override;

// 这个接口不用关注(这是 REMB 实现,我们只分析 TCC 实现)
void OnReceivedEstimatedBitrate(uint32_t bitrate) override;

// 收到 RTCP RR 处理,RTT 以及统计的丢包需要更新到 GCC
void OnReceivedRtcpReceiverReport(const ReportBlockList& report_blocks,
								int64_t rtt,
								int64_t now_ms) override;

// 报文发送要调用到这里,内部会保存发送报文信息
void OnAddPacket(const RtpPacketSendInfo& packet_info) override;

// 收到 TransportFeedback 处理,内部会关联之前保存的发送报文,并传递到 GCC 处理
void OnTransportFeedback(const RTC::RTCP::FeedbackRtpTransportPacket& feedback) override;

// 定时器驱动
void Process();

是不是适配 RtpTransportControllerSend 就可以了呢?还不行,还要考虑平滑发送模块的适配。mediasoup 没有实现平滑发送,原因是 mediasoup 作为一个媒体转发服务器, 主要任务是转发数据包,调整数据包的发送速率依赖于 WebRTC 客户端的能力。又因为平滑发送模块与带宽探测功能联系在一起,为此,mediasoup 重新实现了 PacedSender,保留了带宽探测能力。

2. 静态结构

GCC 功能实现需要发送端和接收端的配合,新版本使用 TransportCC 代替了之前的 REMB 方案。TransportCC 方案绝大部分计算都放在了发送端,因此,本文只讨论发送端的实现。

mediasoup 使用 TransportCongestionControlClient 来实现 GCC 发送端的功能,如下图所示。TransportCongestionControlClient 作为 mediasoup 与 GCC 之间的适配层,它继承了 webrtc::PacketRouter 接口,用生成和发送探测报文;继承了 webrtc::TargetTransferRateObserver, 用来接收 GCC 的估计带宽。

2.1. 重要属性

1)listener

Transport 继承了 RTC::TransportCongestionControlClient::Listener 接口,接收 GCC 评估的带宽;另外,带宽探测报文也需要通过 Transport 发送。

2)RtpProbationGenerator

用来生成探测用的 padding 报文。

3)TrendCalculator

用来平滑期望带宽,防止期望带宽剧烈抖动。

4)Bitrates

记录所有码率值。

2.2. 重要方法

1)TransportConnected

Transport 连接成功调用,内部会启动拥塞控制算法。

2)TransportDisconnected

Transport 连接断开调用,内部会关闭拥塞控制算法。

3)SetMaxOutgoingBitrate

设置 Transport 的最大发送码率。

4)SetMinOutgoingBitrate

设置 Transport 的最小发送码率。

5)SetDesiredBitrate

设置 Transport 期望的发送码率,conumser 的加入和退出、分层切换等都会设置新的期望带宽。

6)RescheduleNextAvailableBitrateEvent

用来控制通知 Transport 码率变化的频率,太频繁的码率变化通知,容易导致振荡。

7)InsertPacket

模拟通过 PacedSender 发送报文,PacedSender 内部不会真正发送报文。

8)PacketSent

模拟报文发送到网络后的回调。

9)ReceiveRtcpReceiverReport

处理 RTCP RR 报文,RTT 和 丢包统计用来更新 GCC 状态。

10)ReceiveRtcpTransportFeedback

处理 TransportFeedback 报文,这是 GCC 所需的最重要报文。

11)OnTimer

定时器回调,用来驱动 GCC 和 PacedSender。

3. 调用流程

3.1. 探测发包

探测报文是通过定时器驱动 PacedSender 控制发送,最终还是要回调到 Transport 发送到网络。InsertPacket 相当于模拟 WebRTC 的 EnqueuePackets,更新 GCC 内部组件的状态。

1)TransportFeedbackAdapter 需要记录发送报文,以供后面码率统计。

2)PacedSender 需要更新内部的 budget,用来控制探测码率。

3)BitrateProber 有一个奇怪行为,它需要等待报文发送才能启动带宽探测任务。

3.2. 发包回调

不管是普通的媒体报文,还是探测报文,发送到网络后都需要回调 PacketSent,更新 GCC 内部状态。

1)TransportFeedbackAdapter 更新报文发送数据。

2)GoogCcNetworkController 会更新 ALR、CongestionWindowPushbackController 等组件状态。

3)PacedSender 更新 outstanding 数据,用来判断链路拥塞状态。

3.3. TransportFeedback

TransportFeedback 是最重要的协议报文。

1)先送到 TransportFeedbackAdapter 进行预处理,匹配报文发送信息(PacingInfo)。

2)GoogCcNetworkController 会使用 TransportFeedback 包括延迟带宽估计器、丢包带宽估计器、ACK 码率估计器等在内的一堆组件。

3)PacedSender 需要将确认的报文从 outstanding 中删除。

3.4. ReceiverReport

RR 报文的处理相对简单一些,通过 RR 报文计算得到的丢包率和 RTT 会用来更新 GoogCcNetworkController。

4. 带宽应用

GCC 的评估带宽会通过 OnTargetTransferRate 回调 TransportCongestionControlClient。Transport 会基于评估带宽按照优先级重新为每个 consumer 分配码率,最终分配的码率又会设置到 GCC,限制 GCC 带宽探测和带宽评估的范围。这是有必要的,否则服务端大量带宽探测可能会导致网络拥塞。

5. PacedSender

前面提到,mediasoup 不需要实现平滑发送,但 GCC 的带宽探测的控制放在平滑发送模块,为了减少对 GCC 中带宽探测实现逻辑的破坏,mediasoup 需要实现一个仅有带宽探测功能的 PacedSender。

5.1. 实现分析

在《深入浅出WebRTC—Pacer》中有总结平滑发包模块的逻辑架构,如下图所示(WebRTC 最新源码)。具体细节就不再赘述,有需要了解的可以移步之前的文章。

mediasoup 实现的 PacedSender 相比 WebRTC 的平滑发包模块做了非常的大的简化,如下图所示。

简化的地方有以下几个方面:

1)使用定时器代替 TaskQueue 驱动发包逻辑,不是固定周期,而是动态获取超时时间。

2)由于不需要平滑发包逻辑,因此也就不需要发送队列。带宽探测是由 BitrateProber 控制,因此也不需要 media_debt_。(mediasoup 的源码中还有 media_debt_ 更新逻辑,但其实没有用到,不知道什么原因)

3)Padding 报文的生成,WebRTC 支持用媒体报文做 padding 报文,mediasoup 使用固定报文,大大简化了这部分逻辑。

5.2. 源码分析

PacedSender 实现比较简单,重要代码做了注释。不太理解的是,PacedSender 中和带宽探测无关的发送控制逻辑都没有存在的意义,但代码中还保留。

// 传入探测码率和 ID 创建探测任务
void PacedSender::CreateProbeCluster(int bitrate_bps, int cluster_id) {
  prober_.CreateProbeCluster(bitrate_bps, DepLibUV::GetTimeMsInt64(), cluster_id);
}

// 暂停和恢复探测
void PacedSender::Pause() {
  paused_ = true;
}
void PacedSender::Resume() {
  paused_ = false;
}

// 以下三个函数用来判断链路拥塞状态
void PacedSender::SetCongestionWindow(int64_t congestion_window_bytes) {
  congestion_window_bytes_ = congestion_window_bytes;
}
void PacedSender::UpdateOutstandingData(int64_t outstanding_bytes) {
  outstanding_bytes_ = outstanding_bytes;
}
bool PacedSender::Congested() const {
  if (congestion_window_bytes_ == kNoCongestionWindow)
    return false;
  return outstanding_bytes_ >= congestion_window_bytes_;
}

// 开关带宽探测
void PacedSender::SetProbingEnabled(bool enabled) {
  if (packet_counter_ != 0) {
    return;
  }
  prober_.SetEnabled(enabled);
}

// 设置平滑发送码率(其实没有意义)
void PacedSender::SetPacingRates(uint32_t pacing_rate_bps,
                                 uint32_t padding_rate_bps) {
  if (pacing_rate_bps == 0) {
    return;
  }

  pacing_bitrate_kbps_ = pacing_rate_bps / 1000;
  padding_budget_.set_target_rate_kbps(padding_rate_bps / 1000);
}

// 模拟发送报文
void PacedSender::InsertPacket(size_t bytes) {
  if (pacing_bitrate_kbps_ <= 0) {
    return;
  }
	// BitrateProber 要看到报文发送才会启动带宽探测
  prober_.OnIncomingPacket(bytes);
  packet_counter_++;
  OnPacketSent(bytes);
}

// 不会调用
void PacedSender::SetAccountForAudioPackets(bool account_for_audio) {
  account_for_audio_ = account_for_audio;
}

// 获取下一次报文发送时间
int64_t PacedSender::TimeUntilNextProcess() {
  // 过去了多少时间,换算成毫秒
  int64_t elapsed_time_us = DepLibUV::GetTimeUsInt64() - time_last_process_us_;
  int64_t elapsed_time_ms = (elapsed_time_us + 500) / 1000;

  // 当暂停时,每500毫秒发送一个 keep-alive 填充包
  if (paused_)
    return std::max<int64_t>(kPausedProcessIntervalMs - elapsed_time_ms, 0);

  // 如果我们正在探测,则返回下一个探测的时间
  if (prober_.IsProbing()) {
    int64_t ret = prober_.TimeUntilNextProbe(DepLibUV::GetTimeMsInt64());
    if (ret > 0 || (ret == 0 && !probing_send_failure_))
      return ret;
  }

  // 走到这里说明没有探测,那么就返回下一个处理的时间(5ms)
  return std::max<int64_t>(min_packet_limit_ms_ - elapsed_time_ms, 0);
}

int64_t PacedSender::UpdateTimeAndGetElapsedMs(int64_t now_us) {
  // 计算过去的时间
  int64_t elapsed_time_ms = (now_us - time_last_process_us_ + 500) / 1000;

  // 更新最近一次处理的时间
  time_last_process_us_ = now_us;

  // 如果过去的时间超过了最大时间限制,则限制为最大时间(2000ms)
  if (elapsed_time_ms > kMaxElapsedTimeMs) {
    elapsed_time_ms = kMaxElapsedTimeMs;
  }

  // 返回过去的时间
  return elapsed_time_ms;
}

void PacedSender::Process() {
  int64_t now_us = DepLibUV::GetTimeUsInt64();
  int64_t elapsed_time_ms = UpdateTimeAndGetElapsedMs(now_us);

  if (paused_) return;

  // 使用平滑发送码率更新 media_budget_.
  if (elapsed_time_ms > 0) {
    int target_bitrate_kbps = pacing_bitrate_kbps_;
    media_budget_.set_target_rate_kbps(target_bitrate_kbps);
    UpdateBudgetWithElapsedTime(elapsed_time_ms);
  }

  // 只处理带宽探测
  if (!prober_.IsProbing()) return;

  PacedPacketInfo pacing_info;
  absl::optional<size_t> recommended_probe_size;

  // 获取探测任务数据
  pacing_info = prober_.CurrentCluster().value_or(PacedPacketInfo());
  recommended_probe_size = prober_.RecommendedMinProbeSize();

  size_t bytes_sent = 0;
  RTC::RtpPacket* padding_packet{ nullptr };

  // 发送指定大小的探测报文
  while (true) {
		// 获取发送报文大小
    size_t padding_bytes_to_add = PaddingBytesToAdd(recommended_probe_size, bytes_sent);

		// 已经没有报文要发送,退出
    if (padding_bytes_to_add == 0)
      break;

		// 生成 padding 报文
    padding_packet = packet_router_->GeneratePadding(padding_bytes_to_add);

		// 发送报文
    packet_router_->SendPacket(padding_packet, pacing_info);

		// 更新发送报文大小
    bytes_sent += padding_packet->GetSize();

		// 报文发送够了就退出
    if (recommended_probe_size && bytes_sent > *recommended_probe_size)
      break;
  }

	// 更新 BitrateProber
  if (bytes_sent != 0) {
    auto now = DepLibUV::GetTimeUsInt64();
    OnPaddingSent(now, bytes_sent);
    prober_.ProbeSent((now + 500) / 1000, bytes_sent);
  }
}

size_t PacedSender::PaddingBytesToAdd(absl::optional<size_t> recommended_probe_size,
    size_t bytes_sent) {
  if (Congested()) {
    return 0; // 如果链路拥塞,探测报文也不能发送
  }

  if (recommended_probe_size) {
    if (*recommended_probe_size > bytes_sent) {
      return *recommended_probe_size - bytes_sent;
    }
    return 0;
  }

  return padding_budget_.bytes_remaining();
}

void PacedSender::OnPacketSent(size_t size) {
  if (first_sent_packet_ms_ == -1)
    first_sent_packet_ms_ = DepLibUV::GetTimeMsInt64();

  UpdateBudgetWithBytesSent(size);
}

PacedPacketInfo PacedSender::GetPacingInfo() {
  PacedPacketInfo pacing_info;
  // 如果没有探测,则返回空;如果有探测,则返回当前探测任务信息
  if (prober_.IsProbing()) {
    pacing_info = prober_.CurrentCluster().value_or(PacedPacketInfo());;
  }
  return pacing_info;
}

void PacedSender::OnPaddingSent(int64_t now, size_t bytes_sent) {
  if (bytes_sent > 0) {
    UpdateBudgetWithBytesSent(bytes_sent);
  }
}

// 基于过去时间增加 budget
void PacedSender::UpdateBudgetWithElapsedTime(int64_t delta_time_ms) {
  delta_time_ms = std::min(kMaxIntervalTimeMs, delta_time_ms);
  media_budget_.IncreaseBudget(delta_time_ms);
  padding_budget_.IncreaseBudget(delta_time_ms);
}

// 基于报文大小降低 budget
void PacedSender::UpdateBudgetWithBytesSent(size_t bytes_sent) {
  outstanding_bytes_ += bytes_sent;
  media_budget_.UseBudget(bytes_sent);
  padding_budget_.UseBudget(bytes_sent);
}

6. 总结

本文分析了 mediasoup 拥塞控制实现,重点阐述了 TransportCongestionControlClient 如何对 GCC 进行封装和适配,并对 PacedSender 实现进行了深入分析。当然还有很多细节没有涉及,比如带宽分配逻辑,后面有时间再补充。大家在自己的项目中可以借鉴 mediasoup 集成 GCC 的方案。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1961511.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Large Models for Time Series and Spatio-Temporal Data: A Survey and Outlook

基本信息 博客贡献人 谷雨 作者 Ming Jin, Qingsong Wen, et al. 标签 大语言模型、预训练基础模型、大模型、时间序列、时空数据、时态数据 摘要 时态数据&#xff0c;包括时间序列和时空数据&#xff0c;在现实世界的应用中极为广泛。这些数据类型记录了动态系统随时…

2024电赛H题可能用到的代码——自动行驶小车

目录 前言 一、MPU6050零漂处理 二、MPU6050的Yaw&#xff08;180&#xff09;误差处理 三、PID算法&#xff08;增量式位置式&#xff09; 四、灰度传感器&#xff08;以8路为例&#xff09; 1、获取黑线偏差 2、判断ABCD点&#xff08;有无黑线交点&#xff09; 五、总结 前言…

内存原生CRAM技术将会颠覆计算存储的未来?

近期&#xff0c;一项刚刚发布的最新研究表明&#xff0c;一种名为计算随机存取存储器&#xff08;Computational Random-Access Memory, CRAM&#xff09;的新技术能够极大地减少人工智能&#xff08;AI&#xff09;处理所需的能量消耗。这项技术由明尼苏达大学双城分校的一组…

SmartInitializingSingleton和InitializingBean的区别

SmartInitializingSingleton&#xff1a;接口里面就一个方法afterSingletonsInstantiated&#xff0c;它是spring容器将所有bean都初始化完成之后&#xff0c;才会去调用&#xff0c;要求实现它接口的bean必须是单例的。 应用场景&#xff1a;可以在服务启动之后去处理一些逻辑…

红酒与电影:银幕上的醉人瞬间

在光影交织的银幕世界里&#xff0c;红酒不仅是品味生活的象征&#xff0c;更是情感交流的媒介。当定制红酒与电影相遇&#xff0c;它们共同编织出一个个醉人的瞬间&#xff0c;让观众在品味红酒的同时&#xff0c;也沉醉于电影的魅力之中。今天&#xff0c;就让我们一起走进红…

JS小应用:从图床获取的html代码中提取IMG标签并提取图片复制到剪贴板

JS小应用&#xff1a;从图床获取的html代码中提取IMG标签并提取图片复制到剪贴板 问题产生 自己做站长&#xff0c;为了节省银子&#xff0c;难免要用到图床。有的图床可以直接给你URL&#xff0c;这当然是最好的情况&#xff1a; 而有的图床&#xff0c;却禁用了鼠标右键&am…

“论数据分片技术及其应用”写作框架软考高级论文系统架构设计师论文

论文真题 数据分片就是按照一定的规则&#xff0c;将数据集划分成相互独立、正交的数据子集&#xff0c;然后将数据子集分布到不同的节点上。通过设计合理的数据分片规则&#xff0c;可将系统中的数据分布在不同的物理数据库中&#xff0c;达到提升应用系统数据处理速度的目的…

FP分数规划在无线通信中的应用(II)

3. 具体例子 3.1-3.3都只需要用第一章concave-convex方法求解&#xff0c;3.4-3.6需要用到第二章的拉格朗日对偶变换&#xff0c;而且具体解 x \mathbf{x} x时需要对离散变量单独开发算法。 3.1 多小区SISO能量分配 第一个例子是具有一组单天线基站&#xff08;BSs&#xff…

Python面向对象浅析

目录 面向对象基本概念 一、类和对象 类和对象是面向对象骗程的两个核心概念。 在程序开发中&#xff0c;要设计一个类&#xff0c;通常需要满足一下三个要素: self详解&#xff1a; 对象&#xff08;Object&#xff09; 魔法方法&#xff1a; 类里的一些特殊方法 __in…

RK3568笔记四十八:ADC驱动开发测试

若该文为原创文章&#xff0c;转载请注明原文出处。 一、ADC介绍 RK3568集成了一个逐次逼近模数转换器&#xff08;Successive Approximation ADC&#xff09;&#xff0c;通常简称为SAR ADC。 这种转换器能够将连续的模拟信号转换为离散的数字信号&#xff0c;其特点在于具有…

nginx转发netty长链接(nginx负载tcp长链接配置)

首先要清楚一点&#xff0c;netty是长链接是tcp连接不同于http中负载在http中配置server监听。长连接需要开启nginx的stream模块(和http是并列关系) 安装nginx时注意开启stream&#xff0c;编译时加上参数 --with-stream &#xff08;其他参数根据自己所需来加&#xff09; …

rem实现屏幕适配(jQuery)

一、rem换算 1.根据视口宽度动态计算字体大小&#xff0c;如果宽度大于750px&#xff0c;则将字体大小设置为100px&#xff0c;否则按比例缩小。 tips:使用时记得引入jQuery.js // 在文档加载完成后执行函数&#xff0c;确保DOM已经准备就绪$(function () {// 定义一个自执行…

增量学习中Task incremental、Domain incremental、Class incremental 三种学习模式的概念及代表性数据集?

1 概念 在持续学习领域&#xff0c;Task incremental、Domain incremental、Class incremental 是三种主要的学习模式&#xff0c;它们分别关注不同类型的任务序列和数据分布变化。 1.1 Task Incremental Learning (Task-incremental) 任务增量学习&#xff0c;也称为任务增…

盐分反演关键:批量计算常用的盐分指数反演变量

盐分反演关键&#xff1a;批量计算常用的盐分指数反演变量 一、引言 盐分指数反演是遥感应用中的一个重要方面&#xff0c;尤其在农业和环境监测中有着广泛的应用。通过遥感影像&#xff0c;研究人员可以高效地获取和分析地表盐分信息&#xff0c;为土地管理和作物生产提供重…

YOLOX+PyQt5交通路口智能监测平台设计与实现

1.概述 交通要道的路口上人车穿行&#xff0c;特别是上下班早高峰&#xff0c;且时常发生交通事故。因此对交通路口的车流量和人流量的监测必不可少。 2.检测模型 使用的检测模型为YOLOX模型&#xff0c;模型权重为训练VOC数据集得来&#xff0c;其中包括了二十个类别&#…

ONLYOFFICE 协作空间 2.6 已发布:表单填写房间、LDAP、优化房间和文件管理等

更新后的 ONLYOFFICE 协作空间带来了超过 20 项新功能和优化&#xff0c;让工作更加高效和舒适。阅读本文了解详情。 表单填写房间 这次更新增加了一种新的房间类型&#xff0c;可在 ONLYOFFICE 协作空间中组织简单的表单填写流程。 通过表单填写房间&#xff0c;目前可以完成…

仓库物品与装备物品位置更换

一、装备物品与选中的仓库物品位置交换 1、准备工作 2、Inventory Items 3、给Warehouse添加Grid Layout Group组件 4、复制Inventory Items&#xff0c;设置Grid Layout Group组件 5、创建文本ItemName和ItemDescription 6、设置物品数据 (1) 创建 ItemData.cs using Syst…

Spring boot tomcat 读写超时时间设置

yaml配置 connection-timeout: 20000 server:port: 9898servlet:context-path: /testtomcat:connection-timeout: 20000max-connections: 250accept-count: 300 spring源码设置自定义tomcat参数 customizeConnector(connector); Overridepublic WebServer getWebServer(Serv…

【MySQL】表的约束{ 常见约束 空属性 默认值 列描述comment zerofill 主键 复合主键 自增长 唯一键 外键 }

文章目录 常见约束空属性默认值列描述commentzerofill主键复合主键自增长唯一键外键 2.总结 真正约束字段的是数据类型&#xff0c;但是数据类型约束很单一&#xff0c;需要有一些额外的约束&#xff0c;更好的保证数据的合法性&#xff0c;从业务逻辑角度保证数据的正确性。比…

MySQL基础练习题12-使用唯一标识码替换员工ID

题目&#xff1a;展示每位用户的 唯一标识码&#xff08;unique ID &#xff09;&#xff1b;如果某位员工没有唯一标识码&#xff0c;使用 null 填充即可。 准备数据 分析数据 题目&#xff1a;展示每位用户的 唯一标识码&#xff08;unique ID &#xff09;&#xff1b;如果…