深入浅出SRS—RTMP实现

news2024/9/22 13:22:18

RTMP 直播是 SRS 最典型的使用场景,客户端使用 RTMP 协议向 SRS 推流,使用 RTMP 协议从 SRS 拉流,SRS 作为一个 RTMP 直播服务器实现媒体的转发。同时,RTMP 是 SRS 的中转协议,其他协议之间的互通需要先转为 RTMP,因此,理解 SRS RTMP 直播实现是理解其他协议实现的重要前提。本文主要分析 SRS RTMP 直播功能的实现原理,相关概念和配置请参考官方文档。

1. Origin

单机版 SRS,加上一个推流端和一个拉流端,组成最简单的 RTMP 直播场景,如下图所示。其中 RtmpConn 表示一个 RTMP 连接,LiveSouce 表示一个直播资源,接下来我们来分析这个直播场景的实现。

1.1. 创建连接

在初始化阶段,SrsServer 会读取配置文件使用 SrsMultipleTcpListeners 建立监听。SrsMultipleTcpListeners 把自己装扮成一个 ISrsListener,但其内部包含的 SrsTcpListener 才是真正的监听者,支持多个监听者。

SrsTcpLisener 启动后,当有客户端连接上来,SrsTcpLisener 会通知 SrsMultipleTcpListeners,SrsMultipleTcpListeners 再通知 SrsServer,SrsServer 创建一个 SrsRtmpConn,添加到 SrsResourceManger(用来管理所有 RTMP 连接),并启动 SrsRtmpConn 事件循环。Publisher 和 palyer 连接都是相同的处理流程。

相关源码如下:

if (!resource) {
  if (listener == rtmp_listener_) {
    resource = new SrsRtmpConn(this, stfd2, ip, port);
  }
  ...
}

// 添加到容器进行集中管理
conn_manager->add(resource);

// 启动连接的事件循环,开始处理报文
ISrsStartable* conn = dynamic_cast<ISrsStartable*>(resource);
if ((err = conn->start()) != srs_success) {
  return srs_error_wrap(err, "start conn coroutine");
}

1.2. 协议交互

连接建立成功后,下一步就要按照 RTMP 协议规范,完成 handshake、connect 等协议交互,当然还会有其他参数设置。

在 SrsRtmpConn 的事件循环中,可以看到 handshake 和 connect 动作。因为 RTMP 协议是 CS 模式,不管是推流还是拉流,都是客户端主动发起连接,然后执行收发媒体之前的协议交互,处理逻辑是一样的。

srs_error_t SrsRtmpConn::do_cycle()
{
  ...

  // 完成 handshake
  if ((err = rtmp->handshake()) != srs_success) {
    return srs_error_wrap(err, "rtmp handshake");
  }

  ...

  // 完成 connect
  SrsRequest* req = info->req;
  if ((err = rtmp->connect_app(req)) != srs_success) {
    return srs_error_wrap(err, "rtmp connect tcUrl");
  }

  ...
  
  // 进入接收媒体数据循环
  if ((err = service_cycle()) != srs_success) {
    err = srs_error_wrap(err, "service cycle");
  }
  
  ...
}

1.3. 识别类型

在处理媒体数据之前,需要能识别客户端是 publisher 还是 player。SRS 根据具体协议定义了一系列连接类型,分为 publishers 和 players 两大类。我们当前分析的场景,只需要关注 SrsRtmpConnPlay 和 SrsRtmpConnFMLEPublish 两种类型即可。

enum SrsRtmpConnType
{
    SrsRtmpConnUnknown = 0x0000,
    // All players.
    SrsRtmpConnPlay = 0x0100,
    SrsHlsPlay = 0x0101,
    SrsFlvPlay = 0x0102,
    SrsRtcConnPlay = 0x0110,
    SrsSrtConnPlay = 0x0120,
    // All publishers.
    SrsRtmpConnFMLEPublish = 0x0200,
    SrsRtmpConnFlashPublish = 0x0201,
    SrsRtmpConnHaivisionPublish = 0x0202,
    SrsRtcConnPublish = 0x0210,
    SrsSrtConnPublish = 0x0220,
};

SRS 先调用 identify_client 方法识别客户端类型,然后根据客户端类型确定是做推流处理还是拉流处理。

srs_error_t SrsRtmpConn::stream_service_cycle()
{
  // 解析客户端类型
  if ((err = rtmp->identify_client(info->res->stream_id, info->type, req->stream, 
  req->duration)) != srs_success) {
      return srs_error_wrap(err, "rtmp: identify client");
  }

  ...

  // 根据客户端类型进行对应处理
  switch (info->type) {
    case SrsRtmpConnPlay: {
      ...
      return playing(source); // 拉流
    }
    case SrsRtmpConnFMLEPublish: {
      ...
      return publishing(source); // 推流
    }
    ...
  }
  ...
}

identify_client 根据收到的消息来判定客户端类型,推流端才会发送以下消息:

#define RTMP_AMF0_COMMAND_RELEASE_STREAM        "releaseStream"
#define RTMP_AMF0_COMMAND_FC_PUBLISH            "FCPublish"
#define RTMP_AMF0_COMMAND_UNPUBLISH             "FCUnpublish"

拉流端才会发送以下消息:

#define RTMP_AMF0_COMMAND_PLAY                  "play"
srs_error_t SrsRtmpServer::identify_client(int stream_id, SrsRtmpConnType& type, 
  string& stream_name, srs_utime_t& duration)
{    
  while (true) {
  SrsCommonMessage* msg = NULL;
    if ((err = protocol->recv_message(&msg)) != srs_success) {
      return srs_error_wrap(err, "recv identify message");
    }
    ...
    
    if (dynamic_cast<SrsCreateStreamPacket*>(pkt)) {
      // 内部做进一步分析
      return identify_create_stream_client(
        dynamic_cast<SrsCreateStreamPacket*>(pkt), stream_id, 3, type,
        stream_name, duration);
    }
    if (dynamic_cast<SrsFMLEStartPacket*>(pkt)) {
      // type = SrsRtmpConnFMLEPublish
      return identify_fmle_publish_client(
        dynamic_cast<SrsFMLEStartPacket*>(pkt), type, stream_name);
    }
    if (dynamic_cast<SrsPlayPacket*>(pkt)) {
      // type = SrsRtmpConnPlay
      return identify_play_client(dynamic_cast<SrsPlayPacket*>(pkt), type, 
        stream_name, duration);
    }
  }
}

1.4. Publish

如果识别是连接的客户端是 publisher,进入 publishing 处理流程。

1.4.1. 准备工作

推流端发送 publish 消息,SrsRtmpConn 会创建一个 SrsLiveSource,SrsLiveSource 是直播源的逻辑实体,每个推流在 SRS 上都会有一个 SrsLiveSource 对应。

srs_error_t SrsRtmpConn::stream_service_cycle()
{
  ...

  // 可能需要创建 SrsLiveSource
  SrsLiveSource* source = NULL;
  if ((err = _srs_sources->fetch_or_create(req, server, &source)) != srs_success) {
    return srs_error_wrap(err, "rtmp: fetch source");
  }

  switch (info->type) {
    ...
    case SrsRtmpConnFMLEPublish: {
      if ((err = rtmp->start_fmle_publish(info->res->stream_id)) != srs_success) {
        return srs_error_wrap(err, "rtmp: start FMLE publish");
      }
      return publishing(source);
    }
    ...
  }

  return err;
}

可能是基于性能考量,SRS 创建了一个新的协程来专门接收推流媒体数据,一切准备妥当,通知客户端可以发送媒体数据了,进入 SrsRtmpConn 的事件循环。

srs_error_t SrsRtmpConn::do_publishing(SrsLiveSource* source, SrsPublishRecvThread* rtrd)
{
  srs_error_t err = srs_success;
  
  SrsRequest* req = info->req;
  SrsPithyPrint* pprint = SrsPithyPrint::create_rtmp_publish();
  SrsAutoFree(SrsPithyPrint, pprint);

  // 这里面启动了一个新的协程接收媒体数据
  if ((err = rtrd->start()) != srs_success) {
    return srs_error_wrap(err, "rtmp: receive thread");
  }

  ...

  // 发送 NetStream.Publish.Start 消息,通知客户端开始发送媒体数据
  if ((err = rtmp->start_publishing(info->res->stream_id)) != srs_success) {
    return srs_error_wrap(err, "start publishing");
  }
  
  while (true) {
    if ((err = trd->pull()) != srs_success) {
      return srs_error_wrap(err, "rtmp: thread quit");
    }

    ...
  }
  
  return err;
}

1.4.2. 接收媒体

刚刚讲到,SRS 启动一个新的协程来接收媒体数据,这个协程就是 SrsRecvThread,只是它被 SrsPublishRecvThread 包裹了一层。SrsRecvThread 启动后,使用 SrsRtmpServer 从网络接收媒体数据,收到的媒体数据经过 SrsPublishRecvThread 传递到 SrsRtmpConn。

具体来说,SrsRecvThread 调用 SrsRtmpServer::recv_message 从网络读取媒体数据,并调用 consume 接口发送到 SrsPublishRecvThread。

srs_error_t SrsRecvThread::do_cycle()
{    
  while (true) {
    ...

    // 这里的 pumper 就是 SrsPublishRecvThread
    SrsCommonMessage* msg = NULL;
    if ((err = rtmp->recv_message(&msg)) == srs_success) {
        err = pumper->consume(msg);
    }
    
    ...
  }
  
  return err;
}

SrsPublishRecvThread 只是一个二道贩子,继续把数据发送到 SrsRtmpConn 处理。

srs_error_t SrsPublishRecvThread::consume(SrsCommonMessage* msg)
{
    ...
    
    // the rtmp connection will handle this message
    err = _conn->handle_publish_message(_source, msg);
    
    ...
}

SrsRtmpConn 根据协议头判断是音频数据还是视频数据,然后将数据交给 SrsLiveSource 处理。

srs_error_t SrsRtmpConn::process_publish_message(SrsLiveSource* source, 
  SrsCommonMessage* msg)
{
  ...
  
  // process audio packet
  if (msg->header.is_audio()) {
    if ((err = source->on_audio(msg)) != srs_success) {
      return srs_error_wrap(err, "rtmp: consume audio");
    }
    return err;
  }

  // process video packet
  if (msg->header.is_video()) {
    if ((err = source->on_video(msg)) != srs_success) {
      return srs_error_wrap(err, "rtmp: consume video");
    }
    return err;
  }

  ...
}

1.4.3. 分发媒体

SrsLiveSource 收到媒体数据后(以视频数据为例),也没做太多处理,主要是进行数据分发,分发对象有三个:

1)SrsLiveConsumer:RTMP 直播观看者。

2)SrsRtcFromRtmpBridge:RTMP 转 RTC。

3)SrsOriginHub:DVR(录制)、HLS(HLS 直播观看)、DASH(DASH 直播观看)等。

srs_error_t SrsLiveSource::on_video_imp(SrsSharedPtrMessage* msg)
{
  ...

  // 解析媒体数据
  if ((err = format_->on_video(msg)) != srs_success) {
    return srs_error_wrap(err, "format consume video");
  }
  
  ...
  
  // HUB 转发
  if ((err = hub->on_video(msg, is_sequence_header)) != srs_success) {
    return srs_error_wrap(err, "hub consume video");
  }

  // 桥接转发
  if (bridge_ && (err = bridge_->on_video(msg)) != srs_success) {
    return srs_error_wrap(err, "bridge consume video");
  }

  // RTMP player 转发
  if (!drop_for_reduce) {
    for (int i = 0; i < (int)consumers.size(); i++) {
      SrsLiveConsumer* consumer = consumers.at(i);
      if ((err = consumer->enqueue(msg, atc, jitter_algorithm)) != srs_success) {
        return srs_error_wrap(err, "consume video");
      }
    }
  }
  
  ...
}

SrsOriginHub 内部将媒体数据转发给 HLS、DASH、DVR 和 fowarder(Oring 热备)。

srs_error_t SrsOriginHub::on_video(SrsSharedPtrMessage* shared_video, 
  bool is_sequence_header)
{
  ...

  // HLS
  if ((err = hls->on_video(msg, format)) != srs_success) {
    std::string hls_error_strategy = _srs_config->get_hls_on_error(req_->vhost);
    if (srs_config_hls_is_on_error_ignore(hls_error_strategy)) {
      srs_warn("hls: ignore video error %s", srs_error_desc(err).c_str());
      hls->on_unpublish();
      srs_error_reset(err);
    } else if (srs_config_hls_is_on_error_continue(hls_error_strategy)) {
      if (srs_hls_can_continue(srs_error_code(err), source->meta->vsh(), msg)) {
        srs_error_reset(err);
      } else {
        return srs_error_wrap(err, "hls: video");
      }
    } else {
      return srs_error_wrap(err, "hls: video");
    }
  }

  // DASH
  if ((err = dash->on_video(msg, format)) != srs_success) {
    srs_warn("dash: ignore video error %s", srs_error_desc(err).c_str());
    srs_error_reset(err);
    dash->on_unpublish();
  }

  // DVR
  if ((err = dvr->on_video(msg, format)) != srs_success) {
    srs_warn("dvr: ignore video error %s", srs_error_desc(err).c_str());
    srs_error_reset(err);
    dvr->on_unpublish();
  }
  
  // Forwarder
  if (!forwarders.empty()) {
    std::vector<SrsForwarder*>::iterator it;
    for (it = forwarders.begin(); it != forwarders.end(); ++it) {
      SrsForwarder* forwarder = *it;
      if ((err = forwarder->on_video(msg)) != srs_success) {
        return srs_error_wrap(err, "forward video");
      }
    }
  }
  
  return err;
}

1.5. Play

如果识别是连接的客户端是 publisher,进入 playing 处理流程。

1.5.1. 准备工作

播放之前需要先查找直播源,如果直播源不存在,会触发提前创建一个 SrsLiveSource,等待后面直播源连上来后发送媒体数据。

srs_error_t SrsRtmpConn::stream_service_cycle()
{
	...

	// 查找直播源(直播源没连上也没关系,会触发提前创建 SrsLiveSource)
	SrsLiveSource* source = NULL;
	if ((err = _srs_sources->fetch_or_create(req, server, &source)) != srs_success) {
		return srs_error_wrap(err, "rtmp: fetch source");
	}

	switch (info->type) {
		case SrsRtmpConnPlay: {
			// 响应客户端的 play 请求
			if ((err = rtmp->start_play(info->res->stream_id)) != srs_success) {
				return srs_error_wrap(err, "rtmp: start play");
			}
			// 进入播放处理
			return playing(source);
		}
		...
	}

	return err;
}

每一个 RTMP player,都会创建一个 SrsLiveConsumer,SrsLiveConsumer 是 RTMP player 在 SRS 上的逻辑实体。和 publishing 一样,SRS 会启动一个新的协程从客户端接收消息。

srs_error_t SrsRtmpConn::playing(SrsLiveSource* source)
{
	...
    
    // 创建一个 consumer
    SrsLiveConsumer* consumer = NULL;
    SrsAutoFree(SrsLiveConsumer, consumer);
    if ((err = source->create_consumer(consumer)) != srs_success) {
        return srs_error_wrap(err, "rtmp: create consumer");
    }
    
    // Use receiving thread to receive packets from peer.
    SrsQueueRecvThread trd(consumer, rtmp, SRS_PERF_MW_SLEEP, _srs_context->get_id());

	// 启动新的协程从客户端收取消息
    if ((err = trd.start()) != srs_success) {
        return srs_error_wrap(err, "rtmp: start receive thread");
    }
    
    err = do_playing(source, consumer, &trd);
   
    ...
}

1.5.2. 接收消息

SrsRecvThread 通过 SrsRtmpServer 从 player 接收控制消息,比如暂停、恢复等。收到的消息会放到 SrsQueueRecvThread 内部的一个队列中,SrsRtmpConn 在事件循环中从 SrsQueueRecvThread 的队列拉取消息,作相应处理,然后通过 SrsRtmpServer 向 player 发送响应。

收到的控制消息插入 SrsQueueRecvThread 队列。

srs_error_t SrsQueueRecvThread::consume(SrsCommonMessage* msg)
{
    queue.push_back(msg);
    return srs_success;
}

SrsRtmpConn 在事件循环中拉取消息并进行处理。

srs_error_t SrsRtmpConn::do_playing(SrsLiveSource* source, SrsLiveConsumer* consumer, 
  SrsQueueRecvThread* rtrd)
{
  ...
  
  while (true) {
    ...

    // 从客户端收到的消息都放在队列中,通过 pump 方法从队列中获取收到的消息
    while (!rtrd->empty()) {
      SrsCommonMessage* msg = rtrd->pump(); // SrsQueueRecvThread::pump
      if ((err = process_play_control_msg(consumer, msg)) != srs_success) {
        return srs_error_wrap(err, "rtmp: play control message");
      }
    }
    
    ...
  }
  
  return err;
}

1.5.3. 发送媒体

前面 publishing 讲过,Publiser 的媒体数据会分发到 SrsLiveConsumer。Player 的 SrsRtmpConn 事件循环从 SrsLiveConsumer 队列中拉取媒体数据,通过 SrsRtmpServer 发送到客户端。

srs_error_t SrsRtmpConn::do_playing(SrsLiveSource* source, SrsLiveConsumer* consumer, 
  SrsQueueRecvThread* rtrd)
{
  ...
  
  while (true) {
    ...

    // 拉取媒体数据
    int count = (send_min_interval > 0)? 1 : 0;
    if ((err = consumer->dump_packets(&msgs, count)) != srs_success) {
      return srs_error_wrap(err, "rtmp: consumer dump packets");
    }
    
    ...
    
    // 发送媒体数据
    if (count > 0 && (err = rtmp->send_and_free_messages(msgs.msgs, count, 
      info->res->stream_id)) != srs_success) {
      return srs_error_wrap(err, "rtmp: send %d messages", count);
    }
    
    ...
  }
  
  return err;
}

2. Edge cluster

当直播客户端越来越多,Origin 已经扛不住这么大的并发和流量,必须上集群,Edge cluster 就是这样一种集群,用来解决大量直播观众带来的挑战与问题。

2.1. Play

当某个直播源的直播观众非常多的时候,可以考虑引入 Edge cluster。流还是推到 Origin,拉流则从 Edge cluster 拉,如下图所示。

Origin 与之前保持一致,因为对于 Origin 来说,Edge 就相当于一个 player。Edge 与 Origin 相比有一些变化,变化主要在推流侧。对应 Origin 的 Client 推流,Edge 其实是使用 Ingester 从 Origin 拉流,支持使用 RMTP 和 FLV 两种拉流方式。

直播源的第一个 player 来拉流的时候,会触发 Edge 从 Origin 拉流。

srs_error_t SrsLiveSource::create_consumer(SrsLiveConsumer*& consumer)
{
    srs_error_t err = srs_success;
    
    consumer = new SrsLiveConsumer(this);
    consumers.push_back(consumer);

    // There should be one consumer, so reset the timeout.
    stream_die_at_ = 0;
    publisher_idle_at_ = 0;

    // 如果是 edge,通知 SrsPlayEdge 从 oring 拉流
    if (_srs_config->get_vhost_is_edge(req->vhost)) {
        if ((err = play_edge->on_client_play()) != srs_success) {
            return srs_error_wrap(err, "play edge");
        }
    }
    
    return err;
}

SrsEdgeIngester 内部使用 SrsBasicRtmpClient 模拟一个 RTMP player 从 Origin 拉流,拉到的媒体数据回调到 SrsLiveSource,后面的分发逻辑与 Origin 保持一致。

2.2. Publish

Edge 既可以从 Origin 拉流,也可以推流到 Origin,如下图所示。至于为什么不直接推 Origin,可能是 Origin 不可达,或者 Client 到 Edge 的网络比到 Origin 好。

处理逻辑分为三部分,如下图所示(使用不同的颜色区分)。第一部分是调用 SrsRtmpConn::acquire_publish 与 Origin 建立推送通道;第二部分是收到媒体数据插入到 SrsEdgeForwarder 的消息队列;第三部分是 SrsEdgeForwrader 事件循环从消息队列中拉取媒体数据发送到 Origin。

SRS 在处理 publish 时,Edge 和 Origin 走不同的分支。

srs_error_t SrsRtmpConn::acquire_publish(SrsLiveSource* source)
{
    ...

    if (info->edge) {
		// Edge 处理分支
        err = source->on_edge_start_publish();
    } else {
		// Origin 处理分支
        err = source->on_publish();
    }

    return err;
}

如果是 Edge,则会启动 SrsEdgeForwarder 推流到 Origin。

srs_error_t SrsPublishEdge::on_client_publish()
{
    ...
    
    // 启动 forwarder 推流到 origin
    err = forwarder->start();
    
    ...
}

收到媒体数据时,直接转发到 origin。

srs_error_t SrsRtmpConn::process_publish_message(SrsLiveSource* source, 
	SrsCommonMessage* msg)
{
    srs_error_t err = srs_success;
    
    // 如果是 edge,媒体数据直接转发到 origin
    if (info->edge) {
        if ((err = source->on_edge_proxy_publish(msg)) != srs_success) {
            return srs_error_wrap(err, "rtmp: proxy publish");
        }
        return err;
    }
	...
}    

3. Origin cluster

当直播源不多的时候,使用一个 Origin 就行(不考虑高可用),此时不存在负载均衡和回哪个源的问题。如果直播源越来越多,一个 Origin 承受不起并发和流量时,此时就需要使用 Origin cluster。但这也带来一个问题,直播源分布在多个 Origin 上,该如何调度直播观看端或者 Edge 到哪个 Origin 拉流。Origin Cluster 通过开放集群内资源查询接口,实现基于 RTMP302 的集群资源自动调度。

如果调度到没有此直播源的 Origin,则 Client 可能等到海枯石烂也徒劳,如下图所示。

Origin Cluster 通过开放集群内资源查询接口,实现基于 RTMP302 的集群资源自动调度。

如果本地没有直播源,SrsLiveSource 状态为 inactive。如果没有配置 Origin cluster,SRS 会等待直播源连上来;如果配置了 Origin cluster,则会向集群中其他 Origin 发起资源查询,

srs_error_t SrsRtmpConn::playing(SrsLiveSource* source)
{
    ...

	// 如果本服务是 origin,且配置了 origin 集群,且在本服务没有找到直播源
    if (!info->edge && // Origin
		_srs_config->get_vhost_origin_cluster(req->vhost) &&  // Origin cluster
		source->inactive()) {
		// 获取 origin 集群的地址
        vector<string> coworkers = _srs_config->get_vhost_coworkers(req->vhost);
	
		// 遍历集群中所有 origin 进行查询
        for (int i = 0; i < (int)coworkers.size(); i++) {
            string host; int port = 0; string coworker = coworkers.at(i);

			// 构造资源查询 URL
            string url = "http://" + coworker + "/api/v1/clusters?"
                + "vhost=" + req->vhost 
				+ "&ip=" + req->host 
				+ "&app=" + req->app 
				+ "&stream=" + req->stream
                + "&coworker=" + coworker;

			// 发起资源查询请求
            if ((err = SrsHttpHooks::discover_co_workers(url, host, port)) 
				!= srs_success) {
                // 向所有 origin 发起查询
				if (i < (int)coworkers.size() - 1) {
                    continue;
                }
				// 都查不到返回失败
                return srs_error_wrap(err, "discover coworkers, url=%s", url.c_str());
            }

			// 构造 RTMP302
            string rurl = srs_generate_rtmp_url(host, port, req->host, req->vhost, 
				req->app, req->stream, req->param);

			// 向客户端或 edge 发送 RTMP302
            bool accepted = false;
            if ((err = rtmp->redirect(req, rurl, accepted)) != srs_success) {
                srs_error_reset(err);
            } else {
				// 重定向完成后关闭连接
                return srs_error_new(ERROR_CONTROL_REDIRECT, "redirected");
            }
        }
        
        return srs_error_new(ERROR_OCLUSTER_REDIRECT, "no origin");
    }
	...
}

4. Forward

大部分场景只需要按需拉流逻辑,但也有些场景需要服务器推流,比如热备、小型集群等场景,Forward 特性补全了 SRS 的推流能力。

如下所示,中间配置了 Forward 的 Origin 可以将直播流通过 Forwarder 推给其他 Origin,没有配置 Forward 的 Origin 感知不到任何变化,从 Origin 推过来的流与客户端推过来的流是一样的处理逻辑。

当有直播源开始 publish 时,SRS 会尝试创建针对此直播源的 forwarder。

srs_error_t SrsOriginHub::on_publish()
{
    srs_error_t err = srs_success;
    
    // create forwarders
    if ((err = create_forwarders()) != srs_success) {
        return srs_error_wrap(err, "create forwarders");
    }
    
    ...
}

SrsOriginHub 读取配置创建 forwarder,SRS 支持静态和动态两种 forwarder,可以根据需求选择。所谓动态 forwarder,就是 SRS 会通过 Hook 回调应用层,应用层返回转发目的地址。所谓静态 forwarder,就是配置文件中配置的目的地址。

srs_error_t SrsOriginHub::create_forwarders()
{
    srs_error_t err = srs_success;

	// 未开启 forwarder,返回
    if (!_srs_config->get_forward_enabled(req_->vhost)) {
        return err;
    }

    // 动态 forward 配置
    bool applied_backend_server = false;
    if ((err = create_backend_forwarders(applied_backend_server)) != srs_success) {
        return srs_error_wrap(err, "create backend applied=%d", applied_backend_server);
    }

    // 动态 forward 优先,配置了动态 forward,则忽略静态 forward
    if (applied_backend_server) {
        return err;
    }

    // 静态 forward 配置
    SrsConfDirective* conf = _srs_config->get_forwards(req_->vhost);

	// 遍历 destination 配置的所有地址
    for (int i = 0; conf && i < (int)conf->args.size(); i++) {
        std::string forward_server = conf->args.at(i);

		// 创建 forwarder
        SrsForwarder* forwarder = new SrsForwarder(this);
        forwarders.push_back(forwarder);
        
        // 初始化 forwarder
        if ((err = forwarder->initialize(req_, forward_server)) != srs_success) {
            return srs_error_wrap(err, "init forwarder");
        }

        srs_utime_t queue_size = _srs_config->get_queue_length(req_->vhost);
        forwarder->set_queue_size(queue_size);
        
        if ((err = forwarder->on_publish()) != srs_success) {
            return srs_error_wrap(err, 
				"start forwarder failed, vhost=%s, app=%s, stream=%s, forward-to=%s",
                req_->vhost.c_str(), req_->app.c_str(), req_->stream.c_str(), 
				forward_server.c_str());
        }
    }
    
    return err;
}

收到直播源的媒体数据后,SRS 会将媒体数据转发给直播源的所有 fowarder。

srs_error_t SrsOriginHub::on_video(SrsSharedPtrMessage* shared_video, 
	bool is_sequence_header)
{
	...
    
    // copy to all forwarders.
    if (!forwarders.empty()) {
        std::vector<SrsForwarder*>::iterator it;
        for (it = forwarders.begin(); it != forwarders.end(); ++it) {
            SrsForwarder* forwarder = *it;
            if ((err = forwarder->on_video(msg)) != srs_success) {
                return srs_error_wrap(err, "forward video");
            }
        }
    }
    
    return err;
}

5. 媒体处理

5.1. Seuqence header

RTMP 协议的媒体数据使用 FLV 封装,编解码器一般选择 AVC/AAC 组合。

对于 AVC 来说,会先传输 AVC sequence header, AVC sequence header 通常包含 AVC 的配置信息,其中包括 SPS(Sequence Parameter Set)和 PPS(Picture Parameter Set)。

对于 AAC 来说,会先传输 AAC sequence header,AAC sequence header 通常包含采样率、采样位深、通道数、AOT(Audio Object Type)等一些元数据。

Sequence header 通常只在连接建立时发送一次,SRS 会解析 sequence header 并将解析结果保存到 SrsFormat,以备后用。

if ((err = format_->on_audio(msg)) != srs_success) {
	return srs_error_wrap(err, "format consume audio");
}

if ((err = format_->on_video(msg)) != srs_success) {
	return srs_error_wrap(err, "format consume video");
}

同时还会将包含 sequence header 的报文缓存起来。

if (is_sequence_header || !meta->ash()) {
	if ((err = meta->update_ash(msg)) != srs_success) {
		return srs_error_wrap(err, "meta consume audio");
	}
}

if (is_sequence_header && (err = meta->update_vsh(msg)) != srs_success) {
	return srs_error_wrap(err, "meta update video");
}

需要 sequence header 元数据的地方,如果是 RTMP 协议转发,可以直接发送缓存报文;但如果是不同协议的转发,则需要使用 sequence header 的解析结果。

比如,后面加入的 RTMP player,必须要先发送 sequence header,否则 player 无法解码。

srs_error_t SrsRtmpConn::playing(SrsLiveSource* source)
{
	...

	// 创建 consumer
	SrsLiveConsumer* consumer = NULL;
    SrsAutoFree(SrsLiveConsumer, consumer);
    if ((err = source->create_consumer(consumer)) != srs_success) {
        return srs_error_wrap(err, "rtmp: create consumer");
    }

    // 立即向 consumer 发送 sequence header
    if ((err = source->consumer_dumps(consumer)) != srs_success) {
        return srs_error_wrap(err, "rtmp: dumps consumer");
    }

	...
}

再比如,启动新的 forwrader 时,也需要发送 sequence header。

srs_error_t SrsOriginHub::on_forwarder_start(SrsForwarder* forwarder)
{
    srs_error_t err = srs_success;
    
    SrsSharedPtrMessage* cache_metadata = source->meta->data();
    SrsSharedPtrMessage* cache_sh_video = source->meta->vsh();
    SrsSharedPtrMessage* cache_sh_audio = source->meta->ash();
    
    // feed the forwarder the metadata/sequence header,
    // when reload to enable the forwarder.
    if (cache_metadata && (err = forwarder->on_meta_data(cache_metadata)) != srs_success) {
        return srs_error_wrap(err, "forward metadata");
    }
    if (cache_sh_video && (err = forwarder->on_video(cache_sh_video)) != srs_success) {
        return srs_error_wrap(err, "forward video sh");
    }
    if (cache_sh_audio && (err = forwarder->on_audio(cache_sh_audio)) != srs_success) {
        return srs_error_wrap(err, "forward audio sh");
    }
    
    return err;
}

5.2. GOP cache

GOP cache 主要用来提升直播观看的体验。如果没有开启 GOP cache,新加入的直播客户端,要等到下一个关键帧才能够播放视频;如果开启了 GOP cache,新加入的直播客户端可以立即播放视频。

GOP cap 的实现逻辑比较简单,每次收到音频或视频消息都调用 SrsGopCache::cache。

srs_error_t SrsLiveSource::on_video_imp(SrsSharedPtrMessage* msg)
{
    ...
    
    // cache the last gop packets
    if ((err = gop_cache->cache(msg)) != srs_success) {
        return srs_error_wrap(err, "gop cache consume vdieo");
    }

	...
}

SrsGopCache 默认只缓存当前 GOP 的视频帧,同时还会处理最大缓存帧数限制。


srs_error_t SrsGopCache::cache(SrsSharedPtrMessage* shared_msg)
{
    srs_error_t err = srs_success;

	// 配置没有开启
    if (!enable_gop_cache) {
        return err;
    }
    
    // the gop cache know when to gop it.
    SrsSharedPtrMessage* msg = shared_msg;
    
    // 只缓存 H264
    if (msg->is_video()) {
        if (!SrsFlvVideo::h264(msg->payload, msg->size)) {
            return err;
        }        
        cached_video_count++;
        audio_after_last_video_count = 0;
    }
    
    // 没缓存任何视频帧
    if (pure_audio()) {
        return err;
    }
    
    // 统计自缓存上一帧视频以来收到的音频报文数量
    if (msg->is_audio()) {
        audio_after_last_video_count++;
    }
    
    // 超过 3 秒没有收到视频帧了,清空视频缓存
    if (audio_after_last_video_count > SRS_PURE_AUDIO_GUESS_COUNT) {
        srs_warn("clear gop cache for guess pure audio overflow");
        clear();
        return err;
    }
    
    // 收到关键帧刷新缓存,重新开始缓存
    if (msg->is_video() && SrsFlvVideo::keyframe(msg->payload, msg->size)) {
        clear();
        cached_video_count = 1;
    }

    // cache the frame.
    gop_cache.push_back(msg->copy());

    // 如果超过配置的最大缓存帧数,也要刷新
    if (gop_cache_max_frames_ > 0 && gop_cache.size() > (size_t)gop_cache_max_frames_) {
        clear();
    }

    return err;
}

新加入的 RTMP 直播客户端,SRS 服务器会在发送 sequence header 后,立即发送 GOP cache。

srs_error_t SrsLiveSource::consumer_dumps(SrsLiveConsumer* consumer, bool ds, bool dm, bool dg)
{
    ...

    // If stream is publishing, dumps the sequence header and gop cache.
    if (hub->active()) {
        // Copy metadata and sequence header to consumer.
        if ((err = meta->dumps(consumer, atc, jitter_algorithm, dm, ds)) != srs_success) {
            return srs_error_wrap(err, "meta dumps");
        }

        // copy gop cache to client.
        if (dg && (err = gop_cache->dump(consumer, atc, jitter_algorithm)) != srs_success) {
            return srs_error_wrap(err, "gop cache dumps");
        }
    }

    return err;
}

5.3. Chunk

RTMP 协议以 message 为处理单位,其负载采用 FLV 封装,每个视频帧被封装为一个 FLV tag。RTMP 基于 chunk 进行传输,chunk size 默认为 128 bytes,可以动态设置。如果 message 长度超过 chunk size,则 message 会被切分为多个 chunk。接收端会将多个 chunk 重组成 message 后再进行处理。

srs_error_t SrsProtocol::recv_message(SrsCommonMessage** pmsg)
{
    *pmsg = NULL;
    srs_error_t err = srs_success;
    while (true) {
        SrsCommonMessage* msg = NULL;
        if ((err = recv_interlaced_message(&msg)) != srs_success) {
            srs_freep(msg);
            return srs_error_wrap(err, "recv interlaced message");
        }
        
        // 没有收到完整的 message,继续接收
        if (!msg) {
            continue;
        }
        ...
    }
    return err;
}

一般来说,FLV 一个 tag 只会封装一个 NALU,如果一个 tag 要封装多个 NALU,则需要使用 AnnexB 格式,插入起始码。

5.4. CompositionTime

FLV 封装格式中,有 Timestamp 和 CompositionTime 两个时间,其中 Timestamp 一般当做视频帧的 DTS,PTS = Timestamp + CompositionTime。CompositonTime 是有符号整数,可以为0、正数或负数。

6. 总结

本文分析了 SRS RTMP 直播的实现,覆盖了单机、Edge 集群、Origin 集群、Forward 等多种场景,详细讲解了单机场景下全流程处理,重点分析了 RTMP 直播实现所需的媒体处理。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2111252.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

机器学习:数据清洗流程及完整代码实现

概述&#xff1a; 在处理数据之前&#xff0c;需要进行数据质量分析&#xff0c;了解数据的功能和作用&#xff0c;检查原始数据中是否存在脏数据。脏数据一般是指不符合要求以及不能直接进行相应分析的数据。 脏数据往往存在如下问题&#xff1a;没有列头&#xff0c;一个列有…

【CanMV K230】线段检测

【CanMV K230】线段检测 什么是线段检测线段检测应用领域1.地图解析中的道路、建筑轮廓提取2.计算机视觉中的物体识别和跟踪。3.机器人导航和环境理解。4.图像分割和特征点提取。5.建筑测量和图像重建。6.OCR&#xff08;光学字符识别&#xff09;预处理。7.行人检测和交通标志…

html+css+js网页设计 宝石及材料与工艺学专业知识科普与学习11个页面 带报告

htmlcssjs网页设计 宝石及材料与工艺学专业知识科普与学习11个页面 带报告 网页作品代码简单&#xff0c;可使用任意HTML辑软件&#xff08;如&#xff1a;Dreamweaver、HBuilder、Vscode 、Sublime 、Webstorm、Text 、Notepad 等任意html编辑软件进行运行及修改编辑等操作&a…

免费分享:2014-2018年全球5.0级及以上地震正式报目录数据集

数据详情 本数据集为2014年—2018年中国台网正式目录&#xff08;统一编目目录&#xff09;全球5.0及以上地震6459次地震数据&#xff0c;属性字段包含发震时刻、经度、纬度、深度、地震类型、震级、参考位置、事件类型等。 数据属性 数据名称&#xff1a;全球5.0级及以上地震…

【数学建模】2024数学建模国赛B题(word论文+matlab):生产过程中的决策问题

详情请见&#xff1a;https://mbd.pub/o/bread/mbd-ZpqblZ5u 文章目录 一、问题二、解答2.1 问题一2.2 问题二2.3 问题三2.4 问题四 一、问题 某企业生产某种畅销的电子产品&#xff0c;需要分别购买两种零配件&#xff08;零配件1和零配件2&#xff09;&#xff0c;在企业将两…

gcn(从空间域理解)

一、背景 常见的神经网络&#xff0c;如BP神经网络可以用来处理表格型的数据&#xff0c;卷积神经网络可以用来处理图片数据&#xff0c;循环神经网络则可以用来处理序列数据&#xff0c;这些数据都是结构化的数据&#xff0c;当我们需要处理的数据为图这种非结构化的数据&…

黑马点评9——附近商户-GEO数据结构

文章目录 GEO数据结构的基本用法导入店铺数据结构到GEO实现附件商户功能 GEO数据结构的基本用法 导入店铺数据结构到GEO 数据库里没法实现按照地理坐标排序等复杂的搜索功能&#xff0c;我们把数据存到redis中&#xff0c;只需要保存id和对应的x以及y的坐标&#xff0c;就可以…

一键部署Phi 3.5 mini+vision!多模态阅读基准数据集MRR-Benchmark上线,含550个问答对

小模型又又又卷起来了&#xff01;微软开源三连发&#xff01;一口气发布了 Phi 3.5 针对不同任务的 3 个模型&#xff0c;并在多个基准上超越了其他同类模型。 其中 Phi-3.5-mini-instruct 专为内存或算力受限的设备推出&#xff0c;小参数也能展现出强大的推理能力&#xff…

使用Cskin时候 遇到按钮有默认阴影问题解决

使用Cskin时候 遇到按钮有默认阴影 设置 DrawType 属性就可以了

【应用】浅谈大模型的应用场景

在人工智能(AI)的发展历程中&#xff0c;大模型无疑是一个重要的里程碑。这类模型通过海量数据的训练&#xff0c;其参数量都达到了千亿甚至万亿的规模&#xff0c;具备了强大的语言理解和生成能力&#xff0c;能够完成各种复杂的自然语言处理任务。 大模型在各种领域都有广泛的…

Web day0906

ok了家人们&#xff0c;今天开始学习javaweb&#xff0c;我们一起看看吧 一.WEB开发介绍 1.1 WEB介绍 Web &#xff08; World Wide Web &#xff09;即全球广域网&#xff0c;也称为万维网。简 单说&#xff0c;能够通过浏览器访问的网站。在我们日常的生活中&#xff0c;经…

【JAVA基础】StringUtils.isEmpty、StringUtils.isBlank()、Objects.isNull()三者区别

&#x1f4dd;个人主页&#x1f339;&#xff1a;个人主页 ⏩收录专栏⏪&#xff1a;日常经验 &#x1f339;&#x1f339;期待您的关注 &#x1f339;&#x1f339;&#xff0c;让我们共同进步&#xff01; 总是区分不清楚这几个的差别&#xff1a;我们来直接验证一下&#…

【机械手控制】基于matlab 4-RPR平面机械手的可操作性、工作空间分析和路径跟踪【含Matlab源码 7422期】

✅博主简介&#xff1a;热爱科研的Matlab仿真开发者&#xff0c;修心和技术同步精进&#xff0c;Matlab项目合作可私信或扫描文章底部QQ二维码。 &#x1f34e;个人主页&#xff1a;Matlab凤凰涅槃 &#x1f3c6;代码获取方式&#xff1a;扫描文章底部QQ二维码 ⛳️座右铭&…

U盘数据危机应对:详解文件或目录损坏无法读取的恢复之道

在数字化信息日益重要的今天&#xff0c;U盘作为便携存储设备&#xff0c;几乎成为了每个人工作和生活中的必需品。然而&#xff0c;当U盘突然遭遇文件或目录损坏且无法读取的困境时&#xff0c;我们往往面临数据丢失的风险&#xff0c;这不仅影响工作效率&#xff0c;还可能造…

【开学季】智慧城市入门教程福利放送,零基础入门三维WebGIS

步入九月&#xff0c;不知道有多少同学还没享受够假期 就要开始返校“直面天命” 各种大朋友、小朋友&#xff1b;大一萌新、学长学姐们也 都陆陆续续返校 小编整理了新中地公开课教程 帮助大家丝滑入门大学学习 这份开学礼包&#xff0c;请注意查收&#xff01; WebGIS三…

Xinstall如何用一个包打通多个推广路径?

在移动互联网时代&#xff0c;App的推广和运营对于开发者而言是一项至关重要的任务。然而&#xff0c;面对繁多的推广渠道&#xff0c;如何高效地管理和评估各个渠道的效果&#xff0c;成为了摆在开发者面前的一大难题。今天&#xff0c;我们就来科普一下&#xff0c;如何通过X…

13条自动化测试框架设计原则

1、代码规范 测试框架随着业务推进&#xff0c;必然会涉及代码的二次开发&#xff0c;所以代码编写应符合通用规范&#xff0c;代码命名符合业界标准&#xff0c;并且代码层次清晰。特别在大型项目、多人协作型项目中&#xff0c;如果代码没有良好的规范&#xff0c;那么整个框…

计算机毕业设计选题推荐-博物馆管理系统-Java/Python项目实战

✨作者主页&#xff1a;IT毕设梦工厂✨ 个人简介&#xff1a;曾从事计算机专业培训教学&#xff0c;擅长Java、Python、微信小程序、Golang、安卓Android等项目实战。接项目定制开发、代码讲解、答辩教学、文档编写、降重等。 ☑文末获取源码☑ 精彩专栏推荐⬇⬇⬇ Java项目 Py…

Helm Deploy Online Rancher v2.9.1

文章目录 准备安装查看下载 准备 $ kubectl get node NAME STATUS ROLES AGE VERSION kube-master01 Ready control-plane 19d v1.29.5 kube-node01 Ready <none> 19d v1.29.5 kube-node02 Ready <none&…

嵌入式学习(链式栈和链式队列)

栈&#xff08;stack&#xff09;是一种只能在一端插入或删除操作的线性表。 栈只能在表尾插入或删除元素&#xff0c;表尾就是栈的栈顶&#xff0c;表头就是栈底 栈的主要特点&#xff1a;LIFO(last in first out) "后进先出" 栈可以采用顺序存储结构(顺序栈) 和…