本章将讨论如何在应用程序中使用多种方式操作管道。本章的部分内容都是很底层的,所以在开始阅读之前,请确保你需要一些编程知识,并对GStreamer有很好的理解。
这里将讨论的主题包括如何从应用程序向管道中插入数据,如何从管道中读取数据,如何操作管道的速度、长度、起点,以及如何监听管道的数据处理过程。
Using probes
监听probe最好设想对一个pad listener。从技术上讲,探针probe只不过是一个可以连接到pad上的回调函数。可以使用gst_pad_add_probe()附加探测。类似地,可以使用gst_pad_remove_probe()再次删除回调函数。探针会通知您在pad上发生的任何活动,如buffer、event和queries。您可以在添加探针时定义您感兴趣的通知类型。
探针可以在pad上通知您以下活动:
-  buffer被push或pull。您希望在注册探针时指定GST_PAD_PROBE_TYPE_BUFFER。因为pad可以以不同的方式调度,还可以通过可选的标志GST_PAD_PROBE_TYPE_PUSH和GST_PAD_PROBE_TYPE_PULL指定用户感兴趣的调度模式。 
 您可以使用此探针来检查、修改或删除buffer。参见Data probes。
-  一个bufferlist被推送push。在注册探测时,使用GST_PAD_PROBE_TYPE_BUFFER_LIST。 
-  事件在pad上传播。使用GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM和GST_PAD_PROBE_TYPE_EVENT_UPSTREAM标志来选择下游和上游事件。还有一个方便的GST_PAD_PROBE_TYPE_EVENT_BOTH,用于通知发生在上游和下游的事件。默认情况下,flush事件不会触发通知。用户需要显式启用GST_PAD_PROBE_TYPE_EVENT_FLUSH来接收刷出事件的回调函数。只有在推送模式下才会通知事件。 
 您可以使用此探针来检查、修改或删除事件。
-  查询在一个pad上传递。使用GST_PAD_PROBE_TYPE_QUERY_DOWNSTREAM和GST_PAD_PROBE_TYPE_QUERY_UPSTREAM标志来选择下游和上游查询。GST_PAD_PROBE_TYPE_QUERY_BOTH也可用于选择两个方向。查询探针将被通知两次,一次是在查询上行/下行时,一次是在查询结果返回时。可以使用GST_PAD_PROBE_TYPE_PUSH和GST_PAD_PROBE_TYPE_PULL分别在执行查询和返回查询结果时选择在哪个阶段调用回调函数。 
 您可以使用这个探针来检查或修改查询。还可以在probe回调函数中回答查询,方法是在查询中放置结果值,并从回调函数中返回GST_PAD_PROBE_DROP。
-  除了通知您数据流之外,您还可以在回调函数返回时要求探针阻塞数据流。这称为阻塞探测(blocking probe),通过指定GST_PAD_PROBE_TYPE_BLOCK标志激活。您可以将此标志与其他标志一起使用,以仅阻塞选定活动上的数据流。如果移除探测,或者从回调函数中返回GST_PAD_PROBE_REMOVE,则pad将再次变得畅通。通过从回调函数返回GST_PAD_PROBE_PASS,可以只让当前被阻塞的元素通过,它会再次阻塞下一个元素。 
 阻塞探测用于临时阻塞pad,因为它们未连接或因为您将要断开它们的连接。如果数据流没有被阻塞,如果数据被推送到未链接的pad上,管道将进入错误状态。我们将介绍如何使用阻塞探测对管道进行部分预滚动。参见Play a region of a media file。
-  当pad上没有活动时收到通知。使用GST_PAD_PROBE_TYPE_IDLE标志安装该探测。你可以指定GST_PAD_PROBE_TYPE_PUSH和/或GST_PAD_PROBE_TYPE_PULL只收到通知,具体取决于pad调度模式。空闲探针也是一个阻塞探针,因为只要空闲探针安装,它就不会让任何数据在pad上传递。 
 您可以使用空闲探针动态地重新链接pad。我们将看到如何使用空闲探测替换管道中的元素。另请参阅Dynamically changing the pipeline。
Data probes
数据探测器Data probes允许您在pad上有数据传递时收到通知。在添加探测时,指定GST_PAD_PROBE_TYPE_BUFFER和/或GST_PAD_PROBE_TYPE_BUFFER_LIST。
数据探测Data probes在pipeline流线程上下文中运行,所以回调应该尽量不阻塞,通常不做任何奇怪的事情,因为这可能会对pipeline性能产生负面影响,或者在出现bug的情况下,导致死锁或崩溃。更准确地说,通常不应该在probe回调中调用任何gui相关的函数,也不应该试图更改管道的状态。应用程序可以在管道的总线上发布自定义消息,以便与应用程序主线程通信,并让它执行诸如停止管道之类的操作。
在任何情况下,元素可以在_chain()函数中进行的最常见的buffer操作,也可以在probe回调函数中进行。下面的例子简要介绍了如何使用它们。
#include <gst/gst.h>
static GstPadProbeReturn
cb_have_data (GstPad          *pad,
              GstPadProbeInfo *info,
              gpointer         user_data) {
  gint x, y;
  GstMapInfo map;
  guint16 *ptr, t;
  GstBuffer *buffer;
  buffer = GST_PAD_PROBE_INFO_BUFFER (info);
  buffer = gst_buffer_make_writable (buffer);
  /* Making a buffer writable can fail (for example if it
   * cannot be copied and is used more than once)
   */
  if (buffer == NULL)
    return GST_PAD_PROBE_OK;
  /* Mapping a buffer can fail (non-writable) */
  if (gst_buffer_map (buffer, &map, GST_MAP_WRITE)) {
    ptr = (guint16 *) map.data;
    /* invert data */
    for (y = 0; y < 288; y++) {
      for (x = 0; x < 384 / 2; x++) {
        t = ptr[384 - 1 - x];
        ptr[384 - 1 - x] = ptr[x];
        ptr[x] = t;
      }
      ptr += 384;
    }
    gst_buffer_unmap (buffer, &map);
  }
  GST_PAD_PROBE_INFO_DATA (info) = buffer;
  return GST_PAD_PROBE_OK;
}
gint
main (gint   argc,
      gchar *argv[])
{
  GMainLoop *loop;
  GstElement *pipeline, *src, *sink, *filter, *csp;
  GstCaps *filtercaps;
  GstPad *pad;
  /* init GStreamer */
  gst_init (&argc, &argv);
  loop = g_main_loop_new (NULL, FALSE);
  /* build */
  pipeline = gst_pipeline_new ("my-pipeline");
  src = gst_element_factory_make ("videotestsrc", "src");
  if (src == NULL)
    g_error ("Could not create 'videotestsrc' element");
  filter = gst_element_factory_make ("capsfilter", "filter");
  g_assert (filter != NULL); /* should always exist */
  csp = gst_element_factory_make ("videoconvert", "csp");
  if (csp == NULL)
    g_error ("Could not create 'videoconvert' element");
  sink = gst_element_factory_make ("xvimagesink", "sink");
  if (sink == NULL) {
    sink = gst_element_factory_make ("ximagesink", "sink");
    if (sink == NULL)
      g_error ("Could not create neither 'xvimagesink' nor 'ximagesink' element");
  }
  gst_bin_add_many (GST_BIN (pipeline), src, filter, csp, sink, NULL);
  gst_element_link_many (src, filter, csp, sink, NULL);
  filtercaps = gst_caps_new_simple ("video/x-raw",
               "format", G_TYPE_STRING, "RGB16",
               "width", G_TYPE_INT, 384,
               "height", G_TYPE_INT, 288,
               "framerate", GST_TYPE_FRACTION, 25, 1,
               NULL);
  g_object_set (G_OBJECT (filter), "caps", filtercaps, NULL);
  gst_caps_unref (filtercaps);
  pad = gst_element_get_static_pad (src, "src");
  gst_pad_add_probe (pad, GST_PAD_PROBE_TYPE_BUFFER,
      (GstPadProbeCallback) cb_have_data, NULL, NULL);
  gst_object_unref (pad);
  /* run */
  gst_element_set_state (pipeline, GST_STATE_PLAYING);
  /* wait until it's up and running or failed */
  if (gst_element_get_state (pipeline, NULL, NULL, -1) == GST_STATE_CHANGE_FAILURE) {
    g_error ("Failed to go into PLAYING state");
  }
  g_print ("Running ...\n");
  g_main_loop_run (loop);
  /* exit */
  gst_element_set_state (pipeline, GST_STATE_NULL);
  gst_object_unref (pipeline);
  return 0;
}
将上述输出与“gst-launch-1.0 videotestsrc !”Xvimagesink”,这样你就知道你在寻找什么。
严格地说,pad probe回调只允许在缓冲区buffer可写的情况下修改缓冲区buffer的内容。这种情况在很大程度上取决于管道和涉及的元素。通常情况下是这样,但有时不是,如果不是这样,那么对数据或元数据的意外修改可能会引入难以调试和跟踪的bug。使用gst_buffer_is_writable()可以检查缓冲区是否可写。由于您可以传递回一个不同的缓冲区,因此在回调函数中使用gst_buffer_make_writable()使缓冲区可写是一个好主意。
Pad探头最适合在数据通过管道时查看数据。如果你需要修改数据,你最好编写自己的GStreamer元素。像gstaudifilter, gstvidefilter或GstBaseTransform这样的基类使这变得相当容易。
如果只想在缓冲区通过管道时进行检查,甚至不需要设置pad探测器。你也可以在管道中插入一个标识元素,并连接到它的“handoff”信号。identity元素还提供了一些有用的调试工具,如"dump"属性或"last-message"属性(后者可以通过将-v开关传递给gst-launch并将identity上的silent属性设置为FALSE来启用)。
播放媒体文件的一个区域
在这个例子中,我们将展示如何播放媒体文件中的一个区域。我们的目标是仅在2秒到5秒之间播放文件,然后执行EOS。
第一步,我们将把一个uridecodebin元素设置为暂停状态,并确保我们阻塞了所有创建的source pad。当所有的source pad都被阻塞时,我们就有了所有source pad上的数据,我们说uridecodebin是预滚动的。(src元素设定为PAUSED)
在预先滚动的管道中,我们可以请求媒体的持续时间,也可以执行查找操作。我们感兴趣的是在管道上执行查找操作,以选择感兴趣的媒体范围。
配置好感兴趣的区域后,可以链接sink元素,解除阻塞source pads,并将管道设置为播放状态。用户可以看到,请求的region在发送到EOS之前就已经被sink播放了。
下面是一个大致遵循该算法的示例应用程序。
#include <gst/gst.h>
static GMainLoop *loop;
static volatile gint counter;
static GstBus *bus;
static gboolean prerolled = FALSE;
static GstPad *sinkpad;
static void
dec_counter (GstElement * pipeline)
{
  if (prerolled)
    return;
  if (g_atomic_int_dec_and_test (&counter)) {
    /* all probes blocked and no-more-pads signaled, post
     * message on the bus. */
    prerolled = TRUE;
    gst_bus_post (bus, gst_message_new_application (
          GST_OBJECT_CAST (pipeline),
          gst_structure_new_empty ("ExPrerolled")));
  }
}
/* called when a source pad of uridecodebin is blocked */
static GstPadProbeReturn
cb_blocked (GstPad          *pad,
            GstPadProbeInfo *info,
            gpointer         user_data)
{
  GstElement *pipeline = GST_ELEMENT (user_data);
  if (prerolled)
    return GST_PAD_PROBE_REMOVE;
  dec_counter (pipeline);
  return GST_PAD_PROBE_OK;
}
/* called when uridecodebin has a new pad */
static void
cb_pad_added (GstElement *element,
              GstPad     *pad,
              gpointer    user_data)
{
  GstElement *pipeline = GST_ELEMENT (user_data);
  if (prerolled)
    return;
  g_atomic_int_inc (&counter);
  gst_pad_add_probe (pad, GST_PAD_PROBE_TYPE_BLOCK_DOWNSTREAM,
      (GstPadProbeCallback) cb_blocked, pipeline, NULL);
  /* try to link to the video pad */
  gst_pad_link (pad, sinkpad);
}
/* called when uridecodebin has created all pads */
static void
cb_no_more_pads (GstElement *element,
                 gpointer    user_data)
{
  GstElement *pipeline = GST_ELEMENT (user_data);
  if (prerolled)
    return;
  dec_counter (pipeline);
}
/* called when a new message is posted on the bus */
static void
cb_message (GstBus     *bus,
            GstMessage *message,
            gpointer    user_data)
{
  GstElement *pipeline = GST_ELEMENT (user_data);
  switch (GST_MESSAGE_TYPE (message)) {
    case GST_MESSAGE_ERROR:
      g_print ("we received an error!\n");
      g_main_loop_quit (loop);
      break;
    case GST_MESSAGE_EOS:
      g_print ("we reached EOS\n");
      g_main_loop_quit (loop);
      break;
    case GST_MESSAGE_APPLICATION:
    {
      if (gst_message_has_name (message, "ExPrerolled")) {
        /* it's our message */
        g_print ("we are all prerolled, do seek\n");
        gst_element_seek (pipeline,
            1.0, GST_FORMAT_TIME,
            GST_SEEK_FLAG_FLUSH | GST_SEEK_FLAG_ACCURATE,
            GST_SEEK_TYPE_SET, 2 * GST_SECOND,
            GST_SEEK_TYPE_SET, 5 * GST_SECOND);
        gst_element_set_state (pipeline, GST_STATE_PLAYING);
      }
      break;
    }
    default:
      break;
  }
}
gint
main (gint   argc,
      gchar *argv[])
{
  GstElement *pipeline, *src, *csp, *vs, *sink;
  /* init GStreamer */
  gst_init (&argc, &argv);
  loop = g_main_loop_new (NULL, FALSE);
  if (argc < 2) {
    g_print ("usage: %s <uri>", argv[0]);
    return -1;
  }
  /* build */
  pipeline = gst_pipeline_new ("my-pipeline");
  bus = gst_pipeline_get_bus (GST_PIPELINE (pipeline));
  gst_bus_add_signal_watch (bus);
  g_signal_connect (bus, "message", (GCallback) cb_message,
      pipeline);
  src = gst_element_factory_make ("uridecodebin", "src");
  if (src == NULL)
    g_error ("Could not create 'uridecodebin' element");
  g_object_set (src, "uri", argv[1], NULL);
  csp = gst_element_factory_make ("videoconvert", "csp");
  if (csp == NULL)
    g_error ("Could not create 'videoconvert' element");
  vs = gst_element_factory_make ("videoscale", "vs");
  if (csp == NULL)
    g_error ("Could not create 'videoscale' element");
  sink = gst_element_factory_make ("autovideosink", "sink");
  if (sink == NULL)
    g_error ("Could not create 'autovideosink' element");
  gst_bin_add_many (GST_BIN (pipeline), src, csp, vs, sink, NULL);
  /* can't link src yet, it has no pads */
  gst_element_link_many (csp, vs, sink, NULL);
  sinkpad = gst_element_get_static_pad (csp, "sink");
  /* for each pad block that is installed, we will increment
   * the counter. for each pad block that is signaled, we
   * decrement the counter. When the counter is 0 we post
   * an app message to tell the app that all pads are
   * blocked. Start with 1 that is decremented when no-more-pads
   * is signaled to make sure that we only post the message
   * after no-more-pads */
  g_atomic_int_set (&counter, 1);
  g_signal_connect (src, "pad-added",
      (GCallback) cb_pad_added, pipeline);
  g_signal_connect (src, "no-more-pads",
      (GCallback) cb_no_more_pads, pipeline);
  gst_element_set_state (pipeline, GST_STATE_PAUSED);
  g_main_loop_run (loop);
  gst_element_set_state (pipeline, GST_STATE_NULL);
  gst_object_unref (sinkpad);
  gst_object_unref (bus);
  gst_object_unref (pipeline);
  g_main_loop_unref (loop);
  return 0;
}
注意,我们使用了一个自定义的应用消息来通知主线程uridecidebin被预滚动。主线程会向请求的region发出刷出指令。刷写操作会暂时解除对pad的阻塞,并在新数据再次到达时重新阻塞它们。我们检测第二个阻塞来删除探针。然后我们将管道设置为播放,它应该从2秒到5秒播放,然后结束并退出应用程序。
Manually adding or removing data from/to a pipeline
动态改变管道
在本节中,我们将讨论一些动态修改管道的技术。我们特别讨论的是在播放状态PLAYING下改变管道,而不中断流。
在构建动态管道时,有一些重要的事情需要考虑:
-  当从管道中删除元素时,请确保未连接的pads上没有数据流dataflow,因为这将导致致命的管道错误。断开pad连接之前一定要阻塞source pads(in push mode)或sink pads(pull mode)之前断开连接垫。另请参阅更改管道中的元素 Changing elements in a pipeline。 
-  在向管道添加元素时,确保将元素置于正确的状态,通常与父元素的状态相同,然后允许数据流传递元素。当新创建元素时,它的状态为NULL,并在接收到数据时返回错误。另请参阅更改管道中的元素Changing elements in a pipeline。 
-  当向管道添加元素时,GStreamer会默认将元素上的时钟和基本时间设置为管道的当前值。这意味着该元素将能够构建与管道中其他元素相同的管道运行时running-time。这意味着sink将像管道中的其他sink一样同步缓冲区,并且源产生的缓冲区的运行时与其他源匹配。 
-  当从上游链upstream中断开链接时,通过向元素sink pad发送EOS事件并等待EOS离开元素(使用事件探针),始终确保刷新元素中queue的任何数据。 - 如果不这样做,则会丢失未链接元素buffer数据。这可能会导致简单的帧丢失(一个或多个视频帧,几毫秒的音频)。但是,如果从管道中删除muxer(在某些情况下是编码器或类似的元素),则有可能得到无法正常播放的损坏文件,因为一些相关元数据(头、查找/索引表、内部同步标签)将无法正确存储或更新。另请参阅更改管道中的元素Changing elements in a pipeline。
 
-  live source将在管道中产生具有当前运行时的(运行时的buffer)。 -  没有live source的管道生成buffer的运行时间从0开始。同样,在flushing seek之后,这些管道会将运行时间重置为0。 
-  运行时间running-time可以用gst_pad_set_offset()改变。为了保持同步,了解管道中各个元素的运行时间是很重要的。 
 
-  
-  添加元素可能会改变管道的状态。例如,添加一个non-prerolled sink可以将管道恢复到预滚动状态。例如,删除non-prerolled sink可能会将管道更改为暂停和播放状态。 -  添加一个live source会取消preroll阶段,并将管道置于播放状态。添加live source或其他live elements也可能会改变管道的延迟。 
-  向管道添加或删除元素可能会改变管道的时钟选择。如果新添加的元素提供了时钟,那么将流水线中的时钟更改为新时钟可能是值得的。另一方面,如果为管道提供时钟的成员被移除,则必须选择一个新的时钟。 
 
-  
-  添加和删除元素可能会导致上游或下游元素重新协商caps和or分配器allocators。你实际上不需要从应用程序中做任何事情,插件很大程度上自适应新的管道拓扑,以优化它们的格式和分配策略。 
重要的是,当你在管道中添加、删除或更改元素时,管道可能需要协商新的格式,而这可能会失败。通常你可以通过在需要的地方插入正确的转换器元素来解决这个问题。另请参阅更改管道中的元素。
GStreamer提供了对任何动态管道修改的支持,但它需要您在不导致管道错误的情况下执行此操作之前了解一些细节。在下面的小节中,我们将演示几个典型的用例
改变管道中的元素
在下一个例子中,我们将看到以下元素连接:
  我们希望在管道处于播放状态时通过element4改变element2。假设element2是一个可视化对象,您希望在管道中切换可视化对象。
我们希望在管道处于播放状态时通过element4改变element2。假设element2是一个可视化对象,您希望在管道中切换可视化对象。
我们不能只是将element2的sinkpad从element1的source pad中断开链接,因为这样会使element1的source pad断开链接,并且在将数据推送到source pad时,会导致管道中的流错误。方法是在我们用element4改变element2之前,先阻塞element1的source pad的数据流,然后恢复数据流,步骤如下:
-  用阻塞pad probe阻塞element1的source pad。当pad被阻塞时,probe回调将被调用。 
-  在阻塞回调函数中,没有任何东西在element1和element2之间流动,直到被解阻塞。 
-  Unlink element1和element2。 
-  确保数据从element2中流出。有些元素可能会在内部保留一些数据,您需要确保不会通过强制从element2中丢失数据。你可以通过将EOS推送到element2来做到这一点,如下所示: -  在element2的source pad上放置一个event probe。 
-  将EOS发送到element2的sinkpad。这确保了element2中的所有数据都被强制取出。 
-  等待EOS事件出现在element2的source pad上。当接收到EOS时,删除它并删除事件探针。 
 
-  
-  Unlink element2和element3。现在还可以从管道中删除element2,并将element2状态设置为NULL。 
-  将element4添加到管道(如果还没有添加的话)。链接element4和element3。链接element1和element4。 
-  确保element4与管道中其他元素处于相同的状态。它至少应该处于暂停状态,然后才能接收缓冲区buffer和事件event。 
-  Unblock element1的source pad probe。这将让新数据进入element4并继续流。 
上述方法在source被阻塞时有效,即在管道中有数据流时。如果没有数据流,(暂时)也没有必要更改元素,因此该方法也可以在暂停状态下使用。
让我们通过一个例子来展示它是如何工作的。这个例子每秒改变一个简单管道上的视频效果。
#include <gst/gst.h>
static gchar *opt_effects = NULL;
#define DEFAULT_EFFECTS "identity,exclusion,navigationtest," \
    "agingtv,videoflip,vertigotv,gaussianblur,shagadelictv,edgetv"
static GstPad *blockpad;
static GstElement *conv_before;
static GstElement *conv_after;
static GstElement *cur_effect;
static GstElement *pipeline;
static GQueue effects = G_QUEUE_INIT;
static GstPadProbeReturn
event_probe_cb (GstPad * pad, GstPadProbeInfo * info, gpointer user_data)
{
  GMainLoop *loop = user_data;
  GstElement *next;
  if (GST_EVENT_TYPE (GST_PAD_PROBE_INFO_DATA (info)) != GST_EVENT_EOS)
    return GST_PAD_PROBE_PASS;
  gst_pad_remove_probe (pad, GST_PAD_PROBE_INFO_ID (info));
  /* push current effect back into the queue */
  g_queue_push_tail (&effects, gst_object_ref (cur_effect));
  /* take next effect from the queue */
  next = g_queue_pop_head (&effects);
  if (next == NULL) {
    GST_DEBUG_OBJECT (pad, "no more effects");
    g_main_loop_quit (loop);
    return GST_PAD_PROBE_DROP;
  }
  g_print ("Switching from '%s' to '%s'..\n", GST_OBJECT_NAME (cur_effect),
      GST_OBJECT_NAME (next));
  gst_element_set_state (cur_effect, GST_STATE_NULL);
  /* remove unlinks automatically */
  GST_DEBUG_OBJECT (pipeline, "removing %" GST_PTR_FORMAT, cur_effect);
  gst_bin_remove (GST_BIN (pipeline), cur_effect);
  GST_DEBUG_OBJECT (pipeline, "adding   %" GST_PTR_FORMAT, next);
  gst_bin_add (GST_BIN (pipeline), next);
  GST_DEBUG_OBJECT (pipeline, "linking..");
  gst_element_link_many (conv_before, next, conv_after, NULL);
  gst_element_set_state (next, GST_STATE_PLAYING);
  cur_effect = next;
  GST_DEBUG_OBJECT (pipeline, "done");
  return GST_PAD_PROBE_DROP;
}
static GstPadProbeReturn
pad_probe_cb (GstPad * pad, GstPadProbeInfo * info, gpointer user_data)
{
  GstPad *srcpad, *sinkpad;
  GST_DEBUG_OBJECT (pad, "pad is blocked now");
  /* remove the probe first */
  gst_pad_remove_probe (pad, GST_PAD_PROBE_INFO_ID (info));
  /* install new probe for EOS */
  srcpad = gst_element_get_static_pad (cur_effect, "src");
  gst_pad_add_probe (srcpad, GST_PAD_PROBE_TYPE_BLOCK |
      GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM, event_probe_cb, user_data, NULL);
  gst_object_unref (srcpad);
  /* push EOS into the element, the probe will be fired when the
   * EOS leaves the effect and it has thus drained all of its data */
  sinkpad = gst_element_get_static_pad (cur_effect, "sink");
  gst_pad_send_event (sinkpad, gst_event_new_eos ());
  gst_object_unref (sinkpad);
  return GST_PAD_PROBE_OK;
}
static gboolean
timeout_cb (gpointer user_data)
{
  gst_pad_add_probe (blockpad, GST_PAD_PROBE_TYPE_BLOCK_DOWNSTREAM,
      pad_probe_cb, user_data, NULL);
  return TRUE;
}
static gboolean
bus_cb (GstBus * bus, GstMessage * msg, gpointer user_data)
{
  GMainLoop *loop = user_data;
  switch (GST_MESSAGE_TYPE (msg)) {
    case GST_MESSAGE_ERROR:{
      GError *err = NULL;
      gchar *dbg;
      gst_message_parse_error (msg, &err, &dbg);
      gst_object_default_error (msg->src, err, dbg);
      g_clear_error (&err);
      g_free (dbg);
      g_main_loop_quit (loop);
      break;
    }
    default:
      break;
  }
  return TRUE;
}
int
main (int argc, char **argv)
{
  GOptionEntry options[] = {
    {"effects", 'e', 0, G_OPTION_ARG_STRING, &opt_effects,
        "Effects to use (comma-separated list of element names)", NULL},
    {NULL}
  };
  GOptionContext *ctx;
  GError *err = NULL;
  GMainLoop *loop;
  GstElement *src, *q1, *q2, *effect, *filter1, *filter2, *sink;
  gchar **effect_names, **e;
  ctx = g_option_context_new ("");
  g_option_context_add_main_entries (ctx, options, NULL);
  g_option_context_add_group (ctx, gst_init_get_option_group ());
  if (!g_option_context_parse (ctx, &argc, &argv, &err)) {
    g_print ("Error initializing: %s\n", err->message);
    g_clear_error (&err);
    g_option_context_free (ctx);
    return 1;
  }
  g_option_context_free (ctx);
  if (opt_effects != NULL)
    effect_names = g_strsplit (opt_effects, ",", -1);
  else
    effect_names = g_strsplit (DEFAULT_EFFECTS, ",", -1);
  for (e = effect_names; e != NULL && *e != NULL; ++e) {
    GstElement *el;
    el = gst_element_factory_make (*e, NULL);
    if (el) {
      g_print ("Adding effect '%s'\n", *e);
      g_queue_push_tail (&effects, el);
    }
  }
  pipeline = gst_pipeline_new ("pipeline");
  src = gst_element_factory_make ("videotestsrc", NULL);
  g_object_set (src, "is-live", TRUE, NULL);
  filter1 = gst_element_factory_make ("capsfilter", NULL);
  gst_util_set_object_arg (G_OBJECT (filter1), "caps",
      "video/x-raw, width=320, height=240, "
      "format={ I420, YV12, YUY2, UYVY, AYUV, Y41B, Y42B, "
      "YVYU, Y444, v210, v216, NV12, NV21, UYVP, A420, YUV9, YVU9, IYU1 }");
  q1 = gst_element_factory_make ("queue", NULL);
  blockpad = gst_element_get_static_pad (q1, "src");
  conv_before = gst_element_factory_make ("videoconvert", NULL);
  effect = g_queue_pop_head (&effects);
  cur_effect = effect;
  conv_after = gst_element_factory_make ("videoconvert", NULL);
  q2 = gst_element_factory_make ("queue", NULL);
  filter2 = gst_element_factory_make ("capsfilter", NULL);
  gst_util_set_object_arg (G_OBJECT (filter2), "caps",
      "video/x-raw, width=320, height=240, "
      "format={ RGBx, BGRx, xRGB, xBGR, RGBA, BGRA, ARGB, ABGR, RGB, BGR }");
  sink = gst_element_factory_make ("ximagesink", NULL);
  gst_bin_add_many (GST_BIN (pipeline), src, filter1, q1, conv_before, effect,
      conv_after, q2, sink, NULL);
  gst_element_link_many (src, filter1, q1, conv_before, effect, conv_after,
      q2, sink, NULL);
  gst_element_set_state (pipeline, GST_STATE_PLAYING);
  loop = g_main_loop_new (NULL, FALSE);
  gst_bus_add_watch (GST_ELEMENT_BUS (pipeline), bus_cb, loop);
  g_timeout_add_seconds (1, timeout_cb, loop);
  g_main_loop_run (loop);
  gst_element_set_state (pipeline, GST_STATE_NULL);
  gst_object_unref (pipeline);
  return 0;
}
补充
upstream and downstream

- src pad传到sink pad方向,就是downstream
- sink pad传到src pa方向,就是upstream
pull-mode and push-mode
- src元素会使用gst_pad_push()方法将数据推送到下游元素。下游对端pad将在Chain函数中接收buffer。在push-mode中,src元素是管道中的驱动力,因为它发起了数据传输。
- 元素也可以从上游元素中拉取数据。下游元素通过在其中一个sink pad上调用gst_pad_pull_range()来实现这一点。在pull mode下,下游元素是管道中的驱动力,因为它发起了数据传输。
参考1:Pipeline manipulation
 参考2:push-pull
 参考3:Downstream and upstream are the terms



















