GStreamer中如何自定义配置线程优先级

news2024/11/26 12:27:09

1.引言

如果看了gstreamer官方教程配置多线程出现编译不过的问题了,不妨进来看看这篇文章或许能解决一些编译问题。

GStreamer 本质上是多线程的,并且是完全线程安全的。大多数线程内部对应用程序是隐藏的,这应该使应用程序开发更容易。但是,在某些情况下,应用程序可能希望影响其中的某些部分。 GStreamer 允许应用程序在pipeline的某些部分强制使用多个线程,GStreamer 还可以在创建线程时通知您,以便您可以配置要使用的线程优先级或线程池等内容。

2.GStreamer 中的调度

GStreamer pipeline中的每个element决定如何调度它。element可以选择是基于推还是基于拉的方式调度它们的pad。例如,一个element可以选择启动一个线程以开始从sink pad拉取 或/并 开始从source pad推送。element也可以选择使用上游(upstream )或下游(downstream )线程分别以推和拉模式对数据进行处理。 GStreamer 对element选择如何调度没有任何限制。有关更多详细信息,请参阅插件编写指南。

在任何情况下都会发生的是,某些element将启动一个线程来处理它们的数据,称为“流线程”(“streaming threads”)。当element需要创建一个流线程时,流线程或叫做 GstTask 对象是从 GstTaskPool 创建的。在下一节中,我们将看到如何接收任务和池的通知。

3.在 GStreamer 中配置线程

STREAM_STATUS 消息发布在总线上以通知有关流线程的状态。你将从消息中获得以下信息:

当将要创建新线程时,您将收到 GST_STREAM_STATUS_TYPE_CREATE 类型的通知。然后可以在 GstTask 中配置 GstTaskPool。自定义任务池将为任务提供自定义线程以实现流线程。

  • 如果要配置自定义任务池,则需要同步处理此消息。如果你在此消息返回时未在任务上配置任务池,则任务将使用其默认池。

  • 进入或离开线程时。这是你可以配置线程优先级的时刻。当线程被销毁时,你也会收到通知。

  • 当线程开始、暂停和停止时,你会收到消息。这可用于在 gui 应用程序中可视化流线程的状态。

4.如何提高线程的优先级

请添加图片描述
让我们看一下上面的pipeline。我们希望提高流线程的优先级。appsrc element将启动流线程来将appsrc中src pad上的数据推送给对端queue element的sink pad。改变优先级的流程是这样的:

  • 当从 READY 到 PAUSED 状态时, appsrc 将需要一个流线程来将数据推送到 queue 中。它将发布一条STREAM_STATUS 消息,指示其对流线程的需求。

  • 应用程序将使用同步总线处理程序对 STREAM_STATUS 消息做出反应。然后它将在消息内的 GstTask 上配置自定义GstTaskPool。自定义任务池负责创建线程。在这个例子中,我们将创建一个具有更高优先级的线程。

  • 或者,由于同步消息是在线程上下文中调用的,您可以使用线程 ENTER/LEAVE 通知来更改当前线程的优先级或调度策略。

第一步,我们需要实现一个可以在任务上配置的自定义 GstTaskPool。下面是一个 GstTaskPool 子类的实现,它使用 pthreads 创建一个 SCHED_RR 实时线程。请注意,创建实时线程可能需要额外的权限。

#include <gst/gst.h>
#include <gst/gsttaskpool.h>

#define TEST_TYPE_RT_POOL \
  (test_rt_pool_get_type())

GType test_rt_pool_get_type (void);
GstTaskPool *test_rt_pool_new (void);

typedef struct _GstTaskPool      TestRTPool;
typedef struct _GstTaskPoolClass TestRTPoolClass;

typedef struct
{
    pthread_t thread;
} TaskPoolRTId;

G_DEFINE_TYPE (TestRTPool, test_rt_pool, GST_TYPE_TASK_POOL);

static void
default_prepare (GstTaskPool * pool, GError ** error)
{
  /* we don't do anything here. We could construct a pool of threads here that
   * we could reuse later but we don't */
}

static void
default_cleanup (GstTaskPool * pool)
{
}

static gpointer
default_push (GstTaskPool * pool, GstTaskPoolFunction func, gpointer data,
    GError ** error)
{
  TestRTId *tid;
  gint res;
  pthread_attr_t attr;
  struct sched_param param;

  tid = g_slice_new0 (TestRTId);

  pthread_attr_init (&attr);
  if ((res = pthread_attr_setschedpolicy (&attr, SCHED_RR)) != 0)
    g_warning ("setschedpolicy: failure: %p", g_strerror (res));

  param.sched_priority = 50;
  if ((res = pthread_attr_setschedparam (&attr, &param)) != 0)
    g_warning ("setschedparam: failure: %p", g_strerror (res));

  if ((res = pthread_attr_setinheritsched (&attr, PTHREAD_EXPLICIT_SCHED)) != 0)
    g_warning ("setinheritsched: failure: %p", g_strerror (res));

  res = pthread_create (&tid->thread, &attr, (void *(*)(void *)) func, data);

  if (res != 0) {
    g_set_error (error, G_THREAD_ERROR, G_THREAD_ERROR_AGAIN,
        "Error creating thread: %s", g_strerror (res));
    g_slice_free (TestRTId, tid);
    tid = NULL;
  }

  return tid;
}

static void
default_join (GstTaskPool * pool, gpointer id)
{
  TestRTId *tid = (TestRTId *) id;

  pthread_join (tid->thread, NULL);

  g_slice_free (TestRTId, tid);
}

static void
test_rt_pool_class_init (TestRTPoolClass * klass)
{
  GstTaskPoolClass *gsttaskpool_class;

  gsttaskpool_class = (GstTaskPoolClass *) klass;

  gsttaskpool_class->prepare = default_prepare;
  gsttaskpool_class->cleanup = default_cleanup;
  gsttaskpool_class->push = default_push;
  gsttaskpool_class->join = default_join;
}

static void
test_rt_pool_init (TestRTPool * pool)
{
}

GstTaskPool *
test_rt_pool_new (void)
{
  GstTaskPool *pool;

  pool = g_object_new (TEST_TYPE_RT_POOL, NULL);

  return pool;
}

编写任务池时要实现的重要功能是“push”功能。 实现上应该启动一个调用给定函数的线程。 更复杂的实现可能希望在池中保留一些线程,因为创建和销毁线程并不总是最快的操作。

在下一步中,我们需要在 appsrc 需要时实际配置自定义任务池。 为此,我们使用同步处理程序拦截 STREAM_STATUS 消息。

static GMainLoop* loop;

static void
on_stream_status (GstBus     *bus,
                  GstMessage *message,
                  gpointer    user_data)
{
  GstStreamStatusType type;
  GstElement *owner;
  const GValue *val;
  GstTask *task = NULL;

  gst_message_parse_stream_status (message, &type, &owner);

  val = gst_message_get_stream_status_object (message);

  /* see if we know how to deal with this object */
  if (G_VALUE_TYPE (val) == GST_TYPE_TASK) {
    task = g_value_get_object (val);
  }

  switch (type) {
    case GST_STREAM_STATUS_TYPE_CREATE:
      if (task) {
        GstTaskPool *pool;

        pool = test_rt_pool_new();

        gst_task_set_pool (task, pool);
      }
      break;
    default:
      break;
  }
}

static void
on_error (GstBus     *bus,
          GstMessage *message,
          gpointer    user_data)
{
  g_message ("received ERROR");
  g_main_loop_quit (loop);
}

static void
on_eos (GstBus     *bus,
        GstMessage *message,
        gpointer    user_data)
{
  g_main_loop_quit (loop);
}


int
main (int argc, char *argv[])
{
  GstElement *bin, *AppSrc;
  GstBus *bus;
  GstStateChangeReturn ret;

  gst_init (&argc, &argv);

  /* create a new bin to hold the elements */
  bin = gst_pipeline_new ("pipeline");
  g_assert (bin);
    
  //pseudo-code

  /* create some elements */
  AppSrc = gst_element_factory_make ("appsrc", "appsrc");
  .............


  /* add objects to the main pipeline */
  gst_bin_add_many (GST_BIN (bin),AppSrc ,..., ..., NULL);

  /* link the elements */
  gst_element_link_many(AppSrc,..., ...,null);

  loop = g_main_loop_new (NULL, FALSE);

  /* get the bus, we need to install a sync handler */
  bus = gst_pipeline_get_bus (GST_PIPELINE (bin));

  gst_bus_enable_sync_message_emission (bus);

  gst_bus_add_signal_watch (bus);

  g_signal_connect (bus, "sync-message::stream-status",
      (GCallback) on_stream_status, NULL);
  g_signal_connect (bus, "message::error",
      (GCallback) on_error, NULL);
  g_signal_connect (bus, "message::eos",
      (GCallback) on_eos, NULL);

  /* start playing */
  ret = gst_element_set_state (bin, GST_STATE_PLAYING);
  if (ret != GST_STATE_CHANGE_SUCCESS) {
    g_message ("failed to change state");
    return -1;
  }

  /* Run event loop listening for bus messages until EOS or ERROR */
  g_main_loop_run (loop);

  /* stop the bin */
  gst_element_set_state (bin, GST_STATE_NULL);
  gst_object_unref (bus);
  g_main_loop_unref (loop);

  return 0;
}

请注意,该程序可能需要 root 权限才能创建实时线程。 当无法创建线程时,状态更改函数将失败,我们在上面的应用程序中捕获了这一点。

当pipeline中有多个线程时,您将收到多个 STREAM_STATUS 消息。 您应该使用消息的所有者(可能是 pad 或启动线程的element)来确定该线程在应用程序上下文中的功能是什么。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1671466.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

测试项目实战--安享理财2(Jmeter接口测试)

说明&#xff1a; 1.访问地址&#xff1a; 本项目实战使用的是传智播客的安享理财项目&#xff08;找了半天这个项目能免费用且能够满足测试实战需求&#xff09; 前台&#xff1a;http://121.43.169.97:8081/ 后台&#xff1a;http://121.43.169.97:8082/ &#xff08;点赞收藏…

机器人开发项目实现过程

比赛项目实现过程 第一步&#xff1a;设置远程桌面连接 登录机器人系统&#xff0c;设置网络&#xff0c;参考远程桌面连接20230525.mp4 外接显示器、鼠标和键盘 登录系统 账户&#xff1a;robuster 密码&#xff1a;123456 建议&#xff0c;手机开热点&#xff0c;机器人…

开关电源功率测试方法:输入、输出功率测试步骤

在现代电子设备中&#xff0c;开关电源扮演着至关重要的角色&#xff0c;其效率和稳定性直接影响到整个系统的性能。因此&#xff0c;对开关电源进行功率测试成为了电源管理的重要环节。本文将详细介绍如何使用DC-DC电源模块测试系统对开关电源的输入输出功率进行准确测量&…

RabbitMQ(四种使用模式)

文章目录 1.Fanout&#xff08;广播模式&#xff09;1.基本介绍2.需求分析3.具体实现1.编写配置类 RabbitMQConfig.java2.编写生产者&#xff0c;发送消息到交换机 MQSender.java3.编写消费者&#xff0c;接受消息 MQReceiver.java4.控制层调用方法&#xff0c;发送信息到交换机…

大厂Java面试题:MyBatis中是如何实现动态SQL的?有哪些动态SQL元素(标签)?描述下动态SQL的实现原理。

大家好&#xff0c;我是王有志。 今天给大家带来的是一道来自京东的 MyBatis 面试题&#xff1a;MyBatis 中是如何实现动态 SQL 的&#xff1f;有哪些动态 SQL 元素&#xff08;标签&#xff09;&#xff1f;描述下动态 SQL 的实现原理。 MyBatis 中提供了 7 个动态 SQL 语句…

7B2 PRO主题5.4.2免授权直接安装

B2 PRO 5.4.2 最新免授权版不再需要改hosts&#xff0c;直接在wordpress上传安装即可

全域运营是割韭菜吗?看完再下结论!

随着流量时代的到来&#xff0c;各大公私域平台中的流量争夺战日益激烈&#xff0c;商家和品牌实现流量变现的难度值也不断提高&#xff0c;运营人员的压力也逐渐增大。在此背景下&#xff0c;全域运营的兴起或许是一个契机&#xff0c;能够将所有人从内卷的状态中解救出来。而…

基于springboot+vue+Mysql的医疗服务系统

开发语言&#xff1a;Java框架&#xff1a;springbootJDK版本&#xff1a;JDK1.8服务器&#xff1a;tomcat7数据库&#xff1a;mysql 5.7&#xff08;一定要5.7版本&#xff09;数据库工具&#xff1a;Navicat11开发软件&#xff1a;eclipse/myeclipse/ideaMaven包&#xff1a;…

流量卡避坑指南

流量卡避坑指南 在选择流量卡时&#xff0c;有几点需要注意以避免踩坑&#xff1a; 合同期和优惠期。 务必看清楚流量卡的合同期和优惠期。 有些卡可能首月免费&#xff0c;但月底办理可能不划算。 真正的长期套餐应该是优惠期20年以上的。 宣传与实际。 对于所谓的“永久9元…

UV胶的应用场景有哪些?

UV胶是一种特殊的胶水&#xff0c;其固化过程需要紫外光照射。它具有快速固化、高强度、无溶剂挥发等优点&#xff0c;因此在许多应用场景中被广泛使用。UV胶的应用场景非常广泛&#xff0c;包括但不限于以下几个方面&#xff1a; 1.电子产品组装: UV胶在电子产品的组装中扮演…

【iOS逆向与安全】网上gw如何自动登录与签到SM2,SM3,SM4算法加解密

1.下载 app 2.frida 调试 3.抓包查看接口 4.分析加密数据 5.易语言编写代码 1 .开始下载 下载好发现有越狱检测&#xff0c;检测点为&#xff1a; -[AppDelegate isJailBreak]; 于是编写插件xm代码 : %hook AppDelegate- (void)isJailBreak{NSLog("AppDelegate is…

M 有效算法

M 有效算法 本题考验二分知识&#xff0c;思路是二分k的取值&#xff0c;就按第一组样例来说当我们k取值为1的时候我们遍历数组想让|8-x|<k1的话x的取值范围是7-9&#xff0c;想让|3-x|<k2的话x的取值范围是1-5&#xff0c;两者x的区间不重合&#xff0c;说明肯定没有x能…

【c++】二叉搜索树(BST)

&#x1f525;个人主页&#xff1a;Quitecoder &#x1f525;专栏&#xff1a;c笔记仓 朋友们大家好&#xff0c;本篇文章来到二叉搜索树的内容 目录 1.二叉搜索树的介绍2.二叉搜索树的操作与实现insert插入Find查找InOrder中序遍历Erase删除 3.二叉搜索树的应用&#xff08;K…

链动2+1结合消费增值:破解用户留存与复购的密码

大家好&#xff0c;我是吴军&#xff0c;来自一家领先的软件开发公司&#xff0c;担任产品经理的职务。今天&#xff0c;我希望能与大家深入交流链动21模式&#xff0c;特别是它在提升用户留存和复购率方面的独特价值。 虽然链动模式在某些人眼中可能被视为传统或已被超越&…

HCIP的学习(16)

BGP的状态机 ​ OSPF的状态机是在描述整个协议的完整工作过程&#xff0c;而BGP的状态机仅描述的是对等体关系建立过程中的状态变化。-----因为BGP将邻居建立过程以及BGP路由收发过程完全隔离。 ​ IGP协议在启动后&#xff0c;需要通过network命令激活接口&#xff0c;从而使…

运筹系列92:vrp算法包VROOM

1. 介绍 VROOM is an open-source optimization engine written in C20 that aim at providing good solutions to various real-life vehicle routing problems (VRP) within a small computing time. 可以解决如下问题&#xff1a; TSP (travelling salesman problem) CVRP …

三极管 导通条件

一、三极管理解 三极管是电子行业常用的元器件之一&#xff0c;他是一种电流型控制的器件&#xff0c;他有三种工作状态&#xff1a;截止区&#xff0c;放大区、饱和区。当三极管当做开关使用时&#xff0c;他工作在饱和区。下面简短讲解三极管作为开关使用的方法&#xff0c;只…

2025考研 | 北京师范大学计算机考研考情分析

北京师范大学&#xff08;Beijing Normal University&#xff09;简称“北师大”&#xff0c;由中华人民共和国教育部直属&#xff0c;中央直管副部级建制&#xff0c;位列“211工程”、“985工程”&#xff0c;入选国家“双一流”、“珠峰计划”、“2011计划”、“111计划”、…

C--贪吃蛇

目录 前言 简单的准备工作 蛇的节点 开始前 void GameStart(pSnake ps) void WelcomeToGame() void CreateMap() void InitSnake(pSnake ps) void CreateFood(pSnake ps) 游戏进行 void GameRun(pSnake ps) int NextIsFood(pSnakeNode psn, pSnake ps) void NoFood(pSnak…

whisper之初步使用记录

文章目录 前言 一、whisper是什么&#xff1f; 二、使用步骤 1.安装 2.python调用 3.识别效果评估 4.一点封装 5.参考链接 总结 前言 随着AI大模型的不断发展&#xff0c;语音识别等周边内容也再次引发关注&#xff0c;通过语音转文字再与大模型交互&#xff0c;从而…