【C++】线程池实现

news2025/2/5 17:43:10

目录

  • 一、线程池简介
    • 线程池的核心组件
    • 实现步骤
  • 二、C++11实现线程池
    • 源码
  • 三、线程池源码解析
    • 1. 成员变量
    • 2. 构造函数
      • 2.1 线程初始化
      • 2.2 工作线程逻辑
    • 3. 任务提交(enqueue方法)
      • 3.1 方法签名
      • 3.2 任务封装
      • 3.3 任务入队
    • 4. 析构函数
      • 4.1 停机控制
    • 5. 关键技术点解析
      • 5.1 完美转发实现
      • 5.2 异常传播机制
      • 5.3 内存管理模型
  • 四、 性能特征分析
  • 五、 扩展优化方向
  • 六、 典型问题排查指南
  • 七、 测试用例
    • 如果这篇文章对你有所帮助,渴望获得你的一个点赞!

一、线程池简介

线程池是一种并发编程技术,通过预先创建一组线程并复用它们来执行多个任务,避免了频繁创建和销毁线程的开销。它特别适合处理大量短生命周期任务的场景(如服务器请求、并行计算)。

线程池的核心组件

1. 任务队列(Task Queue)
存储待执行的任务(通常是函数对象或可调用对象)。

2. 工作线程(Worker Threads)
一组预先创建的线程,不断从队列中取出任务并执行。

3. 同步机制
互斥锁(Mutex):保护任务队列的线程安全访问。
条件变量(Condition Variable):通知线程任务到达或线程池终止。

实现步骤

1. 初始化线程池
创建固定数量的线程,每个线程循环等待任务。

2. 提交任务
将任务包装成函数对象,加入任务队列。

3. 任务执行
工作线程从队列中取出任务并执行。

4. 终止线程池
发送停止信号,等待所有线程完成当前任务后退出。

二、C++11实现线程池

源码

#include <vector>
#include <queue>
#include <future>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <functional>
#include <stdexcept>

class ThreadPool 
{
public:
    //构造函数:根据输入的线程数(默认硬件并发数)创建工作线程。
    //每个工作线程执行一个循环,不断从任务队列中取出并执行任务。
    //explicit关键字防止隐式类型转换
    explicit ThreadPool(size_t threads = std::thread::hardware_concurrency())
        : stop(false) 
    {
        if (threads == 0) 
        {
            threads = 1;
        }

        for (size_t i = 0; i < threads; ++i) 
        {
            workers.emplace_back([this] 
            {
                for (;;) 
                {
                    std::function<void()> task;
                    {
                        std::unique_lock<std::mutex> lock(this->queue_mutex);
                        
                        //等待条件:线程通过条件变量等待任务到来或停止信号。(CPU使用率:休眠时接近0%,仅在任务到来时唤醒)
                        //lambda表达式作为谓词,当条件(停止信号为true 或 任务队列非空)为真时,才会解除阻塞。
                        this->condition.wait(lock, [this] {
                            return (this->stop || !this->tasks.empty());
                            });

                        /* 传统忙等待:while (!(stop || !tasks.empty())) {} // 空循环消耗CPU */

                        if (this->stop && this->tasks.empty())
                        {
                            //如果线程池需要终止且任务队列为空则直接return
                            return;
                        }

                        //任务提取:从队列中取出任务并执行,使用std::move避免拷贝开销。
                        task = std::move(this->tasks.front());
                        this->tasks.pop();
                    }
                    //执行任务
                    task();
                }
            });
        }
    }

    //任务提交(enqueue方法)
    template<class F, class... Args>
    auto enqueue(F&& f, Args&&... args)
        -> std::future<typename std::result_of<F(Args...)>::type> 
    {
        using return_type = typename std::result_of<F(Args...)>::type;

        //任务封装:使用std::packaged_task包装用户任务,支持异步返回结果。
        //智能指针管理:shared_ptr确保任务对象的生命周期延续至执行完毕。
        //完美转发:通过std::forward保持参数的左值/右值特性。
        auto task = std::make_shared<std::packaged_task<return_type()>>(
            std::bind(std::forward<F>(f), std::forward<Args>(args)...)
            );

        std::future<return_type> res = task->get_future();
        {
            std::unique_lock<std::mutex> lock(queue_mutex);

            if (stop)
            {
                throw std::runtime_error("enqueue on stopped ThreadPool");
            }  

            tasks.emplace([task]() { (*task)(); });
            /* push传入的对象需要事先构造好,再复制过去插入容器中;
               而emplace则可以自己使用构造函数所需的参数构造出对象,并直接插入容器中。
               emplace相比于push省去了复制的步骤,则使用emplace会更加节省内存。*/
        }
        condition.notify_one();
        return res;
    }

    ~ThreadPool() 
    {
        //设置stop标志,唤醒所有线程,等待任务队列清空。
        {
            std::unique_lock<std::mutex> lock(queue_mutex);
            stop = true;
        }
        condition.notify_all();
        for (std::thread& worker : workers)
        {
            worker.join();
        }
    }

private:
    std::vector<std::thread> workers;        //存储工作线程对象
    std::queue<std::function<void()>> tasks; //任务队列,存储待执行的任务

    std::mutex queue_mutex;                  //保护任务队列的互斥锁
    std::condition_variable condition;       //线程间同步的条件变量
    bool stop;                               //线程池是否停止标志
};

三、线程池源码解析

1. 成员变量

std::vector<std::thread> workers;        // 工作线程容器
std::queue<std::function<void()>> tasks; // 任务队列
std::mutex queue_mutex;                  // 队列互斥锁
std::condition_variable condition;       // 条件变量
bool stop;                               // 停机标志

设计要点:

  • 采用生产者-消费者模式,任务队列作为共享资源

  • 组合使用mutex+condition_variable实现线程同步

  • vector存储线程对象便于统一管理生命周期


2. 构造函数

2.1 线程初始化

explicit ThreadPool(size_t threads = std::thread::hardware_concurrency())
    : stop(false)
{
    if (threads == 0) 
    {
        threads = 1;
    }
        
    for (size_t i = 0; i < threads; ++i) 
    {
        workers.emplace_back([this] { /* 工作线程逻辑 */ });
    }
}

设计要点:

  • explicit防止隐式类型转换(如ThreadPool pool = 4;

  • 默认使用硬件并发线程数(通过hardware_concurrency()

  • 最少创建1个线程避免空池

  • 使用emplace_back直接构造线程对象


2.2 工作线程逻辑

for (;;)
{
    std::function<void()> task;
    {
        std::unique_lock<std::mutex> lock(queue_mutex);
        condition.wait(lock, [this] {
            return stop || !tasks.empty();
        });

        if (stop && tasks.empty()) 
        {
           return; 
        }

        task = std::move(tasks.front());
        tasks.pop();
    }
    task();
}

核心机制:

  • unique_lock配合条件变量实现自动锁管理

  • 双重状态检查(停机标志+队列非空)

  • 任务提取使用移动语义避免拷贝

  • 任务执行在锁作用域外进行


3. 任务提交(enqueue方法)

3.1 方法签名

template<class F, class... Args>
auto enqueue(F&& f, Args&&... args)
    -> std::future<typename std::result_of<F(Args...)>::type>

类型推导:

  • 使用尾置返回类型声明
  • std::result_of推导可调用对象的返回类型
  • 完美转发参数(F&&+Args&&...

3.2 任务封装

auto task = std::make_shared<std::packaged_task<return_type()>>
    (std::bind(std::forward<F>(f), std::forward<Args>(args)...));

封装策略:

  • packaged_task包装任务用于异步获取结果
  • shared_ptr管理任务对象生命周期
  • std::bind绑定参数(注意C++11的参数转发限制)

3.3 任务入队

tasks.emplace([task]() { (*task)(); });

优化点:

  • 使用emplace直接构造队列元素
  • Lambda捕获shared_ptr保持任务有效性
  • 显式解引用执行packaged_task

4. 析构函数

4.1 停机控制

~ThreadPool() 
{
    {
        std::unique_lock<std::mutex> lock(queue_mutex);
        stop = true;
    }
    condition.notify_all();
    for (auto& worker : workers)
    {
        worker.join();
    }  
}

停机协议:

  1. 设置停机标志原子操作
  2. 广播唤醒所有等待线程
  3. 等待所有工作线程退出

5. 关键技术点解析

5.1 完美转发实现

std::bind(std::forward<F>(f), std::forward<Args>(args)...)
  • 保持参数的左右值特性
  • 支持移动语义参数的传递
  • C++11的限制:无法完美转发所有参数类型

5.2 异常传播机制

  • 任务异常通过future对象传播
  • packaged_task自动捕获异常
  • 用户通过future.get()获取异常

5.3 内存管理模型

         [任务提交者]
             |
             v
       [packaged_task] <---- shared_ptr ---- [任务队列]
             |
             v
         [future]
  • 三重生命周期保障:
    1. 提交者持有future
    2. 队列持有任务包装器
    3. 工作线程执行任务

四、 性能特征分析

1. 时间复杂度

操作时间复杂度
任务提交(enqueue)O(1)(加锁开销)
任务提取O(1)
线程唤醒取决于系统调度

2. 空间复杂度

组件空间占用
线程栈每线程MB级
任务队列与任务数成正比
同步原语固定大小

五、 扩展优化方向

1. 任务窃取(Work Stealing)

  • 实现多个任务队列
  • 空闲线程从其他队列窃取任务

2. 动态线程池

void adjust_workers(size_t new_size) 
{
    if (new_size > workers.size()) 
    {
     	// 扩容逻辑
    } 
    else 
    {
        // 缩容逻辑
    }
}

3. 优先级队列

using Task = std::pair<int, std::function<void()>>; // 优先级+任务
   std::priority_queue<Task> tasks;

4. 无锁队列

moodycamel::ConcurrentQueue<std::function<void()>> tasks;

六、 典型问题排查指南

现象可能原因解决方案
任务未执行线程池提前析构延长线程池生命周期
future.get()永久阻塞任务未提交/异常未处理检查任务提交路径
CPU利用率100%忙等待或锁竞争优化任务粒度/使用无锁结构
内存持续增长任务对象未正确释放检查智能指针使用

该实现完整展现了现代C++线程池的核心设计范式,开发者可根据具体需求在此基础进行功能扩展和性能优化。理解这个代码结构是掌握更高级并发模式的基础。

七、 测试用例

使用实例(C++11兼容):

#include <iostream>

int main() 
{
    ThreadPool pool(4);
    
    // 提交普通函数
    auto future1 = pool.enqueue([](int a, int b) {
        return a + b;
    }, 2, 3);
    
    // 提交成员函数
    struct Calculator 
    {
        int multiply(int a, int b) 
        { 
            return a * b; 
        }
    } calc;
    auto future2 = pool.enqueue(std::bind(&Calculator::multiply, &calc, 
                                        std::placeholders::_1, 
                                        std::placeholders::_2), 4, 5);
    
    // 异常处理示例
    auto future3 = pool.enqueue([]() -> int {
        throw std::runtime_error("example error");
        return 1;
    });
    
    std::cout << "2+3=" << future1.get() << std::endl;
    std::cout << "4*5=" << future2.get() << std::endl;
    
    try 
    {
        future3.get();
    } 
    catch(const std::exception& e)
    {
        std::cout << "Caught exception: " << e.what() << std::endl;
    }
    
    return 0;
}

如果这篇文章对你有所帮助,渴望获得你的一个点赞!

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2293415.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

数据结构实战之线性表(三)

目录 1.顺序表释放 2.顺序表增加空间 3.合并顺序表 4.线性表之链表实现 1.项目结构以及初始代码 2.初始化链表(不带头结点) 3.链表尾部插入数据并显示 4.链表头部插入数据 5.初始化链表&#xff08;带头结点&#xff09; 6.带头结点的链表头部插入数据并显示 7.带头结…

【python】python基于机器学习与数据分析的手机特性关联与分类预测(源码+数据集)【独一无二】

&#x1f449;博__主&#x1f448;&#xff1a;米码收割机 &#x1f449;技__能&#x1f448;&#xff1a;C/Python语言 &#x1f449;专__注&#x1f448;&#xff1a;专注主流机器人、人工智能等相关领域的开发、测试技术。 python基于机器学习与数据分析的手机特性关联与分类…

ZOJ 1007 Numerical Summation of a Series

原题目链接 生成该系列值的表格 对于x 的 2001 个值&#xff0c;x 0.000、0.001、0.002、…、2.000。表中的所有条目的绝对误差必须小于 0.5e-12&#xff08;精度为 12 位&#xff09;。此问题基于 Hamming (1962) 的一个问题&#xff0c;当时的大型机按今天的微型计算机标准来…

全面解析文件上传下载删除漏洞:风险与应对

在数字化转型的时代&#xff0c;文件上传、下载与删除功能已经成为各类应用程序的标准配置&#xff0c;从日常办公使用的协同平台&#xff0c;到云端存储服务&#xff0c;再到社交网络应用&#xff0c;这些功能在给用户带来便捷体验、显著提升工作效率的同时&#xff0c;也隐藏…

【C语言深入探索】结构体详解(二):使用场景

目录 一、复杂数据的表示 二、数据的封装 三、多态的模拟 四、回调函数的实现 五、多线程编程 六、通信协议的实现和文件操作 6.1. 使用结构体实现简单通信协议 6.2. 使用结构体进行文件操作 七、图形界面编程 结构体在C语言中具有广泛的应用场景&#xff0c;以下是一…

【大模型】AI 辅助编程操作实战使用详解

目录 一、前言 二、AI 编程介绍 2.1 AI 编程是什么 2.1.1 为什么需要AI辅助编程 2.2 AI 编程主要特点 2.3 AI编程底层核心技术 2.4 AI 编程核心应用场景 三、AI 代码辅助编程解决方案 3.1 AI 大模型平台 3.1.1 AI大模型平台代码生成优缺点 3.2 AI 编码插件 3.3 AI 编…

RK3566-移植5.10内核Ubuntu22.04

说明 记录了本人使用泰山派&#xff08;RK3566&#xff09;作为平台并且成功移植5.10.160版本kernel和ubuntu22.04&#xff0c;并且成功配置&连接网络的完整过程。 本文章所用ubuntu下载地址&#xff1a;ubuntu-cdimage-ubuntu-base-releases-22.04-release安装包下载_开源…

从零开始实现一个双向循环链表:C语言实战

文章目录 1链表的再次介绍2为什么选择双向循环链表&#xff1f;3代码实现&#xff1a;从初始化到销毁1. 定义链表节点2. 初始化链表3. 插入和删除节点4. 链表的其他操作5. 打印链表和判断链表是否为空6. 销毁链表 4测试代码5链表种类介绍6链表与顺序表的区别7存储金字塔L0: 寄存…

51单片机 06 定时器

51 单片机的定时器属于单片机的内部资源&#xff0c;其电路的连接和运转均在单片机内部完成。 作用&#xff1a;1、用于计时&#xff1b;2、替代长时间的Delay&#xff0c;提高CPU 运行效率和处理速度。 定时器个数&#xff1a;3个&#xff08;T0、T1、T2&#xff09;&#xf…

【C++】P1957 口算练习题

博客主页&#xff1a; [小ᶻ☡꙳ᵃⁱᵍᶜ꙳] 本文专栏: C 文章目录 &#x1f4af;前言&#x1f4af;题目描述输入格式&#xff1a;输出格式&#xff1a; &#x1f4af;我的做法代码实现&#xff1a; &#x1f4af;老师的做法代码实现&#xff1a; &#x1f4af;对比分析&am…

Workbench 中的热源仿真

探索使用自定义工具对移动热源进行建模及其在不同行业中的应用。 了解热源动力学 对移动热源进行建模为各种工业过程和应用提供了有价值的见解。激光加热和材料加工使用许多激光束来加热、焊接或切割材料。尽管在某些情况下&#xff0c;热源 &#xff08;q&#xff09; 不是通…

CCF-GESP 等级考试 2023年12月认证C++八级真题解析

2023年12月真题 一、单选题&#xff08;每题2分&#xff0c;共30分&#xff09; 正确答案&#xff1a;C 考察知识点&#xff1a;数学问题 解析&#xff1a;本题可抽象为分类计数问题&#xff0c;应使用加法原理&#xff0c;而不是乘法原理。答案为 ACB 的方案数 2 加上 ADB 的…

vscode搭建git

vscode搭建git 一、安装git二、vscode上搭建git(1) 先创建本地仓库再上传到远程仓库&#xff0c;远程仓库名是根据本地仓库名一致(2) 先创建远程仓库&#xff0c;再将本地仓库上传到指定远程仓库 一、安装git 网络教程很多&#xff0c;在此就不赘述了 参考&#xff1a;git安装…

解决Mac安装软件的“已损坏,无法打开。 您应该将它移到废纸篓”问题

mac安装软件时&#xff0c;如果出现这个问题&#xff0c;其实很简单 首先打开终端&#xff0c;输入下面的命令 sudo xattr -r -d com.apple.quarantine 输入完成后&#xff0c;先不要回车&#xff0c;点击访达--应用程序--找到你无法打开的app图标&#xff0c;拖到终端窗口中…

ChatGPT-4o和ChatGPT-4o mini的差异点

在人工智能领域&#xff0c;OpenAI再次引领创新潮流&#xff0c;近日正式发布了其最新模型——ChatGPT-4o及其经济实惠的小型版本ChatGPT-4o Mini。这两款模型虽同属于ChatGPT系列&#xff0c;但在性能、应用场景及成本上展现出显著的差异。本文将通过图文并茂的方式&#xff0…

读书笔记--分布式架构的异步化和缓存技术原理及应用场景

本篇是在上一篇的基础上&#xff0c;主要对分布式应用架构下的异步化机制和缓存技术进行学习&#xff0c;主要记录和思考如下&#xff0c;供大家学习参考。大家知道原来传统的单一WAR应用中&#xff0c;由于所有数据都在同一个数据库中&#xff0c;因此事务问题一般借助数据库事…

BUU10 [极客大挑战 2019]LoveSQL1

万能用户名&#xff08;密码随便&#xff09; 登录进去以后发现是这个东西&#xff0c;然而并没有什么卵用 然后就开始爆破数据库名字--表名--列名 注意&#xff1a;这道题需要将所有的表名都爆出来&#xff0c;需要在payload里头写 group_concat()&#xff0c;否则页面只会显…

tomcat核心组件及原理概述

目录 1. tomcat概述 1.1 概念 1.2 官网地址 2. 基本使用 2.1下载 3. 整体架构 3.1 核心组件 3.2 从web.xml配置和模块对应角度 3.3 如何处理请求 4. 配置JVM参数 5. 附录 1. tomcat概述 1.1 概念 什么是tomcat Tomcat是一个开源、免费、轻量级的Web服务器。 Tomca…

冰蝎v4.0.5 来啦

webshell始终是渗透测试的热门&#xff0c;上次护网写冰蝎检测规则&#xff0c;加密流量&#xff0c;有点压力&#xff0c;今天终于有空来复现一下&#xff0c;我知道玩知乎的大佬很多&#xff0c;轻一点喷&#xff0c;学习新知识不丢人&#xff5e; ailx10 1949 次咨询 4.9 …

【C++】B2120 单词的长度

博客主页&#xff1a; [小ᶻ☡꙳ᵃⁱᵍᶜ꙳] 本文专栏: C 文章目录 &#x1f4af;前言&#x1f4af;题目描述&#x1f4af;我的做法代码实现&#xff1a;思路解析&#xff1a; &#x1f4af;老师的第一种做法代码实现&#xff1a;思路解析&#xff1a; &#x1f4af;老师的…