libuv 是一个跨平台的异步事件驱动库,用于构建高性能和可扩展的网络应用程序。mediasoup 基于 libuv 构建了包括管道、信号和 socket 在内的一整套通信框架,具有单线程、事件驱动和异步的典型特征,是构建高性能 WebRTC 流媒体服务器的重要基础,本文主要分析 mediasoup 对 libuv 的封装。
1. Pipe 通信
Node.js 进程与 worker 进程之间使用管道通信,而且是双向通信。node.js 进程通过管道向 worker 进程发送请求,并接收响应。worker 进程也可以主动向 node.js 进程发送通知消息。
1.1. 文件描述符
管道通信需要使用两个文件描述符,node.js 进程的文件描述符定义如下:
this.#channel = new Channel({
producerSocket: this.#child.stdio[3],
consumerSocket: this.#child.stdio[4],
pid: this.#pid,
});
worker 进程的文件描述符定义如下:
static constexpr int ConsumerChannelFd{ 3 };
static constexpr int ProducerChannelFd{ 4 };
1.2. 静态结构
worker 进程对管道通信的封装看起来比较复杂,涉及到多个类,如下图所示。由于这里面糅合了几个逻辑,拆解以后会更好理解:
1)UnixStreamSocketHandle 封装了基于 libuv 的 pipe 通信能力,内部包含 libuv 句柄。
2)ChannelSocket 内部包含的 ConsumerSocket 和 ProducerSocket 对应管道通信的读和写两个方向。ChannelSocket 继承了 ConsumerSocket::Listener,从 ConsumerSocket 收到的管道消息,都会回调到 ChannelSocket。
3)全局只有一个 ChannelSocket 对象,被 Worker 持有。Worker 继承了 ChannelSocekt::Listener,ChannelSocket 收到的所有管道消息都会回调 Worker。
4)Worker 包含了一个 Shared 对象,从名字上能看出,这是一个“共享对象”,通过传参的方式共享给各个对象,本质上就是一个全局对象。
5)Shared 内部包含两个对象:ChannelMessageRegistor 和 ChannelNotifier。ChannelMessageRegistor 用来管理管道消息处理器,因为全局就一个 ChannelSocket 对象,所有需要处理管道消息的对象都要把自己注册到 ChannelMessageRegistor,Worker 根据注册信息把管道消息分发给各个处理器。ChannelNotifier 用来发送管道消息,其内部也是使用 ChannelSocket 来发送消息,所有对象需要向 Node.js 进程发送管道消息调用 ChannelNotifier 接口即可。
1.3. 数据流
管道通信的数据流如下图所示。接收到的管道消息会一层层回调到 Worker 对象,Worker 先对消息进行过滤,如果是 Worker 自己关注的消息,自己先处理,其他消息则根据“注册表”进行路由。发送管道消息,调用 ChannelNotifier::Emit 接口,最终通过 libuv 发送出去。
2. Socket 通信
Socket 通信主要用来处理 mediasoup worker 与 WebRTC 客户端之间的媒体通信,支持 TCP 和 UDP。
2.1. 静态结构
2.1.1. UDP
1)UdpSocketHandle 封装了基于 libuv 的 UDP 通信能力,内部包含 libuv 句柄。
2)UdpSocket 继承自 UdpSocketHandle,内部包含了一个数据监听对象,用来接收 UDP 消息。
2)PipeTransport、PlainTransport、WebRtcTransport 和 WebRtcServer 都支持 UDP 通信,它们内部都包含一个指向 UdpSocket 的指针,用来发送 UDP 消息。
【注】这里的 PipeTransport 并不是使用管道通信的 transport。
2.1.2. TCP
1)TcpServerHandle 封装了基于 libuv 的 TCP 监听能力,内部包含 libuv 句柄。
2)TcpConnectionHandle 封装了基于 libuv 的 TCP 通信能力,内部包含 libuv 句柄。TCP 连接中断会通过 OnTcpConnectionClosed 通知 TcpServerHandle。
3)TcpConnection 继承自 TcpConnectionHandle,收到 TCP 报文会回调连接监听者。
4)当前只有 WebRtcServer 和 WebRtcTransport 支持 TCP 通信。
【注】WebRtcServer 用来实现端口聚合,其上可以承载多个 WebRtcTransport。
2.2. Socket 创建
2.2.1. WebRtcServer
WebRtcServer 用来实现 WebRTC 连接的端口聚合,WebRtcTransport 可以运行在 WebRtcServer 之上,共享 WebRtcServer 的端口。
WebRtcServer 根据传入的参数,决定创建 UdpSocket 还是 TcpServer,支持指定端口或端口范围。
WebRtcServer::WebRtcServer(RTC::Shared* shared, const std::string& id,
const flatbuffers::Vector<flatbuffers::Offset<Transport::ListenInfo>>* listenInfos)
: id(id), shared(shared)
{
...
// 遍历所有地址
for (const auto* listenInfo : *listenInfos)
{
auto ip = listenInfo->ip()->str();
...
// UDP 协议
if (listenInfo->protocol() == FBS::Transport::Protocol::UDP)
{
RTC::UdpSocket* udpSocket;
// 指定端口范围,从中选择一个
if (listenInfo->portRange()->min() != 0 && listenInfo->portRange()->max() != 0)
{
uint64_t portRangeHash{ 0u };
udpSocket = new RTC::UdpSocket(
this,
ip,
listenInfo->portRange()->min(),
listenInfo->portRange()->max(),
flags,
portRangeHash);
}
// 指定端口
else if (listenInfo->port() != 0)
{
udpSocket = new RTC::UdpSocket(this, ip, listenInfo->port(), flags);
}
// 未指定端口,使用配置中的端口
else
{
uint64_t portRangeHash{ 0u };
udpSocket = new RTC::UdpSocket(
this,
ip,
Settings::configuration.rtcMinPort,
Settings::configuration.rtcMaxPort,
flags,
portRangeHash);
}
...
}
// TCP 协议
else if (listenInfo->protocol() == FBS::Transport::Protocol::TCP)
{
RTC::TcpServer* tcpServer;
// 指定端口范围
if (listenInfo->portRange()->min() != 0 && listenInfo->portRange()->max() != 0)
{
uint64_t portRangeHash{ 0u };
tcpServer = new RTC::TcpServer(
this,
this,
ip,
listenInfo->portRange()->min(),
listenInfo->portRange()->max(),
flags,
portRangeHash);
}
// 指定端口
else if (listenInfo->port() != 0)
{
tcpServer = new RTC::TcpServer(this, this, ip, listenInfo->port(), flags);
}
// 未指定端口,使用配置中的端口
else
{
uint64_t portRangeHash{ 0u };
tcpServer = new RTC::TcpServer(
this,
this,
ip,
Settings::configuration.rtcMinPort,
Settings::configuration.rtcMaxPort,
flags,
portRangeHash);
}
...
}
}
...
}
2.2.2. WebRtcTransport
如果 WebRtcTransport 运行在 WebRtcServer 之上,则 WebRtcTransport 不会再创建 Socket。
WebRtcTransport::WebRtcTransport(...)
{
...
// 将 WebRtcTransport 加入到 WebRtcServer 的转发列表
this->webRtcTransportListener->OnWebRtcTransportCreated(this);
...
}
否则,还需自食其力,WebRtcTransport 创建 Socket 的逻辑与 WebRtcServer 类似,不再赘述。
2.2.3. PlainTransport
PlainTransport 用来对接像 FFMPEG 这种第三方编码器和工具的推拉流, 只支持 UDP 协议,创建逻辑类似,也支持指定端口或端口范围。
PipeTransport::PipeTransport(
RTC::Shared* shared,
const std::string& id,
RTC::Transport::Listener* listener,
const FBS::PipeTransport::PipeTransportOptions* options)
: RTC::Transport::Transport(shared, id, listener, options->base())
{
...
// 指定端口范围
if (this->listenInfo.portRange.min != 0 && this->listenInfo.portRange.max != 0)
{
uint64_t portRangeHash{ 0u };
this->udpSocket = new RTC::UdpSocket(
this,
this->listenInfo.ip,
this->listenInfo.portRange.min,
this->listenInfo.portRange.max,
this->listenInfo.flags,
portRangeHash);
}
// 指定端口
else if (this->listenInfo.port != 0)
{
this->udpSocket = new RTC::UdpSocket(
this, this->listenInfo.ip, this->listenInfo.port, this->listenInfo.flags);
}
// 未指定端口,使用配置
else
{
uint64_t portRangeHash{ 0u };
this->udpSocket = new RTC::UdpSocket(
this,
this->listenInfo.ip,
Settings::configuration.rtcMinPort,
Settings::configuration.rtcMaxPort,
this->listenInfo.flags,
portRangeHash);
}
...
}
2.2.4. PipeTransport
PipeTransport 的设计目的是为了使位于同一主机上或不同主机上的两个Router实例之间进行通信,只支持 UDP 协议,创建逻辑类似,也支持指定端口或端口范围。
PipeTransport::PipeTransport(
RTC::Shared* shared,
const std::string& id,
RTC::Transport::Listener* listener,
const FBS::PipeTransport::PipeTransportOptions* options)
: RTC::Transport::Transport(shared, id, listener, options->base())
{
...
if (this->listenInfo.portRange.min != 0 && this->listenInfo.portRange.max != 0)
{
uint64_t portRangeHash{ 0u };
this->udpSocket = new RTC::UdpSocket(
this,
this->listenInfo.ip,
this->listenInfo.portRange.min,
this->listenInfo.portRange.max,
this->listenInfo.flags,
portRangeHash);
}
else if (this->listenInfo.port != 0)
{
this->udpSocket = new RTC::UdpSocket(
this, this->listenInfo.ip, this->listenInfo.port, this->listenInfo.flags);
}
else
{
uint64_t portRangeHash{ 0u };
this->udpSocket = new RTC::UdpSocket(
this,
this->listenInfo.ip,
Settings::configuration.rtcMinPort,
Settings::configuration.rtcMaxPort,
this->listenInfo.flags,
portRangeHash);
}
...
}
2.3. 数据流
2.3.1. UDP
2.3.1.1. 接收数据
以 WebRtcServer 为例,libuv 收到 UDP 消息会回调 UdpSocketHandle::OnUvRecv,UdpSocketHandle 再回调 UdpSocket::UserOnUdpDatagramReceived,最终将消息回调给数据监听者 WebRtcServer。
2.3.1.2. 发送数据
需要发送 UDP 消息的模块持有 TransportTuple 对象,调用 TransportTuple:: Send 方法,内部调用 UdpSocketHandle::Send,最终通过 libuv 接口将数据发送到网络。
需要注意,UDP 报文发送有一个特殊机制,mediaoup 会先调用 libuv 同步发送接口,如果同步发送接口出错,mediasoup 不是立即返回,而是拷贝发送数据,继续调用 libuv 的异步发送接口。这在某些极端场景下,可能会大量消耗服务器内存。
void UdpSocketHandle::Send(const uint8_t* data, size_t len, const struct sockaddr* addr, UdpSocketHandle::onSendCallback* cb)
{
...
// 使用待发送送数据初始化一块uv缓冲区
uv_buf_t buffer = uv_buf_init(
reinterpret_cast<char*>(const_cast<uint8_t*>(data)), len);
// 调用同步接口发送
const int sent = uv_udp_try_send(this->uvHandle, &buffer, 1, addr);
// 所有数据都发送完成
if (sent == static_cast<int>(len))
{
// Update sent bytes.
this->sentBytes += sent;
if (cb)
{
(*cb)(true); // 回调返回成功
delete cb;
}
return;
}
// 发送了部分数据
else if (sent >= 0)
{
this->sentBytes += sent;
if (cb)
{
(*cb)(false); // 回调返回失败
delete cb;
}
return;
}
// 出错了,可能是网络繁忙,使用异步接口uv_udp_send发送
else if (sent != UV_EAGAIN)
{
MS_WARN_DEV("uv_udp_try_send() failed, trying uv_udp_send(): %s", uv_strerror(sent));
}
// 创建一个异步处理数据结构
auto* sendData = new UvSendData(len);
// 作为自定义数据挂载到uv数据结构中
sendData->req.data = static_cast<void*>(sendData);
// 拷贝待发送数据
std::memcpy(sendData->store, data, len);
// 保存回调函数指针
sendData->cb = cb;
// 使用待发送数据的拷贝初始化uv缓冲区
buffer = uv_buf_init(reinterpret_cast<char*>(sendData->store), len);
// 调用异步接口发送,设置回调接口onSend
const int err = uv_udp_send(&sendData->req, this->uvHandle, &buffer, 1, addr,
static_cast<uv_udp_send_cb>(onSend));
if (err != 0)
{
if (cb)
{
(*cb)(false);
}
delete sendData;
}
else
{
this->sentBytes += len;
}
}
UvSendData 定义如下:
struct UvSendData
{
uv_udp_send_t req{};
uint8_t* store{ nullptr };
UdpSocketHandle::onSendCallback* cb{ nullptr };
};
libuv 发送完成后会回调 onSend,在 onSend 函数中处理善后事宜。
inline static void onSend(uv_udp_send_t* req, int status)
{
auto* sendData = static_cast<UdpSocketHandle::UvSendData*>(req->data);
auto* handle = req->handle;
auto* socket = static_cast<UdpSocketHandle*>(handle->data);
const auto* cb = sendData->cb;
if (socket)
{
socket->OnUvSend(status, cb);
}
// Delete the UvSendData struct (it will delete the store and cb too).
delete sendData;
}
2.3.2. TCP
2.3.2.1. 监听连接
1)TcpServer 调用 libuv 接口建立监听。
2)客户端与服务器完成三次握手后,libuv 会回调 TcpServerHandle::OnUvConnection。
3)TcpServerHandle 回调 TcpServer::UserOnTcpConnectionAlloc。
4)TcpServer 创建 TcpConnection 并调用 TcpServerHandle::AcceptTcpConnection 告知要接受这个连接。
5)TcpServerHandle 对 TcpConnection 进行初始化,调用 libuv 的 uv_accpet 方法完成新连接的创建。
6)调用 TcpConnectionHandle::Start 开始接收数据。
2.3.2.2. 接收数据
接收数据逻辑非常简单,以 WebRtcServer 为例,libuv 收到 TCP 数据后会层层回调到 WebRtcServer。
2.3.2.3. 发送数据
发送 TCP 数据的逻辑也很简单,调用 TransportTuple 接口,内部最终调用 libuv 将数据发送到网络。
3. 定时器
定时器在很多地方都会被用到,mediasoup 使用 TimerHanlde 封装 libuv 的定时器能力。需要使用定时器的类需要继承 TimerHandle::Listener,实现 OnTimer 虚拟方法。然后创建一个 TimerHandle 对象,传入 this 指针,调用 TimerHandle::Start 方法启动定时器即可。
4. 信号处理
信号是进程间通信的一种机制,也是操作系统用来通知进程有关系统事件或异常状况的重要手段。信号可以由系统内核发送给进程,也可以由一个进程发送给另一个进程。在 Worker 进程中,Worker 类是唯一处理 signal 的类,它继承 SignalHandle::Listener,实现 OnSignal 虚拟方法,进程接收的所有信号都会回调给 Worker 处理。
mediasoup 当前只处理了 SIGINT 和 SIGTERM 两个信号,用来优雅的关闭 mediasoup 进程。
void Worker::OnSignal(SignalHandle* /*signalHandle*/, int signum)
{
if (this->closed)
{
return;
}
switch (signum)
{
case SIGINT:
{
if (this->closed)
{
return;
}
Close();
break;
}
case SIGTERM:
{
if (this->closed)
{
return;
}
Close();
break;
}
default:
{
MS_WARN_DEV("received a non handled signal [signum:%d]", signum);
}
}
}
5. 总结
熟悉 mediasoup 的底层通信机制,是深入阅读 mediasoup 源码的基础。本文详细描述了 mediasoup 对 libuv 的封装,覆盖了 pipe、socket、signal 等几种通信方式,重点分析了 Socket 通信的静态结构和数据流,补充分析了 UDP 报文的异步发送机制。mediasoup 对 libuv 的封装简洁清晰,是一个优秀的设计方案,值得大家借鉴。