在音视频处理流程中,ffplay
的有两种队列,包缓存队列(Packet Buffer Queue)和帧缓存队列(Frame Buffer Queue)。这两个队列的存在,是为了适应音视频数据处理过程中的多线程架构——包括收包线程、解码线程和渲染线程。具体来说,收包线程负责从网络或文件中读取数据并将其放入包缓存队列中;解码线程从包缓存队列中取出数据进行解码,然后将解码后的数据放入帧缓存队列中;最后,渲染线程从帧缓存队列中取出数据进行渲染。由于每个线程的处理速度不同,缓存队列在这一过程中起到了平衡各线程工作负荷和避免数据丢失的关键作用。
音频、视频和字幕都经历了类似的处理流程,因此设计出高效且适应音视频特性的缓存队列显得尤为重要。ffplay
中对于包缓存队列和帧缓存队列的设计不仅确保了音视频数据的流畅处理,还有效地提升了播放体验。这种设计通过合理的缓存策略和线程同步机制,成功地解决了音视频处理中的各种挑战。
一、包缓存队列
包缓存队列的设计需要考虑多个因素,以确保其高效性和稳定性。因为数据包本身通常较小,因此没有必要将缓存队列设计为循环队列,采用常规的入队申请内存和出队释放内存的方式即可。
ffplay中的包缓存队列设计适配了音视频的特性,和普通的队列相比有如下差异
- 序列号处理功能
使用serial
字段来追踪数据包的顺序,在某些多路流(如音视频同步)场景中非常有用。每次队列重启或刷新时,serial
都会递增,有助于区分不同的播放段。
比如说发生跳转时,又解码到了跳转之前的数据,可能会有回跳的现象,
ffplay会在发生跳转的时候,更新包的序列号,当解码到老的序列号,就把数据给丢弃掉,直到解码到新的数据。
- 自动增长的fifo队列
使用av_fifo_alloc2
创建自动增长的FIFO队列,避免了频繁内存分配,提高了性能。
- 阻塞和非阻塞设置
阻塞与非阻塞模式:packet_queue_get
函数通过block
参数实现了阻塞和非阻塞模式的灵活切换,使得队列在不同的使用场景下能够适应需求。
1.1 PacketQueue结构体
//MyAVPacketList结构体的作用就是给包加上序列号
typedef struct MyAVPacketList {
AVPacket *pkt;
int serial;
} MyAVPacketList;
typedef struct PacketQueue {
AVFifo *pkt_list;//fifo队列,
int nb_packets;//packet数量
int size;//packet大小(字节)
int64_t duration;//持续时长
int abort_request;//中断请求
int serial;//序列号
SDL_mutex *mutex;//锁
SDL_cond *cond;//条件变量
} PacketQueue;
1.2 初始化
static int packet_queue_init(PacketQueue *q)
{
memset(q, 0, sizeof(PacketQueue));
//
q->pkt_list = av_fifo_alloc2(1, sizeof(MyAVPacketList), AV_FIFO_FLAG_AUTO_GROW);
if (!q->pkt_list)
return AVERROR(ENOMEM);
q->mutex = SDL_CreateMutex();
if (!q->mutex) {
av_log(NULL, AV_LOG_FATAL, "SDL_CreateMutex(): %s\n", SDL_GetError());
return AVERROR(ENOMEM);
}
q->cond = SDL_CreateCond();
if (!q->cond) {
av_log(NULL, AV_LOG_FATAL, "SDL_CreateCond(): %s\n", SDL_GetError());
return AVERROR(ENOMEM);
}
q->abort_request = 1;
return 0;
}
1.3 开始运行
static void packet_queue_start(PacketQueue *q)
{
SDL_LockMutex(q->mutex);
q->abort_request = 0;
q->serial++;
SDL_UnlockMutex(q->mutex);
}
1.4 放入packet
//内部调用
static int packet_queue_put_private(PacketQueue *q, AVPacket *pkt)
{
MyAVPacketList pkt1;
int ret;
if (q->abort_request)
return -1;
//给音视频包加上序列号
pkt1.pkt = pkt;
pkt1.serial = q->serial;
//把包添加进队列
ret = av_fifo_write(q->pkt_list, &pkt1, 1);
if (ret < 0)
return ret;
//队列相关数据更新
q->nb_packets++;
q->size += pkt1.pkt->size + sizeof(pkt1);
q->duration += pkt1.pkt->duration;
//发出信号,表明当前队列中有数据了,通知等待中的读线程可以取数据了
SDL_CondSignal(q->cond);
return 0;
}
static int packet_queue_put(PacketQueue *q, AVPacket *pkt)
{
AVPacket *pkt1;
int ret;
pkt1 = av_packet_alloc();
if (!pkt1) {
av_packet_unref(pkt);
return -1;
}
//将pkt的内存转移到pkt1里,这样可以减少一次内存的拷贝。
av_packet_move_ref(pkt1, pkt);
//保护队列的线程安全
SDL_LockMutex(q->mutex);
ret = packet_queue_put_private(q, pkt1);
SDL_UnlockMutex(q->mutex);
if (ret < 0)
av_packet_free(&pkt1);
return ret;
}
1.5 获取队列中的数据
/* return < 0 if aborted, 0 if no packet and > 0 if packet. */
static int packet_queue_get(PacketQueue *q, AVPacket *pkt, int block, int *serial)
{
MyAVPacketList pkt1;
int ret;
SDL_LockMutex(q->mutex);
for (;;) {
if (q->abort_request) {
ret = -1;
break;
}
//从队列中取出一个包
if (av_fifo_read(q->pkt_list, &pkt1, 1) >= 0) {
q->nb_packets--;
q->size -= pkt1.pkt->size + sizeof(pkt1);
q->duration -= pkt1.pkt->duration;
//移动这个包的数据
av_packet_move_ref(pkt, pkt1.pkt);
//更新序列号
if (serial)
*serial = pkt1.serial;
//释放这个packet本身
av_packet_free(&pkt1.pkt);
ret = 1;
break;
//若非阻塞模式,直接返回
} else if (!block) {
ret = 0;
break;
} else {
//若阻塞模式,等待数据
SDL_CondWait(q->cond, q->mutex);
}
}
SDL_UnlockMutex(q->mutex);
return ret;
}
1.6 清空队列
static void packet_queue_flush(PacketQueue *q)
{
MyAVPacketList pkt1;
SDL_LockMutex(q->mutex);
while (av_fifo_read(q->pkt_list, &pkt1, 1) >= 0)
av_packet_free(&pkt1.pkt);
q->nb_packets = 0;
q->size = 0;
q->duration = 0;
q->serial++;
SDL_UnlockMutex(q->mutex);
}
1.7 停止队列
static void packet_queue_abort(PacketQueue *q)
{
SDL_LockMutex(q->mutex);
q->abort_request = 1;
//通知处理,避免packet_queue_get没有获取到数据,一直在等待
SDL_CondSignal(q->cond);
SDL_UnlockMutex(q->mutex);
}
1.8 销毁数据
static void packet_queue_destroy(PacketQueue *q)
{
packet_queue_flush(q);
av_fifo_freep2(&q->pkt_list);
SDL_DestroyMutex(q->mutex);
SDL_DestroyCond(q->cond);
}
二、帧缓存队列
帧缓存队列主要功能是用于视频解码器的帧管理,它提供了一个环形缓冲区,用于存储已经解码的帧,同时支持多线程的读写操作。它与包缓存队列类似,使用互斥锁和条件变量来同步访问,确保线程安全。
其设计要点如下,
环形缓冲区:
-
使用固定大小的数组实现环形缓冲区,通过读写索引(
rindex
和windex
)管理帧的存取。 -
通过模运算确保读写索引在队列范围内循环,提供高效的队列操作。
阻塞/非阻塞操作:
- 提供阻塞式的
peek_writable
和peek_readable
方法,在队列满或空时等待条件满足。这种设计适合生产者-消费者模式,确保帧处理的同步性。
支持保存最后一帧:
- 通过
keep_last
标志控制是否保留最后一帧,适应不同的使用场景,例如在播放结束时显示最后一帧。
2.1 FrameQueue结构体
typedef struct FrameQueue {
Frame queue[FRAME_QUEUE_SIZE]; // 存储帧的数组,容量为 FRAME_QUEUE_SIZE
int rindex; // 读索引,指向下一个要读取的帧位置
int windex; // 写索引,指向下一个要写入的帧位置
int size; // 当前队列中的帧数量
int max_size; // 队列的最大帧数量
int keep_last; // 标志位,表示是否保留最后一帧以供显示
int rindex_shown; // 标志是否已经显示最后读取的帧
SDL_mutex *mutex; // 互斥锁,用于同步访问队列
SDL_cond *cond; // 条件变量,用于线程之间的等待和通知
PacketQueue *pktq; // 指向关联的 PacketQueue,用于解码的输入数据
} FrameQueue;
2.1 初始化和销毁
/* 初始化 FrameQueue,设置队列大小,创建互斥锁和条件变量,并分配每个帧的内存 */
static int frame_queue_init(FrameQueue *f, PacketQueue *pktq, int max_size, int keep_last)
{
int i;
// 初始化 FrameQueue 结构体
memset(f, 0, sizeof(FrameQueue));
// 创建互斥锁,用于线程同步
if (!(f->mutex = SDL_CreateMutex())) {
av_log(NULL, AV_LOG_FATAL, "SDL_CreateMutex(): %s\n", SDL_GetError());
return AVERROR(ENOMEM); // 内存分配错误
}
// 创建条件变量,用于线程间的信号传递
if (!(f->cond = SDL_CreateCond())) {
av_log(NULL, AV_LOG_FATAL, "SDL_CreateCond(): %s\n", SDL_GetError());
return AVERROR(ENOMEM); // 内存分配错误
}
// 关联 PacketQueue,后续用于检查是否中止操作
f->pktq = pktq;
// 设置队列的最大容量,不能超过预定义的 FRAME_QUEUE_SIZE
f->max_size = FFMIN(max_size, FRAME_QUEUE_SIZE);
// f->keep_last = !!keep_last是将int取值的keep_last转换为boot取值(0或1)
f->keep_last = !!keep_last;
// 为队列中的每个 Frame 分配内存
for (i = 0; i < f->max_size; i++)
if (!(f->queue[i].frame = av_frame_alloc()))
return AVERROR(ENOMEM); // 内存分配失败
return 0; // 初始化成功
}
/* 销毁 FrameQueue,释放所有资源,包括互斥锁、条件变量和帧的内存 */
static void frame_queue_destroy(FrameQueue *f)
{
int i;
// 逐个释放队列中每个 Frame 的内存
for (i = 0; i < f->max_size; i++) {
Frame *vp = &f->queue[i];
frame_queue_unref_item(vp); // 取消引用帧中的数据
av_frame_free(&vp->frame); // 释放 AVFrame 结构体
}
// 销毁互斥锁
SDL_DestroyMutex(f->mutex);
// 销毁条件变量
SDL_DestroyCond(f->cond);
}
2.2 读写帧
2.2.1 环状缓冲区
环状缓冲区是一种数据结构,用于高效管理固定大小的缓冲区,特别适用于需要频繁写入和读取的数据流,如音视频播放。它的核心思想是使用一个固定大小的数组,并使用两个指针(读指针和写指针)来跟踪读写位置。当写指针达到缓冲区末尾时,它会绕回到开始处,继续写入未被读指针消费的位置。
如下是一个生动的例子,蓝色是读指针,红色是写指针,读指针跟在写指针后边追着,
在 FrameQueue
中,环状缓冲区通过以下变量和逻辑实现:
-
缓冲区数组:
Frame queue[FRAME_QUEUE_SIZE]
,它是一个定长的数组,保存实际的帧数据。 -
写入索引 (
windex
):指示当前可以写入新帧的位置。 -
读取索引 (
rindex
):指示当前读取的帧的位置。 -
队列大小 (
size
):当前存储的帧的数量。 -
最大队列大小 (
max_size
):队列的容量限制。
环状缓冲区的操作逻辑:
-
写入操作 (
frame_queue_push
):将新帧写入queue[windex]
,然后递增windex
。如果windex
达到max_size
,则环绕回 0,即windex = 0
。 -
读取操作 (
frame_queue_next
):从queue[rindex]
读取帧,然后递增rindex
。如果rindex
达到max_size
,同样环绕回 0,即rindex = 0
。
这种结构的好处是高效地利用固定大小的数组,无需频繁地分配或释放内存,并能够高效地进行顺序的读取和写入操作。
2.2.2 线程安全
在查看 frame
源码时,我们注意到锁的主要作用是保护状态信息(如 size
、rindex
、windex
等),而不是直接保护实际的数据内容。这可能引发一个问题:是否会出现同一块内存在边读边写的情况?
答案是不会出现这样的情况,因为 FFplay 的队列设计采用了操作分离的策略。
具体来说,当 frame_queue_peek_writable
获取到一个可写的帧指针后,实际的数据写入操作是由调用者完成的。写入操作完成后,需要调用 frame_queue_push
告知队列写入已经完成,只有在这一步后,写指针 windex
才会被更新。由于队列的这种设计,即在写入操作完成并通知队列之前,读指针 rindex
不会去访问该帧的数据,因此不会出现同时对同一块内存进行读写的情况。
这种操作分离的设计确保了在没有锁保护实际数据的情况下,读写操作依然是安全的,避免了竞态条件的发生。
2.2.3 写数据
/* 获取可写入的新帧,如果队列已满则阻塞直到有空间可写 */
static Frame *frame_queue_peek_writable(FrameQueue *f)
{
/* 等待直到有空间可以写入新帧 */
SDL_LockMutex(f->mutex); // 锁定互斥锁,进入临界区
while (f->size >= f->max_size && !f->pktq->abort_request) {
SDL_CondWait(f->cond, f->mutex); // 等待条件变量信号
}
SDL_UnlockMutex(f->mutex); // 解锁互斥锁,离开临界区
// 如果接收到中止请求,则返回 NULL
if (f->pktq->abort_request)
return NULL;
// 返回当前可写入的新帧引用
return &f->queue[f->windex];
}
/* 将写入索引移动到下一位置,并更新队列大小 */
static void frame_queue_push(FrameQueue *f)
{
if (++f->windex == f->max_size) // 循环更新写索引
f->windex = 0;
SDL_LockMutex(f->mutex); // 锁定互斥锁
f->size++; // 增加队列中帧的数量
SDL_CondSignal(f->cond); // 发送信号通知可读
SDL_UnlockMutex(f->mutex); // 解锁互斥锁
}
2.2.4 读数据
读队列引入了一种机制,用于保留已显示的最后一帧,即始终在队列中保留一帧。这样设计的好处是在播放结束时,画面能够停留在最后一帧。如果没有这种机制,就需要额外判断队列中的帧数量是否等于一,这会使代码变得复杂且难以维护。
虽然这个机制使得读队列的设计比写队列复杂得多,但它非常巧妙且实用。这种设计体现了对细节的用心,需要没有kpi压力的程序员才能打磨出来。
keep_last
: 表示是否需要保留最后一个已显示的帧。如果设置为 1
,则在调用 frame_queue_next
时会保留当前显示的帧,不会立即移动读索引。
rindex_shown
: 用于标记当前是否已显示保留的最后一帧。默认是 0
,如果设置为 1
,表示当前的读索引位置对应的是已经显示的帧,并且该帧被保留用于后续显示。
简单理解,就是如果rindex_shown + rindex 才等于当前帧
/* 获取可读取的帧,如果队列为空则阻塞直到有帧可读 */
static Frame *frame_queue_peek_readable(FrameQueue *f)
{
/* 等待直到有可读的帧 */
SDL_LockMutex(f->mutex); // 锁定互斥锁
//如果rindex_shown为1,说明当前读索引对应的是已读的帧,所以判断可读数量时需要减去这一帧。
while (f->size - f->rindex_shown <= 0 && !f->pktq->abort_request) {
SDL_CondWait(f->cond, f->mutex); // 等待条件变量
}
SDL_UnlockMutex(f->mutex); // 解锁互斥锁
// 如果接收到中止请求,则返回 NULL
if (f->pktq->abort_request)
return NULL;
// 返回当前可读取的帧引用
return &f->queue[(f->rindex + f->rindex_shown) % f->max_size];
}
/* 获取下一个即将显示的帧 */
static Frame *frame_queue_peek_next(FrameQueue *f)
{
// 通过计算得出下一个显示帧的索引
return &f->queue[(f->rindex + f->rindex_shown + 1) % f->max_size];
}
/* 获取最后一个显示的帧*/
static Frame *frame_queue_peek_last(FrameQueue *f)
{
// 返回最后一个已显示帧的引用
return &f->queue[f->rindex];
}
/* 获取当前应该显示的帧*/
static Frame *frame_queue_peek(FrameQueue *f)
{
//它使用 rindex(当前读取索引)加上 rindex_shown 来确定当前显示的帧。如果 rindex_shown 为 0,表示当前帧就是 rindex 指向的帧;如果 rindex_shown 为 1,则获取下一个帧
return &f->queue[(f->rindex + f->rindex_shown) % f->max_size];
}
/* 将读取索引移动到下一位置,并更新队列大小 */
static void frame_queue_next(FrameQueue *f)
{
if (f->keep_last && !f->rindex_shown) {
// 如果设置了 keep_last 并且当前没有显示帧,则保留当前帧,
//如果满足这两个条件,rindex_shown 被设置为 1,函数直接返回,不移动读索引 rindex,也不减少 size。这样就实现了保留当前帧的功能。
f->rindex_shown = 1;
return;
}
// 取消引用当前显示的帧数据
frame_queue_unref_item(&f->queue[f->rindex]);
if (++f->rindex == f->max_size) // 循环更新读索引
f->rindex = 0;
SDL_LockMutex(f->mutex); // 锁定互斥锁
f->size--; // 减少队列中帧的数量
SDL_CondSignal(f->cond); // 发送信号通知可写
SDL_UnlockMutex(f->mutex); // 解锁互斥锁
}