目录
一. 前言
二. WebRTC Pacer
1. 数据包传入Pacer模块的队列
2. Pacer模块取出队列的包发送
(1)什么时候取出数据包发送
(2)每次发送多少数据量
(3)避免引入较大延时的处理方法
一. 前言
实时音视频通信发送端需要一个平滑发送模块(Pacer),因为视频的关键帧比非关键帧大很多,一般一个关键帧需要打到多个 RTP 报文中,此时如果直接把所有 RTP 报文发送到网络,很容易造成网络拥塞。
WebRTC Pacer 模块的作用就是让数据在网络上发送更加平滑,防止因为数据量的突增造成网络发生拥塞,效果如下。
假设是只发送音频数据包,平滑发送模块作用较小,因为音频帧产生的时间间隔是固定的,而且一个音频帧编码后的数据不大,一般一个 RTP 报文可以承载。
二. WebRTC Pacer
1. 数据包传入Pacer模块的队列
我们以发送视频包的函数调用栈为例说明数据包是如何被传入 Pacer 模块的。
编码器完成帧图片编码后会回调 RtpVideoSender::OnEncodedImage 函数,然后调用 RTPSenderVideo::SendEncodedImage,再调用 RTPSenderVideo::SendVideo,之后调用 RTPSenderVideo::LogAndSendToNetwork,该函数会调用到 PacedSender::EnqueuePackets 将数据包通过 PacingController::EnqueuePacket 传入 Pacer 模块。
不止是视频包,对于音频包,重传包,FEC 包等都要统一通过 Pacer 模块发送,不同类型报文会区分优先级。
PacingController 中最重要的成员是 RoundRobinPacketQueue packet_queue_,packet_queue_ 是一个优先级排序的多维队列,其中重要的成员是 std::map<uint32_t, Stream> streams_,即对于相同 SSRC 的包会塞到同一个 Stream 变量中处理,Stream 结构包含了一个 PriorityPacketQueue packet_queue,它是一个按包优先级排序的队列,越优先的包越靠前,能越先发送出去。
包优先级排序规则如下:
1. 先比较包的 priority 等级,等级越小越优先
2. 如果 1 判断相等,再判断是否属于重传包,重传包比非重传包优先
3. 如果 2 判断相等,再判断包进入 Pacer 模块的顺序,例如对于视频包,越早产生的视频帧的包越早进入 Pacer 模块,同一帧的多个包,前面的包会比后面的包越早进入 Pacer 模块
包的 priority 等级定义如下,音频包 priority 最小,优先级最高,其次是重传包,接下来是视频包和 FEC 包,最低优先级的是 Padding 包。
2. Pacer模块取出队列的包发送
将数据包传入 Pacer 模块只需要在数据产生后塞入模块即可,但是从 Pacer 模块队列取出数据包需要考虑两个问题:什么时候取出数据包发送,每次发送的数据量大小如何确定。
(1)什么时候取出数据包发送
PacingController::NextSendTime 和 PacingController::ProcessPackets 是 Pacer 模块控制什么时候取出,取出多少数据的核心函数。
Pacer 有两种工作模式,一是周期模式(kPeriodic),二是动态模式(kDynamic),默认情况下使用周期模式。
在周期模式下,每个 5ms 执行 ProcessPackets 取出报文进行发送。
(2)每次发送多少数据量
WebRTC 预估带宽后会将预估带宽值传给 Pacer 模块,知道了预估带宽值以及当前时间与上次发送时间的差值就可以知道本次可以发送的数据量大小。例如预估带宽为 300kbps,周期是 5ms,那么可以发送的数据量大小为 300kbps * 5ms / 8 == 187.5 byte。
WebRTC 引入了 IntervalBudget 类用于处理每个周期可以发送多少数据量的问题,set_target_rate_kbps 用于传入预估带宽,IncreseBudget 用于增加可以发送的预留空间,例如每经过 delta_time_ms 时间,就可以增加 target_rate_kbps * delta_time_ms / 8 (bytes) 大小的数据发送,UseBudget 用于每次真正发送数据包后减少对应可以发送的预留空间,当剩余可以发送的字节小于 0,表示本周期不可以再继续发送包了。
确定了每次可以发送的数据量大小,我们再看 PacingController::ProcessPackets 是如何发送数据包的,主要代码如下,首先是为 media_budget 设置对应的目标码率以及更新可以发送的预留空间大小。
之后通过 GetPendingPacket 从 RoundRobinPacketQueue packet_queue_ 取出报文,如果成功取到数据包,则通过 packet_sender_->SendRtpPacket(std::move(rtp_packet), pacing_info) 发送,并在 PacingController::OnPacketSent 更新已经消耗的预留空间大小,如果没有取到数据包(rtp_packet == nullptr),说明本周期已经不能再发送数据包了(可能是已经达到了本周期可以发送的最大数据量,当然也可能是队列没数据或者拥塞了)。
DataSize data_sent = DataSize::Zero();
// The paused state is checked in the loop since it leaves the critical
// section allowing the paused state to be changed from other code.
while (!paused_) {
if (small_first_probe_packet_ && first_packet_in_probe) {
// If first packet in probe, insert a small padding packet so we have a
// more reliable start window for the rate estimation.
auto padding = packet_sender_->GeneratePadding(DataSize::Bytes(1));
// If no RTP modules sending media are registered, we may not get a
// padding packet back.
if (!padding.empty()) {
// Insert with high priority so larger media packets don't preempt it.
EnqueuePacketInternal(std::move(padding[0]), kFirstPriority);
// We should never get more than one padding packets with a requested
// size of 1 byte.
RTC_DCHECK_EQ(padding.size(), 1u);
}
first_packet_in_probe = false;
}
if (mode_ == ProcessMode::kDynamic &&
previous_process_time < target_send_time) {
// Reduce buffer levels with amount corresponding to time between last
// process and target send time for the next packet.
// If the process call is late, that may be the time between the optimal
// send times for two packets we should already have sent.
UpdateBudgetWithElapsedTime(target_send_time - previous_process_time);
previous_process_time = target_send_time;
}
// Fetch the next packet, so long as queue is not empty or budget is not
// exhausted.
std::unique_ptr<RtpPacketToSend> rtp_packet =
GetPendingPacket(pacing_info, target_send_time, now);
if (rtp_packet == nullptr) {
// No packet available to send, check if we should send padding.
DataSize padding_to_add = PaddingToAdd(recommended_probe_size, data_sent);
if (padding_to_add > DataSize::Zero()) {
std::vector<std::unique_ptr<RtpPacketToSend>> padding_packets =
packet_sender_->GeneratePadding(padding_to_add);
if (padding_packets.empty()) {
// No padding packets were generated, quite send loop.
break;
}
for (auto& packet : padding_packets) {
EnqueuePacket(std::move(packet));
}
// Continue loop to send the padding that was just added.
continue;
}
// Can't fetch new packet and no padding to send, exit send loop.
break;
}
RTC_DCHECK(rtp_packet);
RTC_DCHECK(rtp_packet->packet_type().has_value());
const RtpPacketMediaType packet_type = *rtp_packet->packet_type();
DataSize packet_size = DataSize::Bytes(rtp_packet->payload_size() +
rtp_packet->padding_size());
if (include_overhead_) {
packet_size += DataSize::Bytes(rtp_packet->headers_size()) +
transport_overhead_per_packet_;
}
packet_sender_->SendRtpPacket(std::move(rtp_packet), pacing_info);
data_sent += packet_size;
// Send done, update send/process time to the target send time.
OnPacketSent(packet_type, packet_size, target_send_time);
if (recommended_probe_size && data_sent > *recommended_probe_size)
break;
if (mode_ == ProcessMode::kDynamic) {
// Update target send time in case that are more packets that we are late
// in processing.
Timestamp next_send_time = NextSendTime();
if (next_send_time.IsMinusInfinity()) {
target_send_time = now;
} else {
target_send_time = std::min(now, next_send_time);
}
}
}
last_process_time_ = std::max(last_process_time_, previous_process_time);
(3)避免引入较大延时的处理方法
引入 Pacer 模块相当于缓冲区等待发送,缓冲区的引入必然会增加延时,理想情况下预估带宽传入 Pacer 模块以及编码器模块后,编码器码率会快速收敛到预估带宽值,但是如果是短时间内依然有较大的数据量也会导致 Pacer 模块队列的包等待时间越来越大,增加了端到端延时。
WebRTC PacingController 有一个 queue_time_limit 变量(默认值为 2000ms),PacingController 会结合队列包的平均等待时间 AverageQueueTime,查看目标码率是否小于剩余包需要在 queue_time_limit-AverageQueueTime 时间内发完的码率,如果是则调高目标码率值,避免包引入较大延时。