深入浅出WebRTC—Pacer

news2024/9/22 17:32:12

平滑发包(Pacer)是 WebRTC 实现高质量实时通信不可或缺的一部分。在视频通信中,单帧视频可能包含大量的数据,如果未经控制地立即发送,可能瞬间对网络造成巨大压力。Pacer 能够根据网络条件动态调整发送速率,确保数据包以均匀且可控的速度发送,避免突发的大批量数据造成网络拥塞和数据包丢失。这样可以提升传输的稳定性,减少延迟和抖动,从而改善视频和音频的流畅度与质量。

1. 总体架构

1.1. 静态结构

1)TaskQueuePacedSender

TaskQueuePacedSender 是 PacingController 的包装器,其大部分接口直接透传到 PacingController。同时,TaskQueuePacedSender 是平滑发包的驱动器,内部使用 TaskQueue 驱动不断循环发包。

2)PacingController

PacingController 是平滑发包的控制器,用来实现指定速率的平滑发包,包含较复杂的控制逻辑,其内部使用 PrioritizedPacketQueue 缓存待发送报文。

3)BitRateProber

BitRateProber 和带宽探测相关。其接受带宽探测任务,使用平滑发包机制,按照带宽探测任务要求,控制带宽探测发包速率、发包时长、发包数量等参数。

1.2. 调用流程

下图展示的是一个典型的发包调用流程,包含一长两短三条路径,这三条路径配合实现平滑发包和带宽探测功能。

1)长路径是由 TaskQueueBase 驱动将报文插入发送队列,然后调用 NextSendTime 和 ProcessPackets 发送到期该发送的报文,然后创建一个调度任务 Post 到 TaskQueueBase 驱动循环报文发送。

2)其中一条短路径,是外部模块调用 EnqueuePackets 发送报文,TaskQueuePacedSender 将其封装成一个任务,调用 PostTask 转换到内部线程执行。

3)另外一条短路径,是外部模块调用 CreateProbeCluster 向 BitrateProber 创建带宽探测任务。

1.3. 逻辑架构

下图展示的是平滑发包逻辑架构。

1)平滑发包以 TaskQueue 进行驱动,它接受 Controller 和 BitrateProber 的控制,从 packet_queue_ 抓取报文经 PacketSender 发送到网络。

2)Controller 主要控制发包时间和发包数量,即什么时候发包以及发多少包,其控制逻辑受多个方面影响,比如外部设置的发包间隔、平滑发送码率、网络缓冲区状态、发包队列长度以及平均排队时长等。

3)BitrateProber 接受创建的带宽探测任务,它可以控制 TaskQueue 优先发送探测报文,来实现带宽探测功能。探测报文可以是媒体报文,也可以是 Padding 报文,Padding 报文需要调用 PacketSender 接口生成。

WebRTC 平滑发包是一个典型的“发送-等待”模型,如下图所示。基本逻辑是,将发送时间切分成一段一段的时间片(时间片长度不一定相等),在每个时间片的开始按照指定码率发送一定数量的报文,等待网络管道排空,然后继续下一轮发送,循环往复。这样做可以尽量保持比较均匀的发送码率,不会对网络造成冲击,同时,可以获得尽可能低延迟。

以上发包模型最关键的是计算每个时间片发送多少数据。最简单的思路就是使用固定时间片长度,发送固定数量的报文。WebRTC 最新代码已经没有使用固定周期发包模式了,采用的是动态发包周期,原因可能是固定发包周期无法满足不同发包要求:

1)带宽探测需要使用更小的发送时间间隔,以实现更准确的带宽探测。

2)高优先级报文需要立即发送,不能等待平滑发送时间片。

3)如果平滑发送速率太大,要适当调低发送时间间隔,否则有可能导致网络缓冲区溢出。

4)没有报文发送的时候,发送 keep-alive 的时间间隔不需要像发送媒体报文那么小。

动态周期发包模式引入 media_debt_ 变量来控制发包节奏,如下图所示。media_debt_ 可以认为是存在于网络缓冲区中的报文数量的一个计算值而非测量值。每次发送报文都会增加 media_debt_,增加的大小等于发送报文的大小;每经过一段时间,都会减少 media_debt_,减少的大小等于adjusted_media_rate * delta_t。当 media_debt_ 为0时,认为网络管道已经排空。

1.4. 报文类型

平滑发包会发送三类报文,分别是媒体报文、保活(keep-alive)报文和探测报文。如果基于Payload Type划分,可以分为 Audio、Video、RTX、FEC、Padding 五种类型的报文。

Keep-alive 报文属于 Padding 报文;探测报文可能是 Padding 报文,也可能是 RTX 报文。Audio、Video、RTX、FEC 都属于媒体报文。

2. TaskQueuePacedSender

2.1. 静态结构

2.1.1. 重要属性

1)pacing_controller_

执行具体平滑发包控制,包括什么时候发包,发多少包。

2)task_queue_

单线程驱动 pacing_controller_ 循环发包。

3)packet_size_

一个指数加权平均算法,用于获取平滑后的报文大小。平滑后的报文大小主要用来计算保持窗口大小。

2.1.2. 重要方法

1)EnqueuePackets

外部模块调用此接口发送报文。

2)CreateProbeClusters

创建带宽探测簇,内部调用 PacingController 对应方法。

3)SetCongested

设置链路拥塞状态,内部调用 PacingController 对应方法。

4)SetPacingRates

设置平滑发送速率,内部调用 PacingController 对应方法。

5)SetSendBurstInterval

设置平滑发包间隔,内部调用 PacingController 对应方法。

6)SetQueueTimeLimit

设置报文最大排队时间,内部调用 PacingController 对应方法。

2.2. 源码分析

2.2.1. EnqueuePackets

创建一个任务 Post 到内部线程,循环遍历所有要发送的报文,调用 PacingController 接口将报文插入发送队列。报文入队列完毕后,可能有报文需要发送,立即触发一次发包处理。

void TaskQueuePacedSender::EnqueuePackets(
  std::vector<std::unique_ptr<RtpPacketToSend>> packets) {
    task_queue_->PostTask(
      SafeTask(safety_.flag(), [this, packets = std::move(packets)]() mutable {
        // 循环遍历所有报文
        for (auto& packet : packets) {
          // 计算报文大小:header + payload + padding
          size_t packet_size = packet->payload_size() + packet->padding_size();
          if (include_overhead_) {
            packet_size += packet->headers_size();
          }
          // 计算平滑后的报文大小:y(k) = 0.9 * y(k-1) + 0.1 * sample
          packet_size_.Apply(1, packet_size);
          // 调用 PacingController::EnqueuePacket
          pacing_controller_.EnqueuePacket(std::move(packet));
        }
        // 插入报文后立即触发一次处理
        MaybeProcessPackets(Timestamp::MinusInfinity());
      }));
}

2.2.2. MaybeProcessPackets

先检测是否有报文需要发送,如果有则循环发送这个时间片需要发送的报文,然后获取下一次报文发送时间,创建一个新的调度任务,继续下一轮报文发送,形成一个发包循环。

MaybeProcessPackets 除了自循环外,如果执行了可能导致 PacingController 内部状态变化的操作,也会被调用来触发报文发送,包括 CreateProbeClusters、SetCongested、SetPacingRates 等操作。

除了主干流程,还有几个处理细节值得分析:

1)early_execute_margin

对于探测报文,允许提前 1ms 发送和提前 1ms 调度,代码中没有明确说明这么做的原因。理论上,探测发包是按照探测目标码率进行调度,没必要特殊处理。猜测可能的原因是定时器精度存在误差,而带宽探测对发包速率要求比较高,提前发送和调度可以有效保证探测发送速率。比如本来是第 100ms 回调 MaybeProcessPackets,结果是 99ms 回调,发现未到发包时间,需要重新创建一个调度任务,如果是探测发包,索性提前发送。

2)hold_back_window

对于非探测发包,会设置一个小于 5ms 的保持窗口,两次发包时间间隔不能低于保持窗口。保持窗口受 pacing_rate 影响,pacing_rate 越高则窗口越小。由于 send_burst_interval_ 已经可以用来控制发包间隔,这里附加的发包间隔控制,更像是一个额外保护。

3)scheduled_process_time

这个参数用来区分 MaybeProcessPackets 是自循环调度还是其他调度,对于自循环调度,scheduled_process_time 是一个有效值,其他调度这个值是 Timestamp::MinusInfinity。如果是自循环调度,但是 next_process_time_ 已经改变,说明在此之前发生了其他调用,当前调度已经过时,没必要继续执行。

void TaskQueuePacedSender::MaybeProcessPackets(Timestamp scheduled_process_time) {
	...

	// 获取下次发送时间
  Timestamp next_send_time = pacing_controller_.NextSendTime();
  const Timestamp now = clock_->CurrentTime();

	// 获取提前发送间隔(探测为什么要加1ms的允许提前发送?)
  TimeDelta early_execute_margin = pacing_controller_.IsProbing()
		  ? PacingController::kMaxEarlyProbeProcessing : TimeDelta::Zero();

	// 循环发送所有需要发送的报文
  while (next_send_time <= now + early_execute_margin) {
		// 执行发送
    pacing_controller_.ProcessPackets();
		// 获取下次发送时间
    next_send_time = pacing_controller_.NextSendTime();
		// 获取提前发送间隔
    early_execute_margin = pacing_controller_.IsProbing()
				? PacingController::kMaxEarlyProbeProcessing : TimeDelta::Zero();
  }

	// 如果 scheduled_process_time 有值,说明是 TaskQueuePacedSender 循环调度
  if (scheduled_process_time.IsFinite()) {
		// 新任务修改了next_process_time_,此任务没必要再往下执行了
    if (scheduled_process_time != next_process_time_) {
      return;
    }
		// 匹配到任务时间戳,重置表示任务已经执行,没有待执行任务了
    next_process_time_ = Timestamp::MinusInfinity();
  }

  // hold_back_window 用来避免非探测状态下发送间隔太小
  TimeDelta hold_back_window = TimeDelta::Zero();
  if (!pacing_controller_.IsProbing()) {
		// 保持窗口初始化为 5ms
    hold_back_window = max_hold_back_window_;
		// 获取平滑速率
    DataRate pacing_rate = pacing_controller_.pacing_rate();		
    if (max_hold_back_window_in_packets_ != kNoPacketHoldback &&
        !pacing_rate.IsZero() &&
        packet_size_.filtered() != rtc::ExpFilter::kValueUndefined) {
			// 计算发送一个报文需要多长时间
      TimeDelta avg_packet_send_time =
        DataSize::Bytes(packet_size_.filtered()) / pacing_rate;
			// 取发送 3 个报文所需时间与 5ms 之间的更小值
      hold_back_window = std::min(hold_back_window,
        avg_packet_send_time * max_hold_back_window_in_packets_);
    }
  }

  // 计算下次发送时间,发送间隔不能小于 hold_back_window
  TimeDelta time_to_next_process =
      std::max(hold_back_window, next_send_time - now - early_execute_margin);
  next_send_time = now + time_to_next_process;

  // 尝试启动一个新任务
  if (next_process_time_.IsMinusInfinity() || // 没有待执行任务
      next_process_time_ > next_send_time) {  // 存在待执行任务,但执行时间更靠后
    task_queue_->PostDelayedHighPrecisionTask(
        SafeTask(safety_.flag(),
            [this, next_send_time]() { MaybeProcessPackets(next_send_time); }),
        time_to_next_process.RoundUpTo(TimeDelta::Millis(1)));
		// 更新下一次处理时间
    next_process_time_ = next_send_time;
  }
}

3. PacingController

3.1. 静态结构

3.1.1. 重要属性

1)packet_sender_

发送报文的对象,生成 padding 报文也是靠它。

2)prober_

带宽探测控制对象,基于平滑发送机制执行带宽探测任务。

3)packet_queue_

优先级队列,报文发送前会先插入此队列,能够保证高优先级报文先发送。

3.1.2. 重要方法

1)EnqueuePacket

将报文插入发送队列。

2)CreateProbeClusters

创建探测任务,内部调用 BitrateProber 方法。

3)SetCongested

设置拥塞状态,拥塞状态下需要停止发包,但需要继续发送 keep-alive 报文。

4)SetPacingRates

设置平滑发包速率。

5)SetSendBurstInterval

设置平滑发包时间间隔,内部可能根据状态和反馈调整发包间隔。

6)SetQueueTimeLimit

设置报文最大排队时长,如果发现按照当前发包速率,报文在队列中的排队时长会超过此设置值,则会提高发包速率。

7)NextSendTime

获取下次发包时间,TaskQueuePacedSender 调用。

8)ProcessPackets

执行发包流程,TaskQueuePacedSender 调用。

3.2. 源码分析

3.2.1. EnqueuePacket

此方法的核心逻辑是将报文插入到优先级队列,但在插入前和插入后有做一些必要处理。

1)keyframe_flushing_

如果配置了关键帧刷新,当收到关键帧的第一个报文,会检查当前队列中是否有其他关键帧报文,如果有的话,需要将之前关键帧报文全部移除,包括 RTX 报文。因为,正常情况两个关键帧相隔时间较长,如果新来的关键帧在队列中能看到上一个关键帧,说明发生网络阻塞,再发送老的关键帧会引起更大的延迟。

2)队列为空处理

如果发现队列为空,说明当前已经没有要发送的报文,立即更新 budget,提前消耗 media_dbet_,使得下次 TaskQueuePacedSender 调用 NextSendTime 时能够尽快发送报文。

3)MaybeUpdateMediaRateDueToLongQueue

报文入队列后,队列长度增加,检查是否需要调整发送码率以尽快发送队列中的报文。具体参考 MaybeUpdateMediaRateDueToLongQueue 源码分析。

void PacingController::EnqueuePacket(std::unique_ptr<RtpPacketToSend> packet) {
  // 为了尽快输出新的关键帧,清空该流中当前待处理的所有数据包。
	if (keyframe_flushing_ && // 如果配置了关键帧刷新
      packet->packet_type() == RtpPacketMediaType::kVideo && // 视频报文
      packet->is_key_frame() && // 属于关键帧报文
		  packet->is_first_packet_of_frame() && // 关键帧的第一个报文
      !packet_queue_.HasKeyframePackets(packet->Ssrc())) { // 当前队列没有关键帧报文
		// 先清空媒体报文
    packet_queue_.RemovePacketsForSsrc(packet->Ssrc());
		// 再清空关联的 RTX 报文,如果有的话
    absl::optional<uint32_t> rtx_ssrc =
        packet_sender_->GetRtxSsrcForMedia(packet->Ssrc());
    if (rtx_ssrc) {
      packet_queue_.RemovePacketsForSsrc(*rtx_ssrc);
    }
  }

	// prober 可能在等待一个足够大的报文来启动带宽探测任务
  prober_.OnIncomingPacket(DataSize::Bytes(packet->payload_size()));

	// 获取当前时间
  const Timestamp now = CurrentTime();

	// 当前队列为空,来了一个新的报文,希望能够尽快处理
  if (packet_queue_.Empty()) {
    Timestamp target_process_time = now;
    Timestamp next_send_time = NextSendTime();
    if (next_send_time.IsFinite()) {
      target_process_time = std::min(now, next_send_time);
    }
    UpdateBudgetWithElapsedTime(UpdateTimeAndGetElapsed(target_process_time));
  }

	// 报文入队列
  packet_queue_.Push(now, std::move(packet));
  seen_first_packet_ = true;

	// 队列长度增加了,检查是否需要改变平滑码率
  MaybeUpdateMediaRateDueToLongQueue(now);
}

3.2.2. MaybeUpdateMediaRateDueToLongQueue

正常情况,真实发送码率等于设置的平滑发送码率。但如果要考虑超长报文队列的影响,则需要在报文队列超长时适当调高发送码率,以排空报文队列,防止队列不断累积而导致报文被丢弃。

【注意】这里可能会导致真实发送码率大于设置的平滑发送码率。

void PacingController::MaybeUpdateMediaRateDueToLongQueue(Timestamp now) {
  // 更新为pacing rate
  adjusted_media_rate_ = pacing_rate_;

  // 如果不考虑排空超长队列,则直接使用设置的pacing rate
  if (!drain_large_queues_) {
    return;
  }

  DataSize queue_size_data = QueueSizeData();
  if (queue_size_data > DataSize::Zero()) {
    // 更新队列中报文排队时长
    packet_queue_.UpdateAverageQueueTime(now);
    
    // 计算当前报文已排队平均时长与配置的最大排队时长之间的差值
    TimeDelta avg_time_left =
        std::max(TimeDelta::Millis(1),
                 queue_time_limit_ - packet_queue_.AverageQueueTime());
    
    // 在剩余排队时长中将队列中所有报文都发送出去所需的最小码率(排队超时会导致报文丢弃)
    DataRate min_rate_needed = queue_size_data / avg_time_left;
    
    // 如果基于报文排队时长限制计算出来的码率大于带宽评估设置的码率,则更新调整后码率
    if (min_rate_needed > pacing_rate_) {
      adjusted_media_rate_ = min_rate_needed;
    }
  }
}

3.2.3. NextSendTime

计算 next_send_time 之前,先计算排空时间 drain_time,即使用当前 adjusted_media_rate_ 来排空 media_debt_ 需要多少时间,然后将 drain_time 与 send_burst_interval 进行比较。

如果 send_burst_interval > drain_time,如下图所示。表示按照当前的发送速率和发送时间间隔,在下次发送时间到来前,网络换缓冲区会被提前排干,这样会导致网络空闲一段时间,应该立即发送报文,因此将 next_send_time 设置为 last_process_time_,表示发送的报文数量不足,需要立即补发报文。

如果 send_burst_interval < drain_time,如下图所示。表示按照当前的发送速率和发送时间间隔,会导致网络缓冲区积压,这样会增大延时,继续积压可能会导致拥塞。所以应该等待足够多的时间让网络缓冲区排空后再继续发送报文。next_send_time 设置为 last_process_time_ + drain_time。

Timestamp PacingController::NextSendTime() const {
  const Timestamp now = CurrentTime();
  Timestamp next_send_time = Timestamp::PlusInfinity();

  // 已经暂停,但还需要发送keep-alive报文
  if (paused_) {
    return last_send_time_ + kPausedProcessInterval; // 500ms
  }

  // 带宽探测优先级更高,如果当前正在探测,由 BitrateProber 控制发送时间间隔
  if (prober_.is_probing() && !probing_send_failure_) {
    Timestamp probe_time = prober_.NextProbeTime(now);
    if (!probe_time.IsPlusInfinity()) {
      return probe_time.IsMinusInfinity() ? now : probe_time;
    }
  }

  // 音频报文和重传报文不做平滑,立即发送
  Timestamp unpaced_send_time = NextUnpacedSendTime();
  if (unpaced_send_time.IsFinite()) {
    return unpaced_send_time;
  }

  // 处于拥塞状态或者队列中未层插入任何报文,也要发送keep-alive报文
  if (congested_ || !seen_first_packet_) {
    return last_send_time_ + kCongestedPacketInterval; // 500ms
  }

  // 有待发送媒体报文,且媒体发送码率大于 0
  if (adjusted_media_rate_ > DataRate::Zero() && !packet_queue_.Empty()) {
    // 计算按照当前速率排空media_debt_需要多长时间
    TimeDelta drain_time = media_debt_ / adjusted_media_rate_;

    // send_burst_interval = 40ms
    // 确保突发的数据包总数不超过kMaxBurstSize,以免在高码率下填满 socket 缓冲区。
    // 换种说法,如果码率太高的话,发送时间间隔要相应调小一些。
    TimeDelta send_burst_interval =
        std::min(send_burst_interval_, kMaxBurstSize / adjusted_media_rate_);

    // send_burst_interval >  drain_time:说明网络缓冲区欠载,可以立即发送报文
    // send_burst_interval <= drain_time: 说明缓冲区过载或稳定,尽量保持网络缓冲区稳定
    next_send_time = last_process_time_ +
        ((send_burst_interval > drain_time) ? TimeDelta::Zero() : drain_time);
    
  // 无待发送媒体报文,且 padding 码率大于 0
  } else if (padding_rate_ > DataRate::Zero() && packet_queue_.Empty()) {
    // 计算排空时间(取媒体数据和 Padding 数据排空的最大值)
    TimeDelta drain_time = std::max(media_debt_ / adjusted_media_rate_,
                                    padding_debt_ / padding_rate_);

    if (drain_time.IsZero() && (!media_debt_.IsZero() || !padding_debt_.IsZero())) {
      // 有非零的 debt,但排空时间为 0,取最小的非零时间增量
      drain_time = TimeDelta::Micros(1); // 1us
    }
    next_send_time = last_process_time_ + drain_time;
  } else {
    // Nothing to do.
    next_send_time = last_process_time_ + kPausedProcessInterval;
  }

  if (send_padding_if_silent_) {
    next_send_time =
        std::min(next_send_time, last_send_time_ + kPausedProcessInterval);
  }

  return next_send_time;
}

3.2.4. ProcessPackets

此方法的主要逻辑是循环发送报文。对于带宽探测,发送指定大小的报文后停止;对于非带宽探测,根据 NextSendTime 退出。几个处理细节需要关注:

1)GetPendingPacket

带宽探测是先用媒体报文来探测,如果媒体报文发送完了,则会调用 PacketRouter::GeneratePadding 获取 padding 报文来填充探测码率。

2)UpdateTimeAndGetElapsed

代码中使用 target_send_time 来更新 media_dbet_而不是用 now 来更新 media_dbet_。在每一轮发送循环中,会调用 UpdateBudgetWithElapsedTime 来消耗 media_dbet_,从最终结果看,效果是一样的。 (具体原因待分析)

3)circuit_breaker_threshold_

这是一个保护阀,避免出现异常情况,进入死循环。

void PacingController::ProcessPackets() {
  ...

  TimeDelta early_execute_margin =
      prober_.is_probing() ? kMaxEarlyProbeProcessing : TimeDelta::Zero();

  // 获取发送时间
  target_send_time = NextSendTime();
  if (now + early_execute_margin < target_send_time) {
    // 虽然未到发送时间,但还是要处理 dbet
    UpdateBudgetWithElapsedTime(UpdateTimeAndGetElapsed(now));
    return;
  }

  // 到了要发送的时间,使用 target_send_time 来更新 dbet,而不是 now
  TimeDelta elapsed_time = UpdateTimeAndGetElapsed(target_send_time);
  if (elapsed_time > TimeDelta::Zero()) {
    UpdateBudgetWithElapsedTime(elapsed_time);
  }

  PacedPacketInfo pacing_info;
  DataSize recommended_probe_size = DataSize::Zero();
  bool is_probing = prober_.is_probing();

  // 如果正在探测中,则先发送探测报文
  if (is_probing) {
    pacing_info = prober_.CurrentCluster(now).value_or(PacedPacketInfo());
    if (pacing_info.probe_cluster_id != PacedPacketInfo::kNotAProbe) {
      // 一轮探测至少要发送数据量
      recommended_probe_size = prober_.RecommendedMinProbeSize();
    } else {
      // No valid probe cluster returned, probe might have timed out.
      is_probing = false;
    }
  }

  DataSize data_sent = DataSize::Zero();
  int iteration = 0;
  int packets_sent = 0;
  int padding_packets_generated = 0;

  // circuit_breaker_threshold_ 是一个保护阀
  for (; iteration < circuit_breaker_threshold_; ++iteration) {
    // 从队列中获取待发送媒体报文
    std::unique_ptr<RtpPacketToSend> rtp_packet =
        GetPendingPacket(pacing_info, target_send_time, now);
    if (rtp_packet == nullptr) {
      // 没有待发送媒体报文,则发送 Padding 报文
      ...
    } else { // 获取到待发送媒体报文
      const RtpPacketMediaType packet_type = *rtp_packet->packet_type();
      DataSize packet_size = DataSize::Bytes(rtp_packet->payload_size() +
                                             rtp_packet->padding_size());

      if (include_overhead_) {
        packet_size += DataSize::Bytes(rtp_packet->headers_size()) +
                       transport_overhead_per_packet_;
      }

      // pacing_info 会与被发送的报文一起被记录下来
      packet_sender_->SendPacket(std::move(rtp_packet), pacing_info);

      // 如果有 FEC 报文,则将 FEC 报文入队列
      for (auto& packet : packet_sender_->FetchFec()) {
        EnqueuePacket(std::move(packet));
      }

      // 更新统计数据
      data_sent += packet_size;
      ++packets_sent;

      // 发送完成,更新发送时间和 media_debt_
      OnPacketSent(packet_type, packet_size, now);

      if (is_probing) {
        // 累加已发送数据
        pacing_info.probe_cluster_bytes_sent += packet_size.bytes();
        // 发送数据以达到设定目标,退出循环
        if (data_sent >= recommended_probe_size) {
          break;
        }
      }

      // 获取下一次发送时间
      target_send_time = NextSendTime();

      // 还未到下一次发送时间
      if (target_send_time > now) {
        // 如果是媒体报文,直接退出
        if (!is_probing) {
          break;
        }
        // 探测报文继续发送
        target_send_time = now;
      }

      // 更新 debt
      UpdateBudgetWithElapsedTime(UpdateTimeAndGetElapsed(target_send_time));
    }
  }

  if (iteration >= circuit_breaker_threshold_) {
    last_send_time_ = now;
    last_process_time_ = now;
    return;
  }

  // 如果发送的是探测报文,则需要更新探测器的状态
  if (is_probing) {
    probing_send_failure_ = data_sent == DataSize::Zero();
    if (!probing_send_failure_) {
      prober_.ProbeSent(CurrentTime(), data_sent);
    }
  }

  MaybeUpdateMediaRateDueToLongQueue(CurrentTime());
}

4. PrioritizedPacketQueue

PrioritizedPacketQueue 用来存放待发送报文,支持按照优先级存取,以保证高优先级报文能够更早发送出去。比如重传报文应该尽快发送而不是在队列尾部老老实实的排队,否则可能黄花菜都凉了。

4.1. 静态结构

4.1.1. 重要属性

1)streams_

streams_ 是一个 unordered_map,key 是 SSRC,value 是一个 StreamQueue。StreamQueue 是一个包含 5 个元素的 deque 数组,数组下标即为优先级,deque 中保存的对应优先级的报文。

2)streams_by_prio_

streams_by_prio_ 也是一个包含 5 个元素的 deque 数组,数组下标也表示优先级,但是 deque 中保存的是 StreamQueue 指针,表示 StreamQueue 存在对应优先级的报文。

3)top_active_prio_level_

top_active_prio_level_是一个 int 类型数据,指向当前存在数据的最高优先级(数值越小优先级越高)。

4.1.2. 重要方法

1)Push

向队列中插入一个报文,内部会根据报文的 ssrc 和预先定义的优先级插入合适的地方。

2)Pop

从队列中取出一个报文,内部会获取最高优先级的报文返回。

4.2. 优先级定义

报文优先级定义如下,数字越小优先级越高(参考函数GetPriorityForType)。

0 - Audio
1 - Audio RTX
2 - Video RTX
3 - Video and FEC
4 - Padding

4.3. 数据结构

下图是 PrioritizedPacketQueue 的一个典型示例,streams_ 保存了两个 SSRC 的报文,SSRC1 没有 P1 和 P3 两个优先级的报文,SSRC2 没有 P0 和 P4 两个优先级的报文。streams_by_prio_中,只有 P2 优先级保存了 A、B 两个StreamQueue 的指针,其他优先级都只有一个 StreamQueue 指针。当前最高优先级报文为 P0,所以 top_active_prio_level_ 指向 P0。

Push操作

SSRC1 没有 P1 优先级的报文,所以在 streams_by_prio_ 的优先级 P1 没有保存 A 的指针,假设现在向 SSRC1 插入一个 P1 优先级的报文,则需要在 streams_by_prio_ 的 P1 优先级加上 A 的指针,根据先来后到规则,A 排在 B 之后。此时 top_active_prio_level_ 仍然指向 P0。

Pop操作

SSRC1 的 P0 优先级存在一个报文,streams_by_prio_ 中的 P0 队列中保存了 A 的指针。假设现在要取一个报文去发送,根据 top_active_prio_level_ 指向的优先级,应该从 SSRC1 的 P0 队列 pop 一个报文。pop以后,A 的 P0 队列为空,此时,要从 streams_by_prio_ 的 P0 队列中将 A 移除,这样 streams_by_prio_ 的 P0 队列也空了,top_active_prio_level_ 指向当前最高优先级 P1。

5. BitrateProber

带宽探测不能无所顾忌的发包,需要复用平滑发送机制。BitrateProber 负责带宽探测的管理和控制,它接收创建的带宽探测任务,按照任务目标和要求执行探测动作,并控制探测的生命周期。

5.1. 静态结构

5.1.1. 重要属性

1)probing_state_

带宽探测状态:{ kDisabled, kInactive, kActive },kActive 表示正在探测中。

2)clusters_

用来保存待执行的探测任务,这些探测任务会从头开始一个一个顺序执行,默认最大探测任务数为 5。

3)next_probe_time_

下一次发送探测包时间,用来控制探测发包的频率。

4)config_

探测发包相关配置。

struct BitrateProberConfig {
  FieldTrialParameter<TimeDelta> min_probe_delta; // 默认 2ms
  FieldTrialParameter<TimeDelta> max_probe_delay; // 默认 10ms
  FieldTrialParameter<DataSize> min_packet_size;  // 默认 200Bytes
};

5.1.2. 重要方法

1)OnIncomingPacket

BitrateProber 会监听所有插入发送队列的报文,当发现有足够大的发送报文时,才会启动带宽探测任务。代码注释中说这样能够得到更加准确的探测结果,具体原因未知。

2)CreateProbeClusters

创建带宽探测任务,探测任务会被插入任务队列等待执行。

3)NextProbeTime

获取下一次发送探测包的时间。

4)RecommendedMinProbeSize

获取一轮发送探测包大小。

5)ProbeSent

通知一轮探测报文发送完毕,内部会更新下一次探测时间,并判断探测任务是否已经完成。

5.2. 数据结构

5.2.1. ProbeClusterConfig

创建探测任务传入的参数,指定了探测码率、探测时长和探测次数(一个 burst 算一次)

struct ProbeClusterConfig {
  // 探测任务创建时间
  Timestamp at_time = Timestamp::PlusInfinity();
  // 要求的探测速率
  DataRate target_data_rate = DataRate::Zero();
  // 要求的探测时长(默认为 15ms,由 ProbeController 设置)
  TimeDelta target_duration = TimeDelta::Zero();
  // 探测次数(默认为 5,由 ProbeController 设置)
  int32_t target_probe_count = 0;
  // 探测任务 ID,由 ProbeController 分配
  int32_t id = 0;
};

5.2.2. ProbeCluster

BitrateProber 将 ProbeClusterConfig 转化为内部的 ProbeCluster 数据结构。BitrateProber 不再使用 target_duration,将其转换为 probe_cluster_min_bytes。

struct PacedPacketInfo {
  static constexpr int kNotAProbe = -1;
  // 来自 ProbeClusterConfig::target_data_rate
  DataRate send_bitrate = DataRate::BitsPerSec(0);
  // 来自 ProbeClusterConfig::id
  int probe_cluster_id = kNotAProbe;
  // 来自 ProbeClusterConfig::target_probe_count
  int probe_cluster_min_probes = -1;
  // 来自 ProbeClusterConfig::target_data_rate * ProbeClusterConfig::target_duration
  int probe_cluster_min_bytes = -1;
  // 发送的总字节数
  int probe_cluster_bytes_sent = 0;
};

struct ProbeCluster {
  PacedPacketInfo pace_info;
  // 已发送了几轮探测报文
  int sent_probes = 0;
  // 已经发送的探测报文数据量
  int ProbeCluster = 0;
  // 来自 ProbeClusterConfig::at_time
  Timestamp requested_at = Timestamp::MinusInfinity();
  // 开始发送探测报文时间
  Timestamp started_at = Timestamp::MinusInfinity();
  // 这个字段目前没用到
  int retries = 0;
};

PacedPacketInfo::probe_cluster_bytes_sent 是 ProbeCluster::ProbeCluster 的一个拷贝。

absl::optional<PacedPacketInfo> BitrateProber::CurrentCluster(Timestamp now) {
	...

  PacedPacketInfo info = clusters_.front().pace_info;

  // 将 ProbeCluster::sent_bytes 拷贝到 PacedPacketInfo::probe_cluster_bytes_sent
  info.probe_cluster_bytes_sent = clusters_.front().sent_bytes;
  return info;
}

5.3. 生命周期

5.3.1. 探测开始

调用 CreateProbeClusters 创建探测任务,探测任务并不会立即执行,BitrateProber 会等待一个足够大的报文来启动探测任务。

void BitrateProber::OnIncomingPacket(DataSize packet_size) {
  if (ReadyToSetActiveState(packet_size)) {
    next_probe_time_ = Timestamp::MinusInfinity();
    probing_state_ = ProbingState::kActive;
  }
}

bool BitrateProber::ReadyToSetActiveState(DataSize packet_size) const {
  if (clusters_.empty()) {
    return false;
  }
  switch (probing_state_) {
    case ProbingState::kDisabled: // 已经关闭
    case ProbingState::kActive:   // 已经处于active状态
      return false;
    case ProbingState::kInactive:
      // min_packet_size = 200B
      return packet_size >=
        std::min(RecommendedMinProbeSize(), config_.min_packet_size.Get());
  }
}

5.3.2. 报文发送

1)什么时间发送

与媒体报文的发送控制逻辑不一样,探测报文的发送控制不需要考虑网络管道的拥塞、发送队列的积压等情况,严格按照探测任务要求的速率发送数据。
 

Timestamp BitrateProber::CalculateNextProbeTime(const ProbeCluster& cluster) const 
{
	...

  DataSize sent_bytes = DataSize::Bytes(cluster.sent_bytes);
  DataRate send_bitrate = cluster.pace_info.send_bitrate;

  // 这里计算距离开始发送报文的相对时间,尽力保证目标探测速率
  TimeDelta delta = sent_bytes / send_bitrate;
  return cluster.started_at + delta;
}

每次发送完数据后,相对探测任务发送起点,使用探测码率重新计算 NextProbeTime,尽量保证发送码率符合探测任务要求。

2)发送多少数据

发送探测报文前,会获取这一轮探测需要发送数据量大小:recommended_probe_size。

if (is_probing) {
  pacing_info = prober_.CurrentCluster(now).value_or(PacedPacketInfo());
  // probe_cluster_id 是否有效
  if (pacing_info.probe_cluster_id != PacedPacketInfo::kNotAProbe) {
    recommended_probe_size = prober_.RecommendedMinProbeSize();
  } else {
    is_probing = false;
  }
}

recommended_probe_size = 设置的探测目标速率 x 配置的每轮探测最小时间间隔,带宽探测使用的发送时间间隔与媒体报文发送时间间隔 send_burst_interval_ 不一样,默认 2ms 的发送间隔感觉挺恐怖。

DataSize BitrateProber::RecommendedMinProbeSize() const {
  if (clusters_.empty()) {
    return DataSize::Zero();
  }
  // send_bitrate 是探测目标码率
  DataRate send_rate = clusters_.front().pace_info.send_bitrate;
  // 使用配置值:min_probe_delta = 2ms
  return send_rate * config_.min_probe_delta;
}

PacingController 保证会在一个 burst 发送指定数量的数据。

if (is_probing) {
  pacing_info.probe_cluster_bytes_sent += packet_size.bytes();
  // 如果是带宽探测,至少发送这么多数据才能退出发送循环。
  if (data_sent >= recommended_probe_size) {
    break;
  }
}

5.3.3. 探测结束

PacingController 每轮发送完以后,会调用 BitrateProber::ProbeSent,更新总共发送了多少数据和发送了多少轮。当达到任务设置的发送数据大小和发送次数要求,探测结束,cluster 被从队列中移除。

void BitrateProber::ProbeSent(Timestamp now, DataSize size) {
  ...

  ProbeCluster* cluster = &clusters_.front();

  // 第一次发送
  if (cluster->sent_probes == 0) {
    cluster->started_at = now;
  }

  // 累加发送的字节数和探测的次数
  cluster->sent_bytes += size.bytes<int>();
  cluster->sent_probes += 1;

  // 根据要求的探测速率计算下一次探测时间
  next_probe_time_ = CalculateNextProbeTime(*cluster);

  // Cluster已经完成,发送数据和发送次数都需要达成目标
  if (cluster->sent_bytes >= cluster->pace_info.probe_cluster_min_bytes &&
      cluster->sent_probes >= cluster->pace_info.probe_cluster_min_probes) {
    clusters_.pop();
  }

  // 没有探测任务了,更新探测状态
  if (clusters_.empty()) {
    probing_state_ = ProbingState::kInactive;
  }

  ...
}

6. 总结

本文详细分析了 WebRTC 平滑发送模块的整体框架和实现原理,并对重要的数据结构和逻辑进行了深入剖析。平滑发送模块设计的非常灵活,采用动态发包周期和漏桶控制机制,能够满足媒体报文发送、带宽探测、高优先级报文优先发送等多种发送要求。不过,相比于固定发包周期,这种设计实现起来更加复杂性,大家在借鉴的时候,要充分评估、小心谨慎。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1944814.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

springboot校园车辆管理系统-计算机毕业设计源码63557

校园车辆管理系统 摘 要 校园车辆管理系统是当前高校校园管理中的一个重要方面&#xff0c;其有效管理和调度对于提升校园的运行效率和管理水平至关重要。本论文基于Spring Boot框架开发了一套校园车辆管理系统&#xff0c;系统主要包括用户和管理员两大角色&#xff0c;涵盖…

pytest使用

主要技术内容 1.pytest设计 接口测试 框架设想 common—公共的东西封装 1.request请求 2.Session 3.断言 4.Log 5.全局变量 6.shell命令 ❖ config---配置文件及读取 ❖ Log— ❖ payload—请求参数—*.yaml及读取 ❖ testcases—conftest.py; testcase1.py…….可…

Chapter 14 Python数据容器总结

欢迎大家订阅【Python从入门到精通】专栏&#xff0c;一起探索Python的无限可能&#xff01; 文章目录 前言一、数据容器对比二、数据容器的通用操作【拓展】字符串大小比较 前言 Python 作为一种高级编程语言&#xff0c;内置了多种强大的数据容器&#xff0c;帮助开发者以更…

【GoLang】Golang 快速入门(第一篇)

目录 1.简介&#xff1a; 2.设计初衷&#xff1a; 3.Go语言的 特点 4.应用领域: 5.用go语言的公司&#xff1a; 6. 开发工具介绍以及环境搭建 1.工具介绍: 2.VSCode的安装: 3.安装过程&#xff1a; 4.Windows下搭建Go开发环境--安装和配置SDK 1.搭建Go开发环境 - 安装…

【洛谷】P1088 [NOIP2004 普及组] 火星人——C++

本题我们会用到函数next_permutation(start,end),是头文件algorithm标准库中的一个标准函数&#xff0c;用来表示[start,end]内存的数组中产生一个字典排序&#xff0c;比如[1 , 2 ,3]到[2 ,3, 1]再到[3 , 1, 2]这样的&#xff0c;这个函数的复杂度为&#xff08;n!&#xff09…

Rust代码答疑报错|Python一对一辅导答疑

Question 你好&#xff0c;我是悦创。 学员答疑&#xff1a; https://code.bornforthis.cn/?id4e72084d-1eaf-44ed-8067-744671491574https://code.bornforthis.cn/?id664ff169-41d6-409f-a05b-02ed42279759 问题代码&#xff1a; // You can bring module paths into sc…

开发日志:windows修复SSL漏洞CVE-2016-2183(3389端口)

漏洞危害&#xff1a; 具有足够资源的中间人攻击者可利用此漏洞,通过“birthday”攻击检测会在固定密码与已知纯文本之间泄露 XOR 的冲突,进而泄露密码文本(例如安全 HTTPS Cookie),并可能导致劫持经认证的会话。 参见《支持SSL 64位块大小的密码套件(SWEET32)-修复方案》 参考…

html 常用css样式及排布问题

1.常用样式 <style>.cy{width: 20%;height: 50px;font-size: 30px;border: #20c997 solid 3px;float: left;color: #00cc00;font-family: 黑体;font-weight: bold;padding: 10px;margin: 10px;}</style> ①宽度&#xff08;长&#xff09; ②高度&#xff08;宽&a…

pikachu 之CSRF(跨站请求伪造)get和post型

CSRF&#xff08;跨站请求伪造&#xff09; 概念 跨站请求伪造&#xff08;Cross-Site Request Forgery&#xff0c;简称CSRF&#xff09;是一种攻击方式&#xff0c;攻击者通过伪造用户的请求&#xff0c;欺骗受害者在不知情的情况下执行不想要的操作。这种攻击利用了用户已经…

使用python连接neo4j时报错:IndexError: pop from an empty deque的解决办法

遇见这个错&#xff0c;首先可能是python现在的py2neo的版本不对&#xff0c;把2021.1.0版本卸载&#xff0c;下载 py2neo4.2.0版本。我不是&#xff0c;一阵搜&#xff0c;发现需要改配置文件 首先找到你的neo4j的安装路径 在网上看的是&#xff0c;先找到data/dbms/auth文件…

产品升级|宏基因组产品增加新成员:多酚代谢注释数据库

多酚&#xff08;polyphenol&#xff09;是一种化学多样且丰富的植物衍生化合物&#xff0c;包括10000多个化学式和几个结构家族&#xff0c;包括聚合物&#xff08;例如单宁&#xff09;、单体&#xff08;例如类黄酮&#xff09;和简单酚类&#xff08;例如酚酸&#xff09;。…

nginx漏洞修复 ngx_http_mp4_module漏洞(CVE-2022-41742)【低可信】 nginx版本升级

风险描述&#xff1a; Nginx 是一款轻量级的Web服务器、反向代理服务器。 Nginx 的受影响版本中的ngx _http_mp4_module模块存在内存越界写入漏洞&#xff0c;当在配置中使用 mp4 directive时&#xff0c;攻击者可利用此漏洞使用使用ngx_http_mp4_module模块处理特制的音频或视…

服务攻防-框架安全(漏洞复现)

关闭靶场 sudo docker-compose down 运行此靶场 sudo docker-compose up -d 查看启动环境 sudo docker ps 运行dockers容器 docker exec -it 64052abd288b /bin/bash thinkphp框架 thinkphp 2 - rce漏洞复现 docker exec -it 731dbae0e0b5 /bin/bash 集成化工具扫描 可以命令…

【豆包Marscode体验官】揭秘MarsCode AI编辑助手:高效智能编辑新纪元之入门指导与最佳实践

文章目录 1. 概述2. 工具使用过程2.1 MarsCode插件简介2.2 安装和配置2.2.1 安装MarsCode插件2.2.2 配置MarsCode插件 2.3 各个功能的使用2.3.1 代码补全2.3.2 代码补全 Pro【操作提示&#xff0c;No suggestion from Model&#xff0c;不知道是不是版本的问题】2.3.3 代码生成…

爬虫学习2:爬虫爬取网页的信息与图片的方法

爬虫爬取网页的信息与图片的方法 爬取人物信息 import requestshead {"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/126.0.0.0 Safari/537.36 Edg/126.0.0.0" } # 这是get请求带参数的模式…

boost::asio网络编程

目录 基础操作端点的生成创建socket服务端创建acceptor用于监听传入的连接请求 并 接受连接服务器绑定端口客户端通过ip连接服务端 connect客户端通过域名连接服务端服务器监听 接受连接读写buffer同步的读写实现一个同步读写的服务器与客户端应答 异步读写函数异步写异步读官方…

冒泡排序(数组作为函数参数)

什么是冒泡排序&#xff1f; 冒泡排序&#xff08;Bubble Sort&#xff09;也是一种简单直观的排序算法。它重复地走访过要排序的数列&#xff0c;一次比较两个元素&#xff0c;如果他们的顺序错误就把他们交换过来。走访数列的工作是重复地进行直到没有再需要交换&#xff0c;…

CVE-2020-7248 OpenWRT libubox标记二进制数据序列化漏洞(更新中)

提要 该文档会一直处于更新当中&#xff0c;当状态为完毕后&#xff0c;才是更新完成。由于网络上关于该漏洞原理的分析文档和资源实在是太少&#xff0c;而本人关于该方向也才是刚入门&#xff0c;能力有限&#xff0c;所以复现需要的时间较长&#xff0c;需要补充和学习的东西…

中国的AI技术水平,是否已经到了能够帮助创作者实现内容的程度?

AI视频生成&#xff1a;小说文案智能分镜智能识别角色和场景批量Ai绘图自动配音添加音乐一键合成视频百万播放量https://aitools.jurilu.com/ 是的&#xff0c;早就实现了&#xff0c;我从上年开始用AI辅助写小说&#xff0c;已经挣了2万多。 这是我在番茄那开的小号&#xff0…

Redis的集群模式

1. Redis三种集群模式 Redis 提供的三种集群模式各有其特点和适用场景&#xff0c;以下是对这三种模式的简要概述&#xff1a; 主从模式&#xff08;Master-Slave Replication&#xff09;&#xff1a; 在这种模式下&#xff0c;数据在主节点&#xff08;Master&#xff09;上…