1.引言
如果看了gstreamer官方教程配置多线程出现编译不过的问题了,不妨进来看看这篇文章或许能解决一些编译问题。
GStreamer 本质上是多线程的,并且是完全线程安全的。大多数线程内部对应用程序是隐藏的,这应该使应用程序开发更容易。但是,在某些情况下,应用程序可能希望影响其中的某些部分。 GStreamer 允许应用程序在pipeline的某些部分强制使用多个线程,GStreamer 还可以在创建线程时通知您,以便您可以配置要使用的线程优先级或线程池等内容。
2.GStreamer 中的调度
GStreamer pipeline中的每个element决定如何调度它。element可以选择是基于推还是基于拉的方式调度它们的pad。例如,一个element可以选择启动一个线程以开始从sink pad拉取 或/并 开始从source pad推送。element也可以选择使用上游(upstream )或下游(downstream )线程分别以推和拉模式对数据进行处理。 GStreamer 对element选择如何调度没有任何限制。有关更多详细信息,请参阅插件编写指南。
在任何情况下都会发生的是,某些element将启动一个线程来处理它们的数据,称为“流线程”(“streaming threads”)。当element需要创建一个流线程时,流线程或叫做 GstTask 对象是从 GstTaskPool 创建的。在下一节中,我们将看到如何接收任务和池的通知。
3.在 GStreamer 中配置线程
STREAM_STATUS 消息发布在总线上以通知有关流线程的状态。你将从消息中获得以下信息:
当将要创建新线程时,您将收到 GST_STREAM_STATUS_TYPE_CREATE 类型的通知。然后可以在 GstTask 中配置 GstTaskPool。自定义任务池将为任务提供自定义线程以实现流线程。
-
如果要配置自定义任务池,则需要同步处理此消息。如果你在此消息返回时未在任务上配置任务池,则任务将使用其默认池。
-
进入或离开线程时。这是你可以配置线程优先级的时刻。当线程被销毁时,你也会收到通知。
-
当线程开始、暂停和停止时,你会收到消息。这可用于在 gui 应用程序中可视化流线程的状态。
4.如何提高线程的优先级
让我们看一下上面的pipeline。我们希望提高流线程的优先级。appsrc element将启动流线程来将appsrc中src pad上的数据推送给对端queue element的sink pad。改变优先级的流程是这样的:
-
当从 READY 到 PAUSED 状态时, appsrc 将需要一个流线程来将数据推送到 queue 中。它将发布一条STREAM_STATUS 消息,指示其对流线程的需求。
-
应用程序将使用同步总线处理程序对 STREAM_STATUS 消息做出反应。然后它将在消息内的 GstTask 上配置自定义GstTaskPool。自定义任务池负责创建线程。在这个例子中,我们将创建一个具有更高优先级的线程。
-
或者,由于同步消息是在线程上下文中调用的,您可以使用线程 ENTER/LEAVE 通知来更改当前线程的优先级或调度策略。
第一步,我们需要实现一个可以在任务上配置的自定义 GstTaskPool。下面是一个 GstTaskPool 子类的实现,它使用 pthreads 创建一个 SCHED_RR 实时线程。请注意,创建实时线程可能需要额外的权限。
#include <gst/gst.h>
#include <gst/gsttaskpool.h>
#define TEST_TYPE_RT_POOL \
(test_rt_pool_get_type())
GType test_rt_pool_get_type (void);
GstTaskPool *test_rt_pool_new (void);
typedef struct _GstTaskPool TestRTPool;
typedef struct _GstTaskPoolClass TestRTPoolClass;
typedef struct
{
pthread_t thread;
} TaskPoolRTId;
G_DEFINE_TYPE (TestRTPool, test_rt_pool, GST_TYPE_TASK_POOL);
static void
default_prepare (GstTaskPool * pool, GError ** error)
{
/* we don't do anything here. We could construct a pool of threads here that
* we could reuse later but we don't */
}
static void
default_cleanup (GstTaskPool * pool)
{
}
static gpointer
default_push (GstTaskPool * pool, GstTaskPoolFunction func, gpointer data,
GError ** error)
{
TestRTId *tid;
gint res;
pthread_attr_t attr;
struct sched_param param;
tid = g_slice_new0 (TestRTId);
pthread_attr_init (&attr);
if ((res = pthread_attr_setschedpolicy (&attr, SCHED_RR)) != 0)
g_warning ("setschedpolicy: failure: %p", g_strerror (res));
param.sched_priority = 50;
if ((res = pthread_attr_setschedparam (&attr, ¶m)) != 0)
g_warning ("setschedparam: failure: %p", g_strerror (res));
if ((res = pthread_attr_setinheritsched (&attr, PTHREAD_EXPLICIT_SCHED)) != 0)
g_warning ("setinheritsched: failure: %p", g_strerror (res));
res = pthread_create (&tid->thread, &attr, (void *(*)(void *)) func, data);
if (res != 0) {
g_set_error (error, G_THREAD_ERROR, G_THREAD_ERROR_AGAIN,
"Error creating thread: %s", g_strerror (res));
g_slice_free (TestRTId, tid);
tid = NULL;
}
return tid;
}
static void
default_join (GstTaskPool * pool, gpointer id)
{
TestRTId *tid = (TestRTId *) id;
pthread_join (tid->thread, NULL);
g_slice_free (TestRTId, tid);
}
static void
test_rt_pool_class_init (TestRTPoolClass * klass)
{
GstTaskPoolClass *gsttaskpool_class;
gsttaskpool_class = (GstTaskPoolClass *) klass;
gsttaskpool_class->prepare = default_prepare;
gsttaskpool_class->cleanup = default_cleanup;
gsttaskpool_class->push = default_push;
gsttaskpool_class->join = default_join;
}
static void
test_rt_pool_init (TestRTPool * pool)
{
}
GstTaskPool *
test_rt_pool_new (void)
{
GstTaskPool *pool;
pool = g_object_new (TEST_TYPE_RT_POOL, NULL);
return pool;
}
编写任务池时要实现的重要功能是“push”功能。 实现上应该启动一个调用给定函数的线程。 更复杂的实现可能希望在池中保留一些线程,因为创建和销毁线程并不总是最快的操作。
在下一步中,我们需要在 appsrc 需要时实际配置自定义任务池。 为此,我们使用同步处理程序拦截 STREAM_STATUS 消息。
static GMainLoop* loop;
static void
on_stream_status (GstBus *bus,
GstMessage *message,
gpointer user_data)
{
GstStreamStatusType type;
GstElement *owner;
const GValue *val;
GstTask *task = NULL;
gst_message_parse_stream_status (message, &type, &owner);
val = gst_message_get_stream_status_object (message);
/* see if we know how to deal with this object */
if (G_VALUE_TYPE (val) == GST_TYPE_TASK) {
task = g_value_get_object (val);
}
switch (type) {
case GST_STREAM_STATUS_TYPE_CREATE:
if (task) {
GstTaskPool *pool;
pool = test_rt_pool_new();
gst_task_set_pool (task, pool);
}
break;
default:
break;
}
}
static void
on_error (GstBus *bus,
GstMessage *message,
gpointer user_data)
{
g_message ("received ERROR");
g_main_loop_quit (loop);
}
static void
on_eos (GstBus *bus,
GstMessage *message,
gpointer user_data)
{
g_main_loop_quit (loop);
}
int
main (int argc, char *argv[])
{
GstElement *bin, *AppSrc;
GstBus *bus;
GstStateChangeReturn ret;
gst_init (&argc, &argv);
/* create a new bin to hold the elements */
bin = gst_pipeline_new ("pipeline");
g_assert (bin);
//pseudo-code
/* create some elements */
AppSrc = gst_element_factory_make ("appsrc", "appsrc");
.............
/* add objects to the main pipeline */
gst_bin_add_many (GST_BIN (bin),AppSrc ,..., ..., NULL);
/* link the elements */
gst_element_link_many(AppSrc,..., ...,null);
loop = g_main_loop_new (NULL, FALSE);
/* get the bus, we need to install a sync handler */
bus = gst_pipeline_get_bus (GST_PIPELINE (bin));
gst_bus_enable_sync_message_emission (bus);
gst_bus_add_signal_watch (bus);
g_signal_connect (bus, "sync-message::stream-status",
(GCallback) on_stream_status, NULL);
g_signal_connect (bus, "message::error",
(GCallback) on_error, NULL);
g_signal_connect (bus, "message::eos",
(GCallback) on_eos, NULL);
/* start playing */
ret = gst_element_set_state (bin, GST_STATE_PLAYING);
if (ret != GST_STATE_CHANGE_SUCCESS) {
g_message ("failed to change state");
return -1;
}
/* Run event loop listening for bus messages until EOS or ERROR */
g_main_loop_run (loop);
/* stop the bin */
gst_element_set_state (bin, GST_STATE_NULL);
gst_object_unref (bus);
g_main_loop_unref (loop);
return 0;
}
请注意,该程序可能需要 root 权限才能创建实时线程。 当无法创建线程时,状态更改函数将失败,我们在上面的应用程序中捕获了这一点。
当pipeline中有多个线程时,您将收到多个 STREAM_STATUS 消息。 您应该使用消息的所有者(可能是 pad 或启动线程的element)来确定该线程在应用程序上下文中的功能是什么。