拥塞控制对于不同网络条件下保证音视频传输质量非常重要。mediasoup 移植了 WebRTC 的 GCC 模块,嵌入到服务器,使得 mediasoup 具备了和 WebRTC 客户端一样的拥塞控制能力。为了使 GCC 能够与 mediasoup 框架良好交互,mediasoup 做了很多适配工作,包括如何驱动 GCC 以及如何使用 GCC 输出,本文主要分析这部分的设计与实现。
1. 方案分析
在《深入浅出WebRTC-GCC》中有讲过,GCC 实现非常复杂,但 GCC 是通过 RtpTransportControllerSend 嵌入到 WebRTC 整体框架中的,因此,集成 GCC 只需要与 RtpTransportControllerSend 交互即可,这就大大降低了集成的复杂度。
mediasoup 源码中 RtpTransportControllerSend 接口如下。RtpTransportControllerSend 接口可以分为 3 类:设置、报文和定时器。码率、网络、ALR、平滑增益等属于设置类接口;发送报文、接收 RR、接收 TransportFeedback 等属于报文类接口;最后再加上一个定时器驱动的 Process 接口。
// 设置分配的全局发送码率限制,内会设置带宽探测和平滑发送模块参数
void SetAllocatedSendBitrateLimits(int min_send_bitrate_bps,
int max_padding_bitrate_bps,
int max_total_bitrate_bps) override;
// 设置码率偏好[min, start, max],内部会设置带宽评估器参数
void SetClientBitratePreferences(const TargetRateConstraints& constraints);
// 设置平滑增益
void SetPacingFactor(float pacing_factor) override;
// 网络连接和断开
void OnNetworkAvailability(bool network_available) override;
// 是否设置周期性 ALR 探测
void EnablePeriodicAlrProbing(bool enable) override;
// 报文发送到网络后回调
void OnSentPacket(const rtc::SentPacket& sent_packet, size_t size) override;
// 这个接口可以不用关注(默认没有变化)
void OnTransportOverheadChanged(
size_t transport_overhead_per_packet) override;
// 这个接口不用关注(这是 REMB 实现,我们只分析 TCC 实现)
void OnReceivedEstimatedBitrate(uint32_t bitrate) override;
// 收到 RTCP RR 处理,RTT 以及统计的丢包需要更新到 GCC
void OnReceivedRtcpReceiverReport(const ReportBlockList& report_blocks,
int64_t rtt,
int64_t now_ms) override;
// 报文发送要调用到这里,内部会保存发送报文信息
void OnAddPacket(const RtpPacketSendInfo& packet_info) override;
// 收到 TransportFeedback 处理,内部会关联之前保存的发送报文,并传递到 GCC 处理
void OnTransportFeedback(const RTC::RTCP::FeedbackRtpTransportPacket& feedback) override;
// 定时器驱动
void Process();
是不是适配 RtpTransportControllerSend 就可以了呢?还不行,还要考虑平滑发送模块的适配。mediasoup 没有实现平滑发送,原因是 mediasoup 作为一个媒体转发服务器, 主要任务是转发数据包,调整数据包的发送速率依赖于 WebRTC 客户端的能力。又因为平滑发送模块与带宽探测功能联系在一起,为此,mediasoup 重新实现了 PacedSender,保留了带宽探测能力。
2. 静态结构
GCC 功能实现需要发送端和接收端的配合,新版本使用 TransportCC 代替了之前的 REMB 方案。TransportCC 方案绝大部分计算都放在了发送端,因此,本文只讨论发送端的实现。
mediasoup 使用 TransportCongestionControlClient 来实现 GCC 发送端的功能,如下图所示。TransportCongestionControlClient 作为 mediasoup 与 GCC 之间的适配层,它继承了 webrtc::PacketRouter 接口,用生成和发送探测报文;继承了 webrtc::TargetTransferRateObserver, 用来接收 GCC 的估计带宽。
2.1. 重要属性
1)listener
Transport 继承了 RTC::TransportCongestionControlClient::Listener 接口,接收 GCC 评估的带宽;另外,带宽探测报文也需要通过 Transport 发送。
2)RtpProbationGenerator
用来生成探测用的 padding 报文。
3)TrendCalculator
用来平滑期望带宽,防止期望带宽剧烈抖动。
4)Bitrates
记录所有码率值。
2.2. 重要方法
1)TransportConnected
Transport 连接成功调用,内部会启动拥塞控制算法。
2)TransportDisconnected
Transport 连接断开调用,内部会关闭拥塞控制算法。
3)SetMaxOutgoingBitrate
设置 Transport 的最大发送码率。
4)SetMinOutgoingBitrate
设置 Transport 的最小发送码率。
5)SetDesiredBitrate
设置 Transport 期望的发送码率,conumser 的加入和退出、分层切换等都会设置新的期望带宽。
6)RescheduleNextAvailableBitrateEvent
用来控制通知 Transport 码率变化的频率,太频繁的码率变化通知,容易导致振荡。
7)InsertPacket
模拟通过 PacedSender 发送报文,PacedSender 内部不会真正发送报文。
8)PacketSent
模拟报文发送到网络后的回调。
9)ReceiveRtcpReceiverReport
处理 RTCP RR 报文,RTT 和 丢包统计用来更新 GCC 状态。
10)ReceiveRtcpTransportFeedback
处理 TransportFeedback 报文,这是 GCC 所需的最重要报文。
11)OnTimer
定时器回调,用来驱动 GCC 和 PacedSender。
3. 调用流程
3.1. 探测发包
探测报文是通过定时器驱动 PacedSender 控制发送,最终还是要回调到 Transport 发送到网络。InsertPacket 相当于模拟 WebRTC 的 EnqueuePackets,更新 GCC 内部组件的状态。
1)TransportFeedbackAdapter 需要记录发送报文,以供后面码率统计。
2)PacedSender 需要更新内部的 budget,用来控制探测码率。
3)BitrateProber 有一个奇怪行为,它需要等待报文发送才能启动带宽探测任务。
3.2. 发包回调
不管是普通的媒体报文,还是探测报文,发送到网络后都需要回调 PacketSent,更新 GCC 内部状态。
1)TransportFeedbackAdapter 更新报文发送数据。
2)GoogCcNetworkController 会更新 ALR、CongestionWindowPushbackController 等组件状态。
3)PacedSender 更新 outstanding 数据,用来判断链路拥塞状态。
3.3. TransportFeedback
TransportFeedback 是最重要的协议报文。
1)先送到 TransportFeedbackAdapter 进行预处理,匹配报文发送信息(PacingInfo)。
2)GoogCcNetworkController 会使用 TransportFeedback 包括延迟带宽估计器、丢包带宽估计器、ACK 码率估计器等在内的一堆组件。
3)PacedSender 需要将确认的报文从 outstanding 中删除。
3.4. ReceiverReport
RR 报文的处理相对简单一些,通过 RR 报文计算得到的丢包率和 RTT 会用来更新 GoogCcNetworkController。
4. 带宽应用
GCC 的评估带宽会通过 OnTargetTransferRate 回调 TransportCongestionControlClient。Transport 会基于评估带宽按照优先级重新为每个 consumer 分配码率,最终分配的码率又会设置到 GCC,限制 GCC 带宽探测和带宽评估的范围。这是有必要的,否则服务端大量带宽探测可能会导致网络拥塞。
5. PacedSender
前面提到,mediasoup 不需要实现平滑发送,但 GCC 的带宽探测的控制放在平滑发送模块,为了减少对 GCC 中带宽探测实现逻辑的破坏,mediasoup 需要实现一个仅有带宽探测功能的 PacedSender。
5.1. 实现分析
在《深入浅出WebRTC—Pacer》中有总结平滑发包模块的逻辑架构,如下图所示(WebRTC 最新源码)。具体细节就不再赘述,有需要了解的可以移步之前的文章。
mediasoup 实现的 PacedSender 相比 WebRTC 的平滑发包模块做了非常的大的简化,如下图所示。
简化的地方有以下几个方面:
1)使用定时器代替 TaskQueue 驱动发包逻辑,不是固定周期,而是动态获取超时时间。
2)由于不需要平滑发包逻辑,因此也就不需要发送队列。带宽探测是由 BitrateProber 控制,因此也不需要 media_debt_。(mediasoup 的源码中还有 media_debt_ 更新逻辑,但其实没有用到,不知道什么原因)
3)Padding 报文的生成,WebRTC 支持用媒体报文做 padding 报文,mediasoup 使用固定报文,大大简化了这部分逻辑。
5.2. 源码分析
PacedSender 实现比较简单,重要代码做了注释。不太理解的是,PacedSender 中和带宽探测无关的发送控制逻辑都没有存在的意义,但代码中还保留。
// 传入探测码率和 ID 创建探测任务
void PacedSender::CreateProbeCluster(int bitrate_bps, int cluster_id) {
prober_.CreateProbeCluster(bitrate_bps, DepLibUV::GetTimeMsInt64(), cluster_id);
}
// 暂停和恢复探测
void PacedSender::Pause() {
paused_ = true;
}
void PacedSender::Resume() {
paused_ = false;
}
// 以下三个函数用来判断链路拥塞状态
void PacedSender::SetCongestionWindow(int64_t congestion_window_bytes) {
congestion_window_bytes_ = congestion_window_bytes;
}
void PacedSender::UpdateOutstandingData(int64_t outstanding_bytes) {
outstanding_bytes_ = outstanding_bytes;
}
bool PacedSender::Congested() const {
if (congestion_window_bytes_ == kNoCongestionWindow)
return false;
return outstanding_bytes_ >= congestion_window_bytes_;
}
// 开关带宽探测
void PacedSender::SetProbingEnabled(bool enabled) {
if (packet_counter_ != 0) {
return;
}
prober_.SetEnabled(enabled);
}
// 设置平滑发送码率(其实没有意义)
void PacedSender::SetPacingRates(uint32_t pacing_rate_bps,
uint32_t padding_rate_bps) {
if (pacing_rate_bps == 0) {
return;
}
pacing_bitrate_kbps_ = pacing_rate_bps / 1000;
padding_budget_.set_target_rate_kbps(padding_rate_bps / 1000);
}
// 模拟发送报文
void PacedSender::InsertPacket(size_t bytes) {
if (pacing_bitrate_kbps_ <= 0) {
return;
}
// BitrateProber 要看到报文发送才会启动带宽探测
prober_.OnIncomingPacket(bytes);
packet_counter_++;
OnPacketSent(bytes);
}
// 不会调用
void PacedSender::SetAccountForAudioPackets(bool account_for_audio) {
account_for_audio_ = account_for_audio;
}
// 获取下一次报文发送时间
int64_t PacedSender::TimeUntilNextProcess() {
// 过去了多少时间,换算成毫秒
int64_t elapsed_time_us = DepLibUV::GetTimeUsInt64() - time_last_process_us_;
int64_t elapsed_time_ms = (elapsed_time_us + 500) / 1000;
// 当暂停时,每500毫秒发送一个 keep-alive 填充包
if (paused_)
return std::max<int64_t>(kPausedProcessIntervalMs - elapsed_time_ms, 0);
// 如果我们正在探测,则返回下一个探测的时间
if (prober_.IsProbing()) {
int64_t ret = prober_.TimeUntilNextProbe(DepLibUV::GetTimeMsInt64());
if (ret > 0 || (ret == 0 && !probing_send_failure_))
return ret;
}
// 走到这里说明没有探测,那么就返回下一个处理的时间(5ms)
return std::max<int64_t>(min_packet_limit_ms_ - elapsed_time_ms, 0);
}
int64_t PacedSender::UpdateTimeAndGetElapsedMs(int64_t now_us) {
// 计算过去的时间
int64_t elapsed_time_ms = (now_us - time_last_process_us_ + 500) / 1000;
// 更新最近一次处理的时间
time_last_process_us_ = now_us;
// 如果过去的时间超过了最大时间限制,则限制为最大时间(2000ms)
if (elapsed_time_ms > kMaxElapsedTimeMs) {
elapsed_time_ms = kMaxElapsedTimeMs;
}
// 返回过去的时间
return elapsed_time_ms;
}
void PacedSender::Process() {
int64_t now_us = DepLibUV::GetTimeUsInt64();
int64_t elapsed_time_ms = UpdateTimeAndGetElapsedMs(now_us);
if (paused_) return;
// 使用平滑发送码率更新 media_budget_.
if (elapsed_time_ms > 0) {
int target_bitrate_kbps = pacing_bitrate_kbps_;
media_budget_.set_target_rate_kbps(target_bitrate_kbps);
UpdateBudgetWithElapsedTime(elapsed_time_ms);
}
// 只处理带宽探测
if (!prober_.IsProbing()) return;
PacedPacketInfo pacing_info;
absl::optional<size_t> recommended_probe_size;
// 获取探测任务数据
pacing_info = prober_.CurrentCluster().value_or(PacedPacketInfo());
recommended_probe_size = prober_.RecommendedMinProbeSize();
size_t bytes_sent = 0;
RTC::RtpPacket* padding_packet{ nullptr };
// 发送指定大小的探测报文
while (true) {
// 获取发送报文大小
size_t padding_bytes_to_add = PaddingBytesToAdd(recommended_probe_size, bytes_sent);
// 已经没有报文要发送,退出
if (padding_bytes_to_add == 0)
break;
// 生成 padding 报文
padding_packet = packet_router_->GeneratePadding(padding_bytes_to_add);
// 发送报文
packet_router_->SendPacket(padding_packet, pacing_info);
// 更新发送报文大小
bytes_sent += padding_packet->GetSize();
// 报文发送够了就退出
if (recommended_probe_size && bytes_sent > *recommended_probe_size)
break;
}
// 更新 BitrateProber
if (bytes_sent != 0) {
auto now = DepLibUV::GetTimeUsInt64();
OnPaddingSent(now, bytes_sent);
prober_.ProbeSent((now + 500) / 1000, bytes_sent);
}
}
size_t PacedSender::PaddingBytesToAdd(absl::optional<size_t> recommended_probe_size,
size_t bytes_sent) {
if (Congested()) {
return 0; // 如果链路拥塞,探测报文也不能发送
}
if (recommended_probe_size) {
if (*recommended_probe_size > bytes_sent) {
return *recommended_probe_size - bytes_sent;
}
return 0;
}
return padding_budget_.bytes_remaining();
}
void PacedSender::OnPacketSent(size_t size) {
if (first_sent_packet_ms_ == -1)
first_sent_packet_ms_ = DepLibUV::GetTimeMsInt64();
UpdateBudgetWithBytesSent(size);
}
PacedPacketInfo PacedSender::GetPacingInfo() {
PacedPacketInfo pacing_info;
// 如果没有探测,则返回空;如果有探测,则返回当前探测任务信息
if (prober_.IsProbing()) {
pacing_info = prober_.CurrentCluster().value_or(PacedPacketInfo());;
}
return pacing_info;
}
void PacedSender::OnPaddingSent(int64_t now, size_t bytes_sent) {
if (bytes_sent > 0) {
UpdateBudgetWithBytesSent(bytes_sent);
}
}
// 基于过去时间增加 budget
void PacedSender::UpdateBudgetWithElapsedTime(int64_t delta_time_ms) {
delta_time_ms = std::min(kMaxIntervalTimeMs, delta_time_ms);
media_budget_.IncreaseBudget(delta_time_ms);
padding_budget_.IncreaseBudget(delta_time_ms);
}
// 基于报文大小降低 budget
void PacedSender::UpdateBudgetWithBytesSent(size_t bytes_sent) {
outstanding_bytes_ += bytes_sent;
media_budget_.UseBudget(bytes_sent);
padding_budget_.UseBudget(bytes_sent);
}
6. 总结
本文分析了 mediasoup 拥塞控制实现,重点阐述了 TransportCongestionControlClient 如何对 GCC 进行封装和适配,并对 PacedSender 实现进行了深入分析。当然还有很多细节没有涉及,比如带宽分配逻辑,后面有时间再补充。大家在自己的项目中可以借鉴 mediasoup 集成 GCC 的方案。