zlMediaKit 9 ThreadPool模块

news2024/11/28 0:40:15

semaphore.h

TaskQueue.h

threadgroup.h

ThreadPool.h

ThreadPool

semaphore

基于条件变量和锁实现的信号量postwait语义

#include <mutex>
#include <condition_variable>

namespace toolkit {

class semaphore {
public:
    explicit semaphore(size_t initial = 0) {
#if defined(HAVE_SEM)
        sem_init(&_sem, 0, initial);
#else
        _count = 0;
#endif
    }

    ~semaphore() {
#if defined(HAVE_SEM)
        sem_destroy(&_sem);
#endif
    }

    void post(size_t n = 1) {
#if defined(HAVE_SEM)
        while (n--) {
            sem_post(&_sem);
        }
#else
        std::unique_lock<std::recursive_mutex> lock(_mutex);
        _count += n;
        if (n == 1) {
            _condition.notify_one();
        } else {
            _condition.notify_all();
        }
#endif
    }

    void wait() {
#if defined(HAVE_SEM)
        sem_wait(&_sem);
#else
        std::unique_lock<std::recursive_mutex> lock(_mutex);
        while (_count == 0) {
            _condition.wait(lock);
        }
        --_count;
#endif
    }

private:
#if defined(HAVE_SEM)
    sem_t _sem;
#else
    size_t _count;
    std::recursive_mutex _mutex;
    std::condition_variable_any _condition;
#endif
};

} /* namespace toolkit */
#endif /* SEMAPHORE_H_ */

!TaskQueue

实现了一个基于函数对象的任务列队,该列队是**线程安全(互斥量)的,任务列队任务数由信号量(生产者消费者模型)**控制

#ifndef TASKQUEUE_H_
#define TASKQUEUE_H_

#include <mutex>
#include "Util/List.h"
#include "semaphore.h"

namespace toolkit {

//实现了一个基于函数对象的任务列队,该列队是线程安全的,任务列队任务数由信号量控制
template<typename T>
class TaskQueue {
public:
    //打入任务至列队
    template<typename C>
    void push_task(C &&task_func) {
        {
            std::lock_guard<decltype(_mutex)> lock(_mutex);
            _queue.emplace_back(std::forward<C>(task_func));
        }
        _sem.post();
    }

    template<typename C>
    void push_task_first(C &&task_func) {
        {
            std::lock_guard<decltype(_mutex)> lock(_mutex);
            _queue.emplace_front(std::forward<C>(task_func));
        }
        _sem.post();
    }

    //清空任务列队
    void push_exit(size_t n) {
        _sem.post(n);
    }

    //从列队获取一个任务,由执行线程执行
    bool get_task(T &tsk) {
        _sem.wait();
        std::lock_guard<decltype(_mutex)> lock(_mutex);
        if (_queue.empty()) {
            return false;
        }
        tsk = std::move(_queue.front());
        _queue.pop_front();
        return true;
    }

    size_t size() const {
        std::lock_guard<decltype(_mutex)> lock(_mutex);
        return _queue.size();
    }

private:
    List <T> _queue;
    mutable std::mutex _mutex;
    semaphore _sem;
};

} /* namespace toolkit */
#endif /* TASKQUEUE_H_ */

thread_group

unordered_map<thread::id, shared_ptr<thread>> 指向线程的智能指针

#ifndef THREADGROUP_H_
#define THREADGROUP_H_

#include <thread>
#include <unordered_map>

namespace toolkit {

class thread_group {
private:
    thread_group(thread_group const &);
    thread_group &operator=(thread_group const &);

public:
    thread_group() {}

    ~thread_group() {
        _threads.clear();
    }

    bool is_this_thread_in() { //当前调用线程在线程组中嘛
        auto thread_id = std::this_thread::get_id();
        if (_thread_id == thread_id) {
            return true;
        }
        return _threads.find(thread_id) != _threads.end();
    }

    bool is_thread_in(std::thread *thrd) {//该线程在线程组中嘛
        if (!thrd) {
            return false;
        }
        auto it = _threads.find(thrd->get_id());
        return it != _threads.end();
    }

    template<typename F>
    std::thread *create_thread(F &&threadfunc) {
        auto thread_new = std::make_shared<std::thread>(threadfunc);
        _thread_id = thread_new->get_id();
        _threads[_thread_id] = thread_new;
        return thread_new.get();
    }

    void remove_thread(std::thread *thrd) {
        auto it = _threads.find(thrd->get_id());
        if (it != _threads.end()) {
            _threads.erase(it);
        }
    }

    void join_all() {
        if (is_this_thread_in()) {
            throw std::runtime_error("thread_group: trying joining itself");
        }
        for (auto &it : _threads) {
            if (it.second->joinable()) {
                it.second->join(); //等待线程主动退出
            }
        }
        _threads.clear();
    }

    size_t size() {
        return _threads.size();
    }

private:
    std::thread::id _thread_id;
    std::unordered_map<std::thread::id, std::shared_ptr<std::thread>> _threads;
};

}

UML

在这里插入图片描述

结构

	size_t _thread_num;
    TaskQueue<Task::Ptr> _queue;
    thread_group _thread_group;
    Priority _priority;
    Logger::Ptr _logger;

async (post)

生产_queue中的任务(post)

//把任务打入线程池并异步执行
Task::Ptr async(TaskIn task, bool may_sync = true) override {
    if (may_sync && _thread_group.is_this_thread_in()) {
        task();
        return nullptr;
    }
    auto ret = std::make_shared<Task>(std::move(task));
    _queue.push_task(ret); //post
    return ret;
}

!run (wait)

start开启N个线程,消费_queue中的任务

void start() {
    if (_thread_num <= 0) {
        return;
    }
    size_t total = _thread_num - _thread_group.size();
    for (size_t i = 0; i < total; ++i) {
        _thread_group.create_thread(std::bind(&ThreadPool::run, this));
    }
}

run消费_queue中的任务,没有就等 get_task (wait)

void run() {
    ThreadPool::setPriority(_priority);
    Task::Ptr task;
    while (true) {
        startSleep();
        if (!_queue.get_task(task)) {
            //空任务,退出线程
            break;
        }
        sleepWakeUp();
        try {
            (*task)();
            task = nullptr;
        } catch (std::exception &ex) {
            ErrorL << "ThreadPool执行任务捕获到异常:" << ex.what();
        }
    }
}

shutdown

结束的操作很有趣,发出线程数量的空任务,run在get_task中取到了空任务,跳出while循环可以被join掉了

void shutdown()
{
    _queue.push_exit(_thread_num);
}

线程池中线程的调度优先级

浅谈pthread_setschedparam的使用

struct sched_param params;
params.sched_priority = Priorities[priority];
return pthread_setschedparam(threadId, SCHED_OTHER, &params) == 0;

总结

  • 线程池的async执行方式,作为生产者添加任务到队列中

  • 生产者消费者的实现,互斥量保证线程安全(不可同时访问),生产者(添加任务的线程)发信号量通知消费者(工作线程)进行同步(访问的先后顺序)

  • 如何设置线程调度优先级pthread_setschedparam

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/3835.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Linux命令从入门到实战 ----查找文件和目录压缩和解压缩

文章目录搜索查找find查找文件和目录locate快速定位文件路径grep 过滤查找| 管道符which命令用于查找文件。whereis压缩和解压缩gzip/gunzip 压缩zip/unzip压缩tar打包总结搜索查找 find查找文件和目录 find指令将从指令指定目录下向下遍历其各个子目录&#xff0c;将满足条件…

算法60天:day46

算法60天&#xff1a;day46动态规划-单词拆分动态规划-多重背包问题动态规划-背包总结动态规划-单词拆分 力扣链接 class Solution { public:bool wordBreak(string s, vector<string>& wordDict) {unordered_set<string> wordSet(wordDict.begin(),wordDict…

RabbitMQ消息发送和接收(实例)

消息发送&#xff1a; 1.首先启动rabbitmq 2.查看防火墙状态&#xff0c;如果是开启状态则需要关闭防火墙 3.通过浏览器访问rabbitmq控制台&#xff0c;ip15672端口号 &#xff0c;例如http://192.168.174.129:15672 登录时输入自己的此前设置的登录名和密码 4.打开idea,创建r…

农产品果蔬商城交易系统(Java+Web+MySQL)

目录 摘要 I Abstract II 前言 1 1 课题简介 2 1.1 选题背景 2 1.2 课题的意义 2 1.3 系统目标 3 2. 可行性研究 5 2.1 技术可行性 5 2.2 经济可行性 5 2.3 操作可行性 5 2.4 法律可行性 6 3. 需求分析 7 3.1 系统需要解决的主要问题 7 3.2 系统具备的基本功能 7 3.3 数据流图…

MySQL开篇:简单的库操作,表操作,数据类型

✨博客主页: 心荣~ ✨系列专栏:【MySQL】 ✨一句短话: 难在坚持,贵在坚持,成在坚持! 文章目录一. 什么是MySQL二. 基础库操作1. 创建数据库2. 查看所有数据库3. 选中数据库4. 删除数据库三. 设置数据库的编码字符集四. MySQL数据类型1. 数值类型2. 字符串类型3. 日期数据类型五…

_Linux 动态库

文章目录0. 前言1. 生成动态库1.1 我们把静态库和动态库打包1.2 当动静库同时存在的时候默认生成的是动态库1.3 -static2. 动态库的使用2.1 运行动态库的方法3. 库文件名称和引入库的名称0. 前言 链接&#xff1a;静态库文章 上一章我们讲解了静态库的生成和两种使用&#xff…

Netty架构设计

目录 Selector模型 SelectableChannel Channel注册到Selector SelectionKey 遍历SelectionKey 事件驱动 责任链模式 Selector模型 Java NIO是基于Selector模型来实现非阻塞IO&#xff0c;Netty底层基于Java NIO实现的&#xff0c;因此也使用了Selector模型。 Selector提…

Go语言五大主流web框架

以下 star数截止2022年11月份 1.Gin&#xff08;64.1K&#xff09; 项目简介&#xff1a;Gin 是一个用 Go (Golang) 编写的 HTTP Web 框架。 它具有类似 Martini 的 API&#xff0c;但性能比 Martini 快 40 倍。 仓库地址&#xff1a;https://github.com/gin-gonic/ginhttps…

TensorRt安装和命令行测试

1、选择TensorRt版本 安装tensorrt前&#xff0c;需要先了解自己的显卡算力、架构等&#xff0c;点击 算力列表链接 对号入座。 这里仅展示RTX和Titan系列&#xff0c;其他系列可在当前网页选择。 1.1、cuda版本 首先需要安装cuda&#xff0c;其版本并不是最新就好&#xf…

基于SSM跨境电商网站的设计与实现/海外购物平台的设计

通过对跨境电商网站的编写&#xff0c;使得自己对于javaweb技术和数据库理论有了更深的认识。课题设计javaweb&#xff0c;能够学习网页编程知识。此课题设计的知识有HTML&#xff0c;CSS和MVC模式等。还跟javaScript的知识有关。在不断的学习过程中提高自己的编程能力。本跨境…

TCP/IP网络参考模型

目录 TCP/IP四/五层模型 应用层常见协议——传输数据PDU 传输层协议——传输数据段 端口号 TCP面向连接服务 UDP无面向连接服务 网络层协议——传输数据包 IP协议 数据链路层——传输数据帧 Ethernet帧格式 IEEE802.3帧格式 TCP/IP四/五层模型 标准定义的TCP/IP模型…

使用idea自动开发springMVC程序及表单标签

1.新建项目 选择Spring—>SpringMVC——>Download 点击next&#xff0c;起好项目名称project name&#xff0c;我这里项目名是MVCTag&#xff0c;选择好项目的路径project location&#xff0c;然后点击确定就会自动加载SpringMVC所需要的全部jar包 项目新建完成&…

【Spring5】基于注解的Bean管理简直是Spring中的Spring

文章目录1 什么是注解2 基于注解方式实现对象的创建3 组件扫描配置的细节4 基于注解实现属性的注入4.1 几种注解概述4.2 Autowire注解演示4.3 Qualifier注解演示4.4 Value注解演示5 纯注解的开发模式写在最后1 什么是注解 注解是代码中特殊的标记&#xff0c;格式如下&#xf…

Latex论文排版

O、部分参考&#xff1a; https://blog.csdn.net/qq_41982200/article/details/123051883?spm1001.2014.3001.5506 https://blog.csdn.net/qq_27353621/article/details/127170340 一、基础知识 1、空一行 → 分一段 空很多行也只是分一段 2、加粗、斜体 3、新章节 4…

JAVA学习笔记(二)

JAVA学习笔记 包1.1基本使用 2.2包的命名 2.3常用的包 2.4使用细节 访问修饰符面向对象 3.1面向对象三大特征 封装、继承、多态 3.2封装介绍 3.3封装的理解和好处 3.4封装的实现步骤 3.5构造器和setXXX结合 3.6继承&#xff08;细节&#xff09; 3.7super基本用法 3.8super给…

Kotlin基础学习笔记之第六章——kotlin的类型系统

一、本章简介 与java相比&#xff0c;kotlin中引入了一些新特性&#xff0c;他们是提升代码可读性的基本要素&#xff0c;比如&#xff1a;对可空的类型和只读集合的支持。与此同时&#xff0c;kotlin去掉了一些java类型系统中不必要的或者有问题的特性&#xff0c;比如把数组作…

docker安装mysql同步数据到linux与docker容器卷

可以去dockerhub搜索mysql寻找命令 docker run -p 3310:3306 --name mysql57 -v /home/mysql/conf:/etc/mysql/conf.d -v /home/mysql/datadir:/var/lib/mysql -e MYSQL_ROOT_PASSWORD123456 -d mysql:5.7 -e配置启动容器mysql 需要配置密码 -v 是绑定容器卷到linux 上 …

阿里二面,前端开发在web3.0中该如何应用,答完面试官对我笑了笑

近期听说周星驰也开始招募web3.0的人才了&#xff0c;可见其火爆程度真是不一般啊&#xff0c;不得不说的是&#xff0c;这又是一场新的革命&#xff0c;必将带来腥风血雨。 对于前端开发来说&#xff0c;很多人可能刚刚准备学习&#xff0c;刚刚入门&#xff0c;刚刚在企业中找…

目标检测(4)—— 经典算法和常用指标

一、深度学习的经典算法 two-stage&#xff08;两阶段&#xff09;&#xff1a;RCNNone-stage&#xff08;一阶段&#xff09;&#xff1a;YOLO&#xff0c;SSD&#xff08;这个好像很牛&#xff09; one-stage&#xff1a; 将图片输入到CNN里&#xff0c;经过特征提取&#…

rabbitMQ:绑定Exchange发送和接收消息(topic)

topic交换机和fanout交换机类似&#xff0c;也是广播机制&#xff0c;但是topic需要绑定RoutingKey&#xff0c;绑定RoutingKey时可以使用通配符&#xff08;*,#&#xff09;代替。 *&#xff1a;只能一个单词 #&#xff1a;0个或多个单词 编写topic消息发送类 1.编写Recei…