系列文章目录
- GStreamer 简明教程(一):环境搭建,运行 Basic Tutorial 1 Hello world!
- GStreamer 简明教程(二):基本概念介绍,Element 和 Pipeline
- GStreamer 简明教程(三):动态调整 Pipeline
- GStreamer 简明教程(四):Seek 以及获取文件时长
- GStreamer 简明教程(五):Pad 相关概念介绍,Pad Capabilities/Templates
- GStreamer 简明教程(六):利用 Tee 复制流数据,巧用 Queue 实现多线程
文章目录
- 系列文章目录
- 前言
- Pipeline
- appsrc 和 appsink
- appsrc
- appsink
- Pipeline 中的元素与外部动态交互的机制
- Show me the code
- 配置 appsrc 和 appsink
- appsrc 回调函数
- appsink 回调函数
- 总结
- 参考
前言
本章基于 Basic tutorial 8: Short-cutting the pipeline
进行说明和补充,以便读者掌握如何动态的插入或者提取出 Pipeline 中的数据。
本章内容并不复杂,它引入了两个新的 Element:appsrc 和 appsink。通过这两个 Element 我们可以实现动态管理数据流。
由于官方教程代码在本人机器上总是运行失败,因此我对它做了一些修改,具体代码在 basic-tutorial-8.c
Pipeline
本次教程中使用的 Pipeline 如上图,简单做个解释:
- appsrc: 用于推送数据到 Pipeline 中
- tee: 负责复制数据
- queue: 创建一个 FIFO(先进先出)缓冲区。它接收输入数据,暂时存储,然后在单独线程中按照接收顺序输出数据。
- audioconvert: 音频格式转换
- audioresample: 音频重采样
- autoaudiosink: 播放音频
- appsink: 负责接收 Pipeline 中的数据
appsrc 和 appsink
appsrc
appsrc
是 GStreamer 的一个源元素,用于在外部应用程序中插入数据到 GStreamer 管道。它提供了一种灵活的方式让用户从自定义或非标准数据源读取数据,并将其注入到 GStreamer 管道中进行进一步处理和流处理。
主要特点:
- 支持从应用程序中推送数据到管道。
- 可以用于实时数据流(如视频捕捉或生成的音频)。
- 允许通过回调函数和事件控制数据流。
appsink
appsink
是 GStreamer 的一个接收器元素,允许从管道中提取数据到外部应用程序。通过它,用户可以读取 GStreamer 管道末端的数据块并进行自定义处理。
主要特点:
- 从管道中拉取数据到应用程序中。
- 支持获取数据的回调,便于集成到应用程序的逻辑中。
- 常用于需要对流媒体进行自定义处理或分析的场景。
这两个元素结合使用,可以在应用程序与 GStreamer 管道之间实现动态、灵活的数据交互和处理。
Pipeline 中的元素与外部动态交互的机制
在 GStreamer 中,Element Signals 和 Element Actions 提供了与元素进行交互的额外机制:
-
Element Signals(元素信号):
- 方向:从元素向外部发送
- 用途:通知外部模块关于元素内部的状态变化或事件
- 使用方法:外部模块通过 g_signal_connect() 连接回调函数来接收信号
- 示例:‘new-sample’(新样本可用), ‘need-data’(需要更多数据)
-
Element Actions(元素动作):
- 方向:从外部向元素发送
- 用途:允许外部模块控制元素的特定行为
- 使用方法:外部模块通过 g_signal_emit_by_name() 触发动作
- 示例:‘push-buffer’(推送数据缓冲区), ‘end-of-stream’(结束数据流)
这两种机制共同提供了一个双向通信的框架:
- Signals 允许元素主动通知外部世界其内部发生的事件或状态变化。
- Actions 允许外部世界主动控制元素的行为或触发特定操作。
这种设计使得 GStreamer 的元素能够灵活地与应用程序和其他元素交互,同时保持良好的封装性。应用程序可以根据元素发出的信号做出反应,也可以通过动作来影响元素的行为,而无需直接访问元素的内部实现。
Show me the code
有了上面的基础知识铺垫后,让我们来看具体的实现代码,再重申一次,
由于官方教程代码在本人机器上总是运行失败,因此我对它做了一些修改,具体代码在 basic-tutorial-8.c
#include <gst/audio/audio.h>
#include <gst/gst.h>
#include <string.h>
#ifdef __APPLE__
#include <TargetConditionals.h>
#endif
#define CHUNK_SIZE 1024 /* Amount of bytes we are sending in each buffer */
#define SAMPLE_RATE 44100 /* Samples per second we are sending */
/* Structure to contain all our information, so we can pass it to callbacks */
typedef struct _CustomData {
GstElement *pipeline, *app_source, *tee, *audio_queue, *audio_convert1,
*audio_resample, *audio_sink;
GstElement *video_queue, *audio_convert2, *visual, *video_convert,
*video_sink;
GstElement *app_queue, *app_sink;
guint64 num_samples; /* Number of samples generated so far (for timestamp
generation) */
gfloat a, b, c, d; /* For waveform generation */
guint sourceid; /* To control the GSource */
GMainLoop *main_loop; /* GLib's Main Loop */
} CustomData;
/* This method is called by the idle GSource in the mainloop, to feed CHUNK_SIZE
* bytes into appsrc. The idle handler is added to the mainloop when appsrc
* requests us to start sending data (need-data signal) and is removed when
* appsrc has enough data (enough-data signal).
*/
static gboolean push_data(CustomData *data) {
GstBuffer *buffer;
GstFlowReturn ret;
int i;
GstMapInfo map;
gint16 *raw;
gint num_samples = CHUNK_SIZE / 2; /* Because each sample is 16 bits */
gfloat freq;
/* Create a new empty buffer */
buffer = gst_buffer_new_and_alloc(CHUNK_SIZE);
/* Set its timestamp and duration */
GST_BUFFER_TIMESTAMP(buffer) =
gst_util_uint64_scale(data->num_samples, GST_SECOND, SAMPLE_RATE);
GST_BUFFER_DURATION(buffer) =
gst_util_uint64_scale(num_samples, GST_SECOND, SAMPLE_RATE);
/* Generate some psychodelic waveforms */
gst_buffer_map(buffer, &map, GST_MAP_WRITE);
raw = (gint16 *)map.data;
data->c += data->d;
data->d -= data->c / 1000;
freq = 1100 + 1000 * data->d;
for (i = 0; i < num_samples; i++) {
data->a += data->b;
data->b -= data->a / freq;
raw[i] = (gint16)(500 * data->a);
}
gst_buffer_unmap(buffer, &map);
data->num_samples += num_samples;
/* Push the buffer into the appsrc */
g_signal_emit_by_name(data->app_source, "push-buffer", buffer, &ret);
/* Free the buffer now that we are done with it */
gst_buffer_unref(buffer);
if (ret != GST_FLOW_OK) {
/* We got some error, stop sending data */
return FALSE;
}
return TRUE;
}
/* This signal callback triggers when appsrc needs data. Here, we add an idle
* handler to the mainloop to start pushing data into the appsrc */
static void start_feed(GstElement *source, guint size, CustomData *data) {
if (data->sourceid == 0) {
g_print("Start feeding\n");
data->sourceid = g_idle_add((GSourceFunc)push_data, data);
}
}
/* This callback triggers when appsrc has enough data and we can stop sending.
* We remove the idle handler from the mainloop */
static void stop_feed(GstElement *source, CustomData *data) {
if (data->sourceid != 0) {
g_print("Stop feeding\n");
g_source_remove(data->sourceid);
data->sourceid = 0;
}
}
/* The appsink has received a buffer */
static GstFlowReturn new_sample(GstElement *sink, CustomData *data) {
GstSample *sample;
/* Retrieve the buffer */
g_signal_emit_by_name(sink, "pull-sample", &sample);
if (sample) {
/* The only thing we do in this example is print a * to indicate a received
* buffer */
g_print("got buffer\n");
gst_sample_unref(sample);
return GST_FLOW_OK;
}
return GST_FLOW_ERROR;
}
/* This function is called when an error message is posted on the bus */
static void error_cb(GstBus *bus, GstMessage *msg, CustomData *data) {
GError *err;
gchar *debug_info;
/* Print error details on the screen */
gst_message_parse_error(msg, &err, &debug_info);
g_printerr("Error received from element %s: %s\n", GST_OBJECT_NAME(msg->src),
err->message);
g_printerr("Debugging information: %s\n", debug_info ? debug_info : "none");
g_clear_error(&err);
g_free(debug_info);
g_main_loop_quit(data->main_loop);
}
int tutorial_main(int argc, char *argv[]) {
CustomData data;
/* Initialize cumstom data structure */
memset(&data, 0, sizeof(data));
data.b = 1; /* For waveform generation */
data.d = 1;
gst_init(&argc, &argv);
/* Create the elements */
data.app_source = gst_element_factory_make("appsrc", "audio_source");
data.tee = gst_element_factory_make("tee", "tee");
data.audio_queue = gst_element_factory_make("queue", "audio_queue");
data.audio_convert1 =
gst_element_factory_make("audioconvert", "audio_convert1");
data.audio_resample =
gst_element_factory_make("audioresample", "audio_resample");
data.audio_sink = gst_element_factory_make("autoaudiosink", "audio_sink");
data.app_queue = gst_element_factory_make("queue", "app_queue");
data.app_sink = gst_element_factory_make("appsink", "app_sink");
/* Create the empty pipeline */
data.pipeline = gst_pipeline_new("test-pipeline");
/* Configure appsrc */
GstAudioInfo info;
gst_audio_info_set_format(&info, GST_AUDIO_FORMAT_S16, SAMPLE_RATE, 1, NULL);
GstCaps *audio_caps = gst_audio_info_to_caps(&info);
g_object_set(data.app_source, "caps", audio_caps, "format", GST_FORMAT_TIME,
NULL);
g_signal_connect(data.app_source, "need-data", G_CALLBACK(start_feed), &data);
g_signal_connect(data.app_source, "enough-data", G_CALLBACK(stop_feed),
&data);
/* Configure appsink */
g_object_set(data.app_sink, "emit-signals", TRUE, "caps", audio_caps, NULL);
g_signal_connect(data.app_sink, "new-sample", G_CALLBACK(new_sample), &data);
gst_caps_unref(audio_caps);
// link elements
gst_bin_add_many(GST_BIN(data.pipeline), data.app_source, data.tee,
data.audio_queue, data.audio_convert1, data.audio_resample,
data.audio_sink, data.app_queue, data.app_sink, NULL);
if (gst_element_link_many(data.app_source, data.tee, NULL) != TRUE) {
g_printerr("Elements could not be linked.\n");
gst_object_unref(data.pipeline);
return -1;
}
if (gst_element_link_many(data.audio_queue, data.audio_convert1,
data.audio_resample, data.audio_sink,
NULL) != TRUE) {
g_printerr("Elements could not be linked.\n");
gst_object_unref(data.pipeline);
return -1;
}
if (gst_element_link_many(data.app_queue, data.app_sink, NULL) != TRUE) {
g_printerr("Elements could not be linked.\n");
gst_object_unref(data.pipeline);
return -1;
}
GstPad *tee_audio_pad = gst_element_request_pad_simple(data.tee, "src_%u");
g_print("Obtained request pad %s for audio branch.\n",
gst_pad_get_name(tee_audio_pad));
GstPad *queue_audio_pad =
gst_element_get_static_pad(data.audio_queue, "sink");
if (gst_pad_link(tee_audio_pad, queue_audio_pad) != GST_PAD_LINK_OK) {
g_printerr("Tee could not be linked.\n");
gst_object_unref(data.pipeline);
return -1;
}
gst_object_unref(queue_audio_pad);
GstPad *tee_app_pad = gst_element_request_pad_simple(data.tee, "src_%u");
GstPad *queue_app_pad = gst_element_get_static_pad(data.app_queue, "sink");
if (gst_pad_link(tee_app_pad, queue_app_pad) != GST_PAD_LINK_OK) {
g_printerr("Tee could not be linked.\n");
gst_object_unref(data.pipeline);
return -1;
}
gst_object_unref(queue_app_pad);
GstBus *bus = gst_element_get_bus(data.pipeline);
gst_bus_add_signal_watch(bus);
g_signal_connect(G_OBJECT(bus), "message::error", (GCallback)error_cb, &data);
gst_object_unref(bus);
/* Start playing the pipeline */
gst_element_set_state(data.pipeline, GST_STATE_PLAYING);
GST_DEBUG_BIN_TO_DOT_FILE(GST_BIN(data.pipeline), GST_DEBUG_GRAPH_SHOW_ALL,
"pipeline");
/* Create a GLib Main Loop and set it to run */
data.main_loop = g_main_loop_new(NULL, FALSE);
g_main_loop_run(data.main_loop);
/* Free resources */
gst_element_set_state(data.pipeline, GST_STATE_NULL);
gst_object_unref(data.pipeline);
return 0;
}
int main(int argc, char *argv[]) {
#if defined(__APPLE__) && TARGET_OS_MAC && !TARGET_OS_IPHONE
return gst_macos_main((GstMainFunc)tutorial_main, argc, argv, NULL);
#else
return tutorial_main(argc, argv);
#endif
}
对上面代码先做一些基本的解释
- 创建出我们需要的 Element,包括 appsrc、appsink、tee、queue、autoaudiosink 等等
- 配置 appsrc 和 appsink,这块是本章内容重点,后面细说。
- 构建 pipeline,连接各个元素,其中 tee 元素的使用在上一章中我们已经讲过,不再赘述。
- 启动 pipeline,监听 bus 数据等常规操作,不再赘述。
- 创建并运行一个 GLib 的主循环,你可以认为这里启动了一个线程池,用于执行某些任务。这个也后面细说。
配置 appsrc 和 appsink
/* Configure appsrc */
GstAudioInfo info;
gst_audio_info_set_format(&info, GST_AUDIO_FORMAT_S16, SAMPLE_RATE, 1, NULL);
GstCaps *audio_caps = gst_audio_info_to_caps(&info);
g_object_set(data.app_source, "caps", audio_caps, "format", GST_FORMAT_TIME,
NULL);
g_signal_connect(data.app_source, "need-data", G_CALLBACK(start_feed), &data);
g_signal_connect(data.app_source, "enough-data", G_CALLBACK(stop_feed),
&data);
这段代码是在配置 GStreamer 管道中的 appsrc 元素。让我逐步解释:
-
创建并设置音频信息:
GstAudioInfo info; gst_audio_info_set_format(&info, GST_AUDIO_FORMAT_S16, SAMPLE_RATE, 1, NULL);
这里设置了音频格式(16位有符号整数)、采样率(SAMPLE_RATE)和通道数(1,即单声道)。
-
创建音频能力(caps):
GstCaps *audio_caps = gst_audio_info_to_caps(&info);
将音频信息转换为 GStreamer 能力描述。
-
设置 appsrc 的属性:
g_object_set(data.app_source, "caps", audio_caps, "format", GST_FORMAT_TIME, NULL);
设置 appsrc 的能力(caps)和时间格式。
-
连接信号处理函数:
g_signal_connect(data.app_source, "need-data", G_CALLBACK(start_feed), &data); g_signal_connect(data.app_source, "enough-data", G_CALLBACK(stop_feed), &data);
这里连接了两个信号处理函数:
- “need-data”: 当 appsrc 需要更多数据时调用 start_feed 函数
- “enough-data”: 当 appsrc 有足够数据时调用 stop_feed 函数
/* Configure appsink */
g_object_set(data.app_sink, "emit-signals", TRUE, "caps", audio_caps, NULL);
g_signal_connect(data.app_sink, "new-sample", G_CALLBACK(new_sample), &data);
gst_caps_unref(audio_caps);
这段代码配置了 GStreamer 管道中的 appsink 元素。让我详细解释一下:
-
设置 appsink 的属性:
g_object_set(data.app_sink, "emit-signals", TRUE, "caps", audio_caps, NULL);
这行代码设置了 appsink 的两个属性:
- “emit-signals” 设置为 TRUE:这使得 appsink 会发出信号(如 “new-sample”)当新的样本到达时。
- “caps” 设置为 audio_caps:这指定了 appsink 接受的数据格式,确保只有符合这个格式的数据才会被接受。
-
连接信号处理函数:
g_signal_connect(data.app_sink, "new-sample", G_CALLBACK(new_sample), &data);
这行代码将 “new-sample” 信号与 new_sample 函数关联。每当 appsink 接收到新的音频样本时,new_sample 函数就会被调用。
-
释放 caps:
gst_caps_unref(audio_caps);
这行代码释放了之前创建的 audio_caps 对象,因为它已经不再需要了。这是良好的内存管理实践。
appsrc 回调函数
static void start_feed(GstElement *source, guint size, CustomData *data) {
if (data->sourceid == 0) {
g_print("Start feeding\n");
data->sourceid = g_idle_add((GSourceFunc)push_data, data);
}
}
static void stop_feed(GstElement *source, CustomData *data) {
if (data->sourceid != 0) {
g_print("Stop feeding\n");
g_source_remove(data->sourceid);
data->sourceid = 0;
}
}
appsrc 连接了两个信号回调函数:
- “need-data” ,当 appsrc 需要更多数据时调用 start_feed 函数,start_feed 使用
g_idle_add
确保 push_data 在主循环的空闲时间被调用。 - “enough-data”,当 appsrc 有足够数据时调用 stop_feed 函数,stop_feed 使
g_source_remove(data->sourceid)
移除之前添加的空闲函数,以停止调用 push_data
而 push_data 函数中,做了几件重要的事情:
- 生成音频的 Buffer
- 使用
g_signal_emit_by_name
将 buffer push 到 appsrc 元素中 push_data
函数的返回值决定了空闲函数是否继续执行:
-
返回
TRUE
:- 表示函数应继续被调用。
push_data
将继续运行,继续合成和推送新的音频数据到appsrc
。
-
返回
FALSE
:- 表示函数应该停止。
- 空闲函数将被移除,不会再被调用。
- 通常用于处理错误情况(例如
ret != GST_FLOW_OK
),以停止数据推送进程。
总结下我们是如何通过 appsrc 向 pipeline 动态发送数据:
- 在 appsrc 中注册 “need-data” 和 “enough-data” 信号的回调函数
- 当 appsrc 发出 “need-data” 时,回调函数向主循环中添加一个 push_data 的任务
- push_data 中负责生成数据,并通过
g_signal_emit_by_name
将数据发送给 appsrc - 当 appsrc 缓存区满了后,发送 “enough-data” 信号,回调函数将 push_data 任务从主循环中移除。
- “need-data” 和 “enough-data” 在运行过程中会交替的重复出现。
appsink 回调函数
static GstFlowReturn new_sample(GstElement *sink, CustomData *data) {
GstSample *sample;
/* Retrieve the buffer */
g_signal_emit_by_name(sink, "pull-sample", &sample);
if (sample) {
/* The only thing we do in this example is print a * to indicate a received
* buffer */
g_print("got buffer\n");
gst_sample_unref(sample);
return GST_FLOW_OK;
}
return GST_FLOW_ERROR;
}
appsink 中注册了 “new-sample” 的回调函数,当接收到数据时发送该信号,在其回调函数中,使用
g_signal_emit_by_name(sink, "pull-sample", &sample)
从 appsink 中拉取接收到的数据。
总结
本文介绍了 appsrc 和 appsink 的使用,通过使用 Element Signals 和 Element Actions 机制,我们可以动态地与元素进行交互。
参考
- Basic tutorial 8: Short-cutting the pipeline
- basic-tutorial-8.c