C++ 11 实现简单线程池

news2025/1/24 8:45:28

线程池是一种多线程处理形式,处理过程中将任务添加到队列,然后在创建线程后自动启动这些任务。线程池线程都是后台线程。每个线程都使用默认的堆栈大小,以默认的优先级运行,并处于多线程单元中。如果某个线程在托管代码中空闲(如正在等待某个事件),则线程池将插入另一个辅助线程来使所有处理器保持繁忙。如果所有线程池线程都始终保持繁忙,但队列中包含挂起的工作,则线程池将在一段时间后创建另一个辅助线程但线程的数目永远不会超过最大值。超过最大值的线程可以排队,但他们要等到其他线程完成后才启动。

C++ 11标准支持线程,但是未支持线程池,本文章使用C++11实现最简单线程池,无优先级

C++ 11 实现简单线程池

使用到c++11的新特性有

std::unique_lock
std::mutex
std::atomic_bool
std::thread
std::future
std::condition_variable
std::function
std::forward
std::packaged_task
std::make_shared

auto、decltype 自动类型推导

线程安全队列

基于std::queue<T> 实现,保证访问内部对象的线程安全性

/**
 * @brief  线程安全队列
 * @author GGX
 * @date   2023-08-13
 */
template <typename T>
class CSateQueue
{
public:
    CSateQueue() = default;
    ~CSateQueue() {}

    /**
     * @brief  队列是否为空
     * @param  无
     * @return true 空,false 非空
     * @author GGX
     * @date   2023-08-19
     */
    bool empty() const
    {
        std::unique_lock<std::mutex> lock(m_mutex);
        return m_queue.empty();
    }

    /**
     * @brief  队列是否为空
     * @param  无
     * @return size_t 队列大小
     * @author GGX
     * @date   2023-08-19
     */
    size_t size() const
    {
        std::unique_lock<std::mutex> lock(m_mutex);
        return m_queue.size();
    }

    /**
     * @brief  压栈
     * @param  对象
     * @return 无
     * @author GGX
     * @date   2023-08-19
     */
    void push(T &t)
    {
        std::unique_lock<std::mutex> lock(m_mutex);
        m_queue.emplace(t);
    }

    /**
     * @brief  出栈
     * @param  对象
     * @return true 成功,false 失败
     * @author GGX
     * @date   2023-08-19
     */
    bool front(T &t)
    {
        std::unique_lock<std::mutex> lock(m_mutex);
        if (m_queue.empty())
        {
            return false;
        }

        t = std::move(m_queue.front());
        m_queue.pop();
        return true;
    }
private:
    // 利用模板函数构造队列
    std::queue<T> m_queue;
    // 互斥量,可修改
    mutable std::mutex m_mutex;
};

线程池

常驻线程,有任务时,才启动线程

/**
 * @brief  线程池
 * @author GGX
 * @date   2023-08-13
 */
class ThreadPool
{
public:

    /**
     * @brief  构造函数,默认六个线程
     * @param  [in] size 线程数目
     * @return
     * @author GGX
     * @date   2023-08-19
     */
    explicit ThreadPool(size_t size = 6): m_size(size)
    {
        m_active = false;
    }

    ~ThreadPool()
    {
        stop();
    }

    /**
     * @brief  启动线程池
     * @param  无
     * @return 无
     * @author GGX
     * @date   2023-08-19
     */
    void start()
    {
        //标记线程启动
        m_active = true;
        m_threads.reserve(m_size);
        for (size_t i = 0; i < m_size; i++)
        {
            //创建线程池
            m_threads.emplace_back(ThreadPool::work, this);
        }
        std::cout << "ThreadPool start" << std::endl;
    }

    /**
     * @brief  停止线程池
     * @param  无
     * @return 无
     * @author GGX
     * @date   2023-08-19
     */
    void stop()
    {
        //标记线程停止
        m_active = false;
        //唤醒所有线程
        m_cv.notify_all();
        for (thread &th : m_threads)
        {
            //优雅的退出线程
            if (th.joinable())
            {
                th.join();
            }
        }
        std::cout << "ThreadPool stop" << std::endl;
    }

    void work()
    {
        std::function<void()> func; // 定义基础函数类func
        bool dequeued = false; // 是否正在取出队列中元素
        while (m_active)
        {
            dequeued = false;
            {
                std::unique_lock<std::mutex> lock(m_mutex);
                //判断任务队列是否为空,线程是否已停止
                auto funcR = [](CSateQueue<std::function<void()>> &tasks,
                                std::atomic_bool & bActive)->bool
                {
                    return !tasks.empty() || !bActive;
                };
                //等待线程被唤醒
                m_cv.wait(lock, std::bind(funcR, std::ref(m_tasks), std::ref(m_active)));

                if (!m_active)
                {
                    break;
                }

                dequeued = m_tasks.front(func);
            }
            //取到任务,则运行
            if (dequeued)
            {
                func();
            }
        }
    }

    /**
     * @brief  提交任务,让线程执行
     * @param  [in] f 任务
    * @param  [in] ...args 可变参
     * @return auto 返回值类型,自动推导
     * @author GGX
     * @date   2023-08-19
     */
    template <typename F, typename ...Args>
    auto submit(F && f, Args && ...args)->std::future<decltype(f(args...))>
    {
        //返回值类型
        using returnType = decltype(f(args...))();
        //函数包
        std::function<returnType> func =
            std::bind(std::forward<F>(f), std::forward<Args>(args)...);
        //包装任何可调用函数对象
        auto fu_ptr = std::make_shared<std::packaged_task<returnType>> (func);

        std::function<void()> task = [fu_ptr]
        {
            (*fu_ptr)();
        };
        //函数放入队列
        m_tasks.push(task);
        //唤醒一个线程
        m_cv.notify_one();
        //返回结果
        return fu_ptr->get_future();
    }

private:
    size_t m_size;	//线程大小
    std::atomic_bool m_active;	//线程是否运行,原子变量
    vector<thread> m_threads;	//线程容器
    std::condition_variable m_cv;//条件变量,唤醒线程
    std::mutex m_mutex;	//互斥量,保证线程安全
    CSateQueue<std::function<void()>> m_tasks; //任务队列
};

调用示例

#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <atomic>
#include <vector>
#include <queue>
#include <functional>
#include <future>
#include <random>

using namespace std;

/**
 * @brief  线程安全队列
 * @author GGX
 * @date   2023-08-13
 */
template <typename T>
class CSateQueue
{
public:
    CSateQueue() = default;
    ~CSateQueue() {}

    /**
     * @brief  队列是否为空
     * @param  无
     * @return true 空,false 非空
     * @author GGX
     * @date   2023-08-19
     */
    bool empty() const
    {
        std::unique_lock<std::mutex> lock(m_mutex);
        return m_queue.empty();
    }

    /**
     * @brief  队列是否为空
     * @param  无
     * @return size_t 队列大小
     * @author GGX
     * @date   2023-08-19
     */
    size_t size() const
    {
        std::unique_lock<std::mutex> lock(m_mutex);
        return m_queue.size();
    }

    /**
     * @brief  压栈
     * @param  对象
     * @return 无
     * @author GGX
     * @date   2023-08-19
     */
    void push(T &t)
    {
        std::unique_lock<std::mutex> lock(m_mutex);
        m_queue.emplace(t);
    }

    /**
     * @brief  出栈
     * @param  对象
     * @return true 成功,false 失败
     * @author GGX
     * @date   2023-08-19
     */
    bool front(T &t)
    {
        std::unique_lock<std::mutex> lock(m_mutex);
        if (m_queue.empty())
        {
            return false;
        }

        t = std::move(m_queue.front());
        m_queue.pop();
        return true;
    }
private:
    // 利用模板函数构造队列
    std::queue<T> m_queue;
    // 互斥量,可修改
    mutable std::mutex m_mutex;
};


/**
 * @brief  线程池
 * @author GGX
 * @date   2023-08-13
 */
class ThreadPool
{
public:

    /**
     * @brief  构造函数,默认六个线程
     * @param  [in] size 线程数目
     * @return
     * @author GGX
     * @date   2023-08-19
     */
    explicit ThreadPool(size_t size = 6): m_size(size)
    {
        m_active = false;
    }

    ~ThreadPool()
    {
        stop();
    }

    /**
     * @brief  启动线程池
     * @param  无
     * @return 无
     * @author GGX
     * @date   2023-08-19
     */
    void start()
    {
        //标记线程启动
        m_active = true;
        m_threads.reserve(m_size);
        for (size_t i = 0; i < m_size; i++)
        {
            //创建线程池
            m_threads.emplace_back(ThreadPool::work, this);
        }
        std::cout << "ThreadPool start" << std::endl;
    }

    /**
     * @brief  停止线程池
     * @param  无
     * @return 无
     * @author GGX
     * @date   2023-08-19
     */
    void stop()
    {
        //标记线程停止
        m_active = false;
        //唤醒所有线程
        m_cv.notify_all();
        for (thread &th : m_threads)
        {
            //优雅的退出线程
            if (th.joinable())
            {
                th.join();
            }
        }
        std::cout << "ThreadPool stop" << std::endl;
    }

    void work()
    {
        std::function<void()> func; // 定义基础函数类func
        bool dequeued = false; // 是否正在取出队列中元素
        while (m_active)
        {
            dequeued = false;
            {
                std::unique_lock<std::mutex> lock(m_mutex);
                //判断任务队列是否为空,线程是否已停止
                auto funcR = [](CSateQueue<std::function<void()>> &tasks,
                                std::atomic_bool & bActive)->bool
                {
                    return !tasks.empty() || !bActive;
                };
                //等待线程被唤醒
                m_cv.wait(lock, std::bind(funcR, std::ref(m_tasks), std::ref(m_active)));

                if (!m_active)
                {
                    break;
                }

                dequeued = m_tasks.front(func);
            }
            //取到任务,则运行
            if (dequeued)
            {
                func();
            }
        }
    }

    /**
     * @brief  提交任务,让线程执行
     * @param  [in] f 任务
    * @param  [in] ...args 可变参
     * @return auto 返回值类型,自动推导
     * @author GGX
     * @date   2023-08-19
     */
    template <typename F, typename ...Args>
    auto submit(F && f, Args && ...args)->std::future<decltype(f(args...))>
    {
        //返回值类型
        using returnType = decltype(f(args...))();
        //函数包
        std::function<returnType> func =
            std::bind(std::forward<F>(f), std::forward<Args>(args)...);
        //包装任何可调用函数对象
        auto fu_ptr = std::make_shared<std::packaged_task<returnType>> (func);

        std::function<void()> task = [fu_ptr]
        {
            (*fu_ptr)();
        };
        //函数放入队列
        m_tasks.push(task);
        //唤醒一个线程
        m_cv.notify_one();
        //返回结果
        return fu_ptr->get_future();
    }

private:
    size_t m_size;	//线程大小
    std::atomic_bool m_active;	//线程是否运行,原子变量
    vector<thread> m_threads;	//线程容器
    std::condition_variable m_cv;//条件变量,唤醒线程
    std::mutex m_mutex;	//互斥量,保证线程安全
    CSateQueue<std::function<void()>> m_tasks; //任务队列
};

std::random_device rd; // 真实随机数产生器
std::mt19937 mt(rd()); //生成计算随机数mt
std::uniform_int_distribution<int> dist(-1000, 1000); //生成-1000到1000之间的离散均匀分布数
auto rnd = std::bind(dist, mt);

/**
 * @brief  测试线程池
 * @author GGX
 * @date   2023-08-13
 */
class CTest
{
public:

    void simulate_hard_computation()
    {
        std::this_thread::sleep_for(std::chrono::milliseconds(2000 + rnd()));
    }

    void multiply(const int a, const int b)
    {
        simulate_hard_computation();
        const int res = a * b;
        std::cout << "multiply thid: " << std::this_thread::get_id() << std::endl;
        std::cout << a << " * " << b << " = " << res << std::endl;
    }
    void multiply_output(int &out, const int a, const int b)
    {
        simulate_hard_computation();
        out = a * b;
        std::cout << "multiply_output thid: " << std::this_thread::get_id() << std::endl;
        std::cout << a << " * " << b << " = " << out << std::endl;
    }

    int multiply_return(const int a, const int b)
    {
        simulate_hard_computation();
        const int res = a * b;
        std::cout << "multiply_return thid: " << std::this_thread::get_id() << std::endl;
        std::cout << a << " * " << b << " = " << res << std::endl;
        return res;
    }
};

void example()
{
    CTest a;
    ThreadPool pool(6);
    pool.start();
    //对象方法,因此需要绑定对象
    std::function<void(int, int)> multiply =
        std::bind(&CTest::multiply, &a, std::placeholders::_1, std::placeholders::_2);

    for (int i = 1; i < 3; i++)
    {
        for (int j = 0; j < 3; j++)
        {
            //提交调用
            pool.submit(multiply, i, j);
        }
    }

    auto future = pool.submit(multiply, 3, 10);
    future.get();

    int output_ref;
    std::function<void(int&, int, int)> multiply_output =
        std::bind(&CTest::multiply_output, &a, std::placeholders::_1,
                  std::placeholders::_2, std::placeholders::_3);
    // 使用ref传递的输出参数提交函数
    auto future1 = pool.submit(multiply_output, std::ref(output_ref), 5, 6);
    future1.get();
    std::cout << "Last operation result is equals to " << output_ref << std::endl;

    // 使用return参数提交函数
    std::function<int(int, int)> multiply_return =
        std::bind(&CTest::multiply_return, &a, std::placeholders::_1, std::placeholders::_2);
    auto future2 = pool.submit(multiply_return, 5, 3);
    // 等待乘法输出完成
    int res = future2.get();
    std::cout << "Last operation result is equals to " << res << std::endl;
    pool.stop();
}

int main()
{
    std::cout << "std::thread::hardware_concurrency() "
              << std::thread::hardware_concurrency() << std::endl << std::endl;
    example();
    return 0;
}

输出

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/905868.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

新版QQ NT 桌面版如何实现内存优化

一、背景 QQ 作为国民级应用,从互联网兴起就一直陪伴着大家,是很多用户刚接触互联网就开始使用的应用。而 QQ 桌面版最近一次技术架构升级还是在移动互联网兴起之前,在多年迭代过程中,QQ 桌面版也积累了不少技术债务,随着业务的发展和技术的进步,当前的架构已经无法很好…

[git]gitpush提示remote: Permission to xxx.git denied to xxx

错误原因&#xff1a;git客户端你先前登录过其他用户导致&#xff0c;你用另一个账号push的时候用的先前用户 解决方法&#xff1a;删除先前用户用你想push用户重新登录 解决步骤&#xff1a; 打开控制面板-->查看方式选择大图标-->然后打开凭据管理器 找到github相关…

Mybatis的环境搭建

目录 一.Mybatis的环境搭建 1.创建项目 2.进行相关配置 3.安装插件 4.插件的使用 一.Mybatis的环境搭建 1.创建项目 1.1 创建Maven项目&#xff0c;配置好相应的JDK和archetype 1.2 给项目命名和创建目录结构 1.3 添加自定义Property自定义属性 2.进行相关配置 2.1 导入p…

Java代码审计12之反序列化漏洞的审计与利用

文章目录 1、重点函数&#xff0c;2、漏洞源码3、利用工具 ysoserial4、修复漏洞 1、重点函数&#xff0c; ObjectInputStream.readObjectObjectInputStream.readUnsharedXMLDecoder.readObjectYaml.loadXStream.fromXMLObjectMapper.readValueJSON.parseObject2、漏洞源码 接…

Golang GORM 单表删除

删除只有一个操作&#xff0c;delete。也是先找到再去删除。 可以删除单条记录&#xff0c;也可以删除多条记录。 var s Studentdb.Debug().Delete(&s, "age ?", 100)fmt.Println(s)[15.878ms] [rows:1] DELETE FROM student WHERE age 100var s Studentdb.De…

【AI生成】职场中每个人都应该具备识别、应对阴险领导的能力——微软晓晓朗读...

关于主题“职场中每个人都应该具备识别、应对阴险领导的能力”各种AI一共生成了8篇文章&#xff0c;如下&#xff1a; Claude(文章直白&#xff0c;简洁&#xff0c;全面) WebCopilot (创造力模式) 讯飞星火 谷歌Bard&#xff08;第一篇&#xff09; WebCopilot&#xff08;精确…

Javaweb基础学习(3)

Javaweb基础学习 web核心介绍一、HTTP1.1 HTTP介绍1.2、HTTP请求数据格式1.3、HTTP响应数据格式 二、Tomcat2.1 简介2.2 基本使用2.3 Tomcat配置2.4 Tomcat部署项目2.5 Web项目结构2.6 创建Maven Web项目 三、Servlet3.1、Servlet简介&快速入门3.2 创建Servlet步骤3.3 Serv…

怎么借助ChatGPT处理数据结构的问题

目录 使用ChatGPT进行数据格式化转换 代码示例 ChatGPT格式化数据提示语 代码示例 批量格式化数据提示语 代码示例 ChatGPT生成的格式化批处理代码 使用ChatGPT合并不同数据源的数据 合并数据提示语 自动合并数据提示语 ChatGPT生成的自动合并代码 结论 数据合并是…

1339. 分裂二叉树的最大乘积

链接&#xff1a; ​​​​​​1339. 分裂二叉树的最大乘积 题解&#xff1a; /*** Definition for a binary tree node.* struct TreeNode {* int val;* TreeNode *left;* TreeNode *right;* TreeNode() : val(0), left(nullptr), right(nullptr) {}* …

axi4 exclusive 原子操作

axi4 exclusive 原子操作 axi4

Java——左移、右移和其他特殊运算符

&#x1f61c;作 者&#xff1a;是江迪呀✒️本文关键词&#xff1a;微信小程序、页面跳转、移动端、前端☀️每日 一言&#xff1a;我宁要痛苦不要麻木&#xff01; 一、前言 当涉及位操作和位级运算时&#xff0c;Java 提供了一组特殊的运算符&#xff0c;即左移…

CSS自学框架之动画

这一节&#xff0c;自学CSS动画。主要学习了淡入淡出、淡入缩放、缩放、移动、旋转动画效果。先看一下成果。 优雅的过渡动画&#xff0c;为你的页面添加另一份趣味&#xff01; 在你的选择器里插入 animation 属性&#xff0c;并添加框架内置的 keyframes 即可实现&#xff0…

Typescript基础知识(类型拓宽、类型缩小)

系列文章目录 引入一&#xff1a;Typescript基础引入&#xff08;基础类型、元组、枚举&#xff09; 引入二&#xff1a;Typescript面向对象引入&#xff08;接口、类、多态、重写、抽象类、访问修饰符&#xff09; 第一章&#xff1a;Typescript基础知识&#xff08;Typescri…

Linux 线程同步——信号量、互斥锁、读写锁

一、线程同步的概念 这里的同步就是对程序的执行进行控制&#xff0c;因为如果不进行控制就会出现错误的问题&#xff0c;这里的控制是为了保证程序的正确性。 线程同步指的是当一个线程在对某个临界资源进行操作时&#xff0c;其他线程都不可以对这个资源进行操作&#xff0…

自然语言处理: 第九章DeepSpeed的实践

理论基础 仓库链接: microsoft/DeepSpeed: DeepSpeed is a deep learning optimization library that makes distributed training and inference easy, efficient, and effective. DeepSpees正如它官网介绍的一样&#xff0c;它为深度学习模型提供了一站式的快速以及大规模…

【SA8295P 源码分析】03 - SA8295P QNX Host上电开机流程分析

【SA8295P 源码分析】03 - SA8295P QNX Host上电开机流程分析 一、阶段1 固件开机自检 (SM BIST)&#xff1a;APPS PBL加载XBL后触发 INT_RESET进行Warm Reset二、阶段2 固件开机自检 (SM BIST)&#xff1a;加载TZ&#xff0c;初始Hypervisor&#xff0c;启动QNX Kernel&#x…

k8编写yaml文件小工具

在刚接触k8s的时候觉得yaml资源文件非常的难写&#xff0c;完全看不懂&#xff0c;经过一段时间的摸索学习&#xff0c;发现k8s平台中是提供了一系列的工具和技巧的&#xff0c;可以帮助我们很好的编写资源文件&#xff0c;提升编写yaml文件的能力&#xff0c;常用的命令工具是…

python爬虫9:实战2

python爬虫9&#xff1a;实战2 前言 ​ python实现网络爬虫非常简单&#xff0c;只需要掌握一定的基础知识和一定的库使用技巧即可。本系列目标旨在梳理相关知识点&#xff0c;方便以后复习。 申明 ​ 本系列所涉及的代码仅用于个人研究与讨论&#xff0c;并不会对网站产生不好…

时序预测 | MATLAB实现SO-CNN-LSTM蛇群算法优化卷积长短期记忆神经网络时间序列预测

时序预测 | MATLAB实现SO-CNN-LSTM蛇群算法优化卷积长短期记忆神经网络时间序列预测 目录 时序预测 | MATLAB实现SO-CNN-LSTM蛇群算法优化卷积长短期记忆神经网络时间序列预测预测效果基本介绍程序设计学习总结参考资料 预测效果 基本介绍 时序预测 | MATLAB实现SO-CNN-LSTM蛇群…