系列文章目录
- GStreamer 简明教程(一):环境搭建,运行 Basic Tutorial 1 Hello world!
- GStreamer 简明教程(二):基本概念介绍,Element 和 Pipeline
- GStreamer 简明教程(三):动态调整 Pipeline
- GStreamer 简明教程(四):Seek 以及获取文件时长
- GStreamer 简明教程(五):Pad 相关概念介绍,Pad Capabilities/Templates
- GStreamer 简明教程(六):利用 Tee 复制流数据,巧用 Queue 实现多线程
- GStreamer 简明教程(七):实现管道的动态数据流
- GStreamer 简明教程(八):常用工具介绍
- GStreamer 简明教程(九):Seek 与跳帧
- GStreamer 简明教程(十):插件开发,以一个音频特效插件为例
文章目录
- 系列文章目录
- 前言
- 一、准备工作
- 二、Show me the code
- 3.1 线程模型分析
- 消费端模型
- 生产端实现方案
- 3.2 深入 audiotestsrc 的实现逻辑
- 3.2.1 生产者线程的启动时机
- 3.2.2 GStreamer Pad Task 机制详解
- 核心接口与功能
- 设计优势
- 3.2.3 loop 中做了哪些事情
- 3.2.4 格式协商
- 3.2 写一个 AudioSource 插件
- 3.2.1 初始化函数
- 3.2.2 激活函数
- 3.3.3 loop 循环
- 总结
- 参考
前言
GStreamer 中插件分为三种:Source、Filter 和 Sink,在上一章中我们学习了如何写一个 Filter 插件,可以说 Filter 插件是最简单的,因为它只需要关系数据的处理逻辑,而 Source 和 Sink 就更加复杂一些。本章我们来讨论如何写一个 Source 插件。
本章所提及的代码你可以在 my_plugin 找到。
一、准备工作
准备工作与 GStreamer 简明教程(十):插件开发,以一个音频特效插件为例 中提到的类似,不再赘述。
二、Show me the code
接下来详细说明代码中各个细节,其中很多逻辑都是参考 audiotestsrc 来实现的,大家如果想自己对代码进行详细的分线,建议写一个 audiotestsrc 的 demo,进行代码调试。
3.1 线程模型分析
要理解GStreamer pipeline中的数据流动机制,需要首先明确其线程模型。我们以基础音频流水线为例:
audiotestsrc -> autoaudiosink
- 生产者:
audiotestsrc
元素,负责生成音频测试信号 - 消费者:
autoaudiosink
元素,负责将音频输出到系统声卡
消费端模型
autoaudiosink
的实现通常会依赖系统音频服务(如ALSA/PulseAudio)的回调机制:
- 系统音频线程定期通过回调请求数据
- 形成天然的"消费线程"驱动模型
生产端实现方案
生产者有两种典型的实现范式:
方案A:Push模式(主动生产)
audiotestsrc
创建独立的生产者线程- 持续生成数据并推送(push)至下游
- 下游可能需维护数据缓冲区
- 优势:实现直接,适合连续数据流
- 缺点:可能需要维护缓冲区
方案B:Pull模式(按需生产)
audiotestsrc
保持被动状态- 当
autoaudiosink
需要数据时,通过链式调用向上游拉取(pull) - 优势:流量控制精准
- 难点:需要实现复杂的同步机制
由于Push模式更符合"生产者-消费者"的直观理解,且实现复杂度较低,本文选择方案A作为实现基础。Pull模式涉及GStreamer更底层的调度机制,将在后续深入研究后另文探讨。
3.2 深入 audiotestsrc 的实现逻辑
我们先分析官方 audiotestsrc
的关键实现,学习 GStreamer 标准 source 元素的调度机制。
3.2.1 生产者线程的启动时机
当 pipeline 进入播放流程时,audiotestsrc
的生产者线程在 PAUSED 状态下就已经启动了。通过调试分析,其线程启动流程如下:
典型触发路径:
-
状态切换触发
当gst_element_set_state(pipeline, GST_STATE_PAUSED)
被调用时,会触发对所有元素的 pad 激活操作:gst_pad_set_active(pad, TRUE) // 激活所有 pad
-
Pad 激活回调
audiotestsrc
的 src pad 重写了activatemode_function
,此时会调用继承链:→ gst_base_src_activate_mode() // GstBaseSrc 的标准实现
-
任务线程创建
在gst_base_src_activate_mode()
中,最终通过:gst_pad_start_task(pad, gst_base_src_loop, ...)
启动独立线程执行主循环逻辑
-
主循环工作
gst_base_src_loop()
包含完整处理逻辑:- 格式协商(caps negotiation)
- 发送
STREAM_START
事件 - 生成音频数据
- 数据推送(
gst_pad_push()
)
关键结论:
- 线程启动的实际触发点是 PAUSED 状态下的 pad 激活,而非 PLAYING 状态
- 通过重写
activatemode_function
可以自定义启动逻辑 GstBaseSrc
已封装标准线程调度框架,子类只需实现数据生成
3.2.2 GStreamer Pad Task 机制详解
在 GStreamer 框架中,GstPad
不仅负责数据流的连接与协商,还提供了一套完整的异步任务(Task)接口,允许开发者将线程逻辑直接封装在 Pad 层面,而非传统的 Element 中。这一设计显著提升了模块化程度和灵活性。
核心接口与功能
-
任务启动:
gst_pad_start_task()
gboolean gst_pad_start_task( GstPad *pad, GstTaskFunction func, gpointer user_data, GDestroyNotify notify );
- 作用:启动一个专用线程,循环执行指定的
GstTaskFunction
。 - 关键特性:
- 线程会自动进入循环,持续调用目标函数,无需开发者手动实现循环逻辑。
- 典型应用场景:在
GstBaseSrc
的子类中,gst_base_src_loop()
仅需实现单次数据生成逻辑,任务线程会负责循环调度。例如音频源(audiosource)可通过此机制持续生成音频帧。
- 作用:启动一个专用线程,循环执行指定的
-
任务暂停:
gst_pad_pause_task()
- 行为:临时挂起任务线程的执行,但保留任务状态(如内部变量)。
- 用途:实现动态流控,如响应管道的暂停状态或资源限制。
-
任务终止:
gst_pad_stop_task()
- 行为:完全停止任务线程并释放相关资源。
- 注意:与暂停不同,停止后需重新调用
start_task
才能恢复执行。
设计优势
- 逻辑解耦:将线程管理与业务逻辑分离,Element 只需关注数据处理,Pad Task 处理线程调度。
- 性能优化:避免在 Element 层频繁创建/销毁线程,任务线程可复用。
- 标准化的流控:通过统一的任务接口实现暂停/恢复,简化状态管理。
3.2.3 loop 中做了哪些事情
在 GStreamer 中,audiotestsrc
这类源元素(source element)通过 gst_pad_start_task
启动一个任务循环(gst_base_src_loop
),其中有两件事情非常重要
-
格式协商(negotiate):
和下游商量用什么格式传递数据(比如采样率、位深等)。这一步确保数据能被正确处理。 -
生成数据并推送(push):
按协商好的格式生成音频数据,然后推给下游。
接下来我们重点讲 格式协商,当格式确定后如何生成数据和推给下游就会变得简单很多
3.2.4 格式协商
格式协商的前提是两个元素已经 link 成功。两个元素能够相互连接的前提是它们 pad 的 Capability 是有交集的,比如 src pad 支持的音频采样率是 [1, 96000],那么如果下游支持的采样率在这个范围内,他们就能 link 成功,否则在元素 link 阶段就会失败
auto ok = gst_element_link_many(ele0, ele1, NULL);
if(!ok){
printf("link failed");
}
在 link 阶段仅仅是确认了元素之间支持的数据格式是包含一个子集的,那么接下来在运行阶段,我们要从这个子集中,确认唯一的格式,这样才能确定数据是以什么形式进行流动,例如确定音频采样率是 44100,声道数 2,32位浮点数。也就是说,协商的过程就是找到一个这样的固定格式,audiotestsrc 根据这个固定格式来生成音频数据。为了说明这一点,我这边举一个简单的例子。
有三个元素 Source、Filter 和 Sink,它们顺序相互连接,支持的采样率分别是:
- Source : {16000, 32000}
- Filter: [1, Max]
- Sink: {32000, 44100, 48000}
+--------+ +--------+ +--------+
| Source |------>| Filter |------>| Sink |
+--------+ +--------+ +--------+
{16000, 32000} [1, Max] {16000, 32000, 44100, 48000}
首先,它们的采样率有一个公共的子集,即 {16000, 32000}
,因此它们在 link 阶段是成功的;接着,格式协商由 Source 发起,它用自己的格式作为格式过滤器(filter),获取 peer 端的所支持的格式,流程大致是:
- Sink 支持采样率
{16000, 32000, 44100, 48000}
与{16000, 32000}
取交集,得到{16000, 32000}
记作 A - Filter 支持采样率
[1, Mac]
与 A 取交集,得到{16000, 32000}
记作 B - Source 支持采样率
{16000, 32000}
与 B 取交集,得到{16000, 32000}
记作 peercaps
这时候 peercaps 仍然是一个范围,不是一个固定的值,最终由 source 来决定使用哪个固定值,固定下来后,再将它作为 source 的 caps,并通过事件通知给其他元素,其他元素会收到 GST_QUERY_ACCEPT_CAPS 事件,在这个事件中获取 caps 数据做相应的处理。
// 获取 source 的 caps
thiscaps = gst_pad_query_caps (GST_BASE_SRC_PAD (src), NULL);
// 以 thiscaps 作为 filter,获取 peercaps
peercaps = gst_pad_peer_query_caps (GST_BASE_SRC_PAD (basesrc), thiscaps);
// basesrc 来决定最终使用哪些固定值
caps = gst_base_src_fixate (basesrc, caps);
// 固定 caps,并发送 GST_QUERY_ACCEPT_CAPS 事件
result = gst_base_src_set_caps (basesrc, caps);
3.2 写一个 AudioSource 插件
了解了上面的知识后,我们来开始写一个自己的音频生成插件,为了让代码简单,我们做了这些简化:
- 只支持单声道、F32LE 、interleave 格式的数据
- 只支持 push 模式
详细的代码实现参考 my_plugin,使用 demo 参考 gstmyaudiotestsrc_example
3.2.1 初始化函数
在类初始化函数如下:
static void gst_my_audio_test_src_class_init(GstMyAudioTestSrcClass *klass) {
//...
gobject_class->set_property = gst_my_audio_test_src_set_property;
gobject_class->get_property = gst_my_audio_test_src_get_property;
gobject_class->finalize = gst_my_audio_test_src_finalize;
gst_my_audio_test_class_init_install_properties(gobject_class, klass);
// ...
}
我们覆写三个函数
set_property
,用于设置属性值get_property
,用于获取属性值finalize
,用于类的析构,释放一些申请的资源
gst_my_audio_test_class_init_install_properties
函数中注册了属性,具体大家自行看源码,不展开说了。
实例的初始化函数如下:
static void gst_my_audio_test_src_init(GstMyAudioTestSrc *filter) {
filter->impl = new GstMyAudioTestSrcImpl();
filter->srcpad = gst_pad_new_from_static_template(&gst_audio_test_src_src_template, "src");
gst_pad_set_activatemode_function(filter->srcpad,
gst_my_audio_test_src_activate_mode);
gst_element_add_pad(GST_ELEMENT(filter), filter->srcpad);
}
- 申请
GstMyAudioTestSrcImpl
实例,它用 c++ 来写,可以简化一些代码逻辑 - 创建
srcpad
,并设置srcpad
的激活函数(_activatemode_function
)
3.2.2 激活函数
前面提到,Pad 的 _activatemode_function
是线程启动的入口,我们看看函数逻辑是怎么样的
static gboolean gst_my_audio_test_src_activate_mode(GstPad *pad,
GstObject *parent,
GstPadMode mode,
gboolean active) {
auto *src = GST_MYAUDIOTESTSRC(parent);
switch (mode) {
case GST_PAD_MODE_PULL: {
res = gst_my_audio_test_src_pull();
break;
}
case GST_PAD_MODE_PUSH: {
res = gst_my_audio_test_src_activate_push(src->srcpad, parent, active);
break;
}
// ...
}
目前 GST_PAD_MODE_PULL
是不支持的,因此看 GST_PAD_MODE_PUSH
即可
static gboolean gst_my_audio_test_src_activate_push(GstPad *srcpad,
GstObject *parent,
gboolean active) {
if (active) {
g_print("start loop");
gst_pad_start_task(srcpad, (GstTaskFunction)gst_my_audio_test_src_loop,
srcpad, NULL);
} else {
g_print("stop loop");
gst_pad_stop_task(srcpad);
}
return TRUE;
}
_activate_push
函数很简单,启动 _src_loop
或者停止 _src_loop
3.3.3 loop 循环
接下来看最重要的 _src_loop
函数,主要做的两个事情就是格式协商和数据填充
格式协商逻辑如下:
static void gst_my_audio_test_src_loop(GstPad *pad) {
GstMyAudioTestSrc *src;
src = GST_MYAUDIOTESTSRC(GST_OBJECT_PARENT(pad));
GstCaps *caps = NULL;
gboolean result = FALSE;
if (gst_pad_check_reconfigure(pad))
{
g_print("need renegotiate\n");
GstCaps *thiscaps = gst_pad_query_caps(src->srcpad, NULL);
GST_DEBUG_OBJECT(src, "caps of src: %" GST_PTR_FORMAT, thiscaps);
GstCaps *peercaps = gst_pad_peer_query_caps(src->srcpad, thiscaps);
GST_DEBUG_OBJECT(src, "caps of peer: %" GST_PTR_FORMAT, peercaps);
if (peercaps)
{
caps = peercaps;
gst_caps_unref(thiscaps);
} else
{
caps = thiscaps;
}
if (caps && !gst_caps_is_empty(caps))
{
caps = gst_my_audio_test_src_fixate(src, caps);
caps = gst_caps_fixate(caps);
GST_DEBUG_OBJECT(src, "fixated to: %" GST_PTR_FORMAT, caps);
if (gst_caps_is_fixed(caps))
{
/* yay, fixed caps, use those then, it's possible that the subclass
* does not accept this caps after all and we have to fail. */
result = gst_my_audio_test_src_set_caps(src, caps);
if (result)
{
result = gst_pad_push_event(src->srcpad, gst_event_new_caps(caps));
}
}
}
if (!result)
{
GST_DEBUG_OBJECT(src, "negotiation failed");
gst_pad_pause_task(pad);
}
}
// ...
}
逻辑大致是:
- 获取当前 src pad 的 caps,获取 peer pad 的 caps,两者取交集
- 调用
gst_my_audio_test_src_fixate
去设置 audio src 最期望的数据格式,然后调用gst_caps_fixate
固化数据格式(此时所有数据格式已经确定,不再是一个范围值) gst_my_audio_test_src_set_caps
函数从 caps 中获取 AudioInfo 信息,拿到例如采样率、声道数等关键信息gst_pad_push_event
发送事件,通知下游数据格式
数据填充数据如下:
static void gst_my_audio_test_src_loop(GstPad *pad) {
//...
// generate audio data
GstBuffer *buf = NULL;
guint blocksize = impl->samples_per_buffer;
guint bufferSize = blocksize * sizeof(float);
buf = gst_buffer_new_allocate(NULL, bufferSize, NULL);
if (buf == NULL) {
GST_DEBUG_OBJECT(src, "alloc buffer failed");
}
GstMapInfo map;
gst_buffer_map(buf, &map, GST_MAP_WRITE);
float *data = (float *)map.data;
impl->fill(data, blocksize);
auto ret = gst_pad_push(src->srcpad, buf);
if (ret != GST_FLOW_OK) {
GST_DEBUG_OBJECT(src, "push buffer failed");
gst_pad_pause_task(pad);
}
}
gst_buffer_new_allocate
申请 GstBuffer,用于存放音频数据gst_buffer_map
从 GstBuffer 中拿到可写音频数据的地址impl->fill(data, blocksize);
用于填充音频数据,这部分用的是一个 LFO 生成器,具体逻辑大家可以不用在意。总之就是往一块内存中,写入生成的音频数据,你甚至可以写入随机噪声。。gst_pad_push
将数据 push 到下游
总结
以上,我们就将 AudioSource 如何生成数据的逻辑大致讲了一遍,运行 gstmyaudiotestsrc_example 之后可以听到正弦波的声音。后面我会去研究下如何实现 pull 模式,以及支持更多类型的音频数据和波形。
参考
- my_plugin
- gstmyaudiotestsrc_example