C++实现线程池

news2025/1/26 15:41:05

C++实现线程池

  • 一、前言
  • 二、线程池的接口设计
    • 2.1、类封装
    • 2.2、线程池的初始化
    • 2.3、线程池的启动
    • 2.4、线程池的停止
    • 2.5、线程的执行函数run()
    • 2.6、任务的运行函数
    • 2.7、等待所有线程结束
  • 三、测试线程池
  • 四、源码地址
  • 总结

一、前言

C++实现的线程池,可能涉及以下知识点:

  • decltype。
  • packaged_task。
  • make_shared。
  • mutex。
  • unique_lock。
  • notify_one。
  • future。
  • queue。
  • bind。
  • thread。
    等等。
    线程池逻辑

二、线程池的接口设计

(1)封装一个线程池的类。
(2)线程池的初始化:设置线程的数量。
(3)启动线程池:创建线程等工作。
(4)执行任务的函数。
(5)停止线程池。
(6)等所有任务执行完成,退出执行函数。

2.1、类封装

线程池类,采用c++11来实现。

#ifndef _CPP_THREAD_POOL_H_
#define _CPP_THREAD_POOL_H_

#include <iostream>
#include <functional>
#include <memory>
#include <queue>
#include <mutex>
#include <vector>
#include <thread>
#include <future>

#ifdef WIN32
#include <windows.h>
#else
#include <sys/time.h>
#endif

using namespace std;

void getNow(timeval *tv);
int64_t getNowMs();

#define TNOW    getNow()
#define TNOWMS  getNowMs()

class CPP_ThreadPool{
protected:
    struct TaskFunc{
        TaskFunc(uint64_t expireTime):_expireTime(expireTime){}
        int64_t _expireTime=0;//超时的绝对时间
        function<void()> _func;
    };
    typedef shared_ptr<TaskFunc> TaskFuncPtr;

    /* 
     * @brief 获取任务 ** 
     *@return TaskFuncPtr 
     */
    bool get(TaskFuncPtr& task);

    /*
    * @brief 线程池是否退出
    */
    bool isTerminate()
    {
        return _bTerminate;
    }

    /*
    * @brief 线程运行态
    */
   void run();

public: 
    /*
     * @brief 构造函数 
     */
    CPP_ThreadPool(); 

    /* 
    * @brief 析构, 会停止所有线程 
    */
    virtual ~CPP_ThreadPool();

    /* 
     * * @brief 初始化. 
     * * @param num 工作线程个数 
     */
    bool init(size_t num);

    /*
    * @brief 停止所有线程, 会等待所有线程结束 
    */
   void stop();

   /*
   * @brief 启动所有线程 
   */
    bool start();

    /* 
     * @brief 等待当前任务队列中, 所有工作全部结束(队列无任务). 
     * @param millsecond 等待的时间(ms), -1:永远等待 
     * @return true, 所有工作都处理完毕 
     * false,超时退出 
     */
    bool waitForAllDone(int millsecond=-1);

   /*
    * @brief 获取线程个数.
    * @return size_t 线程个数 
    */
   size_t getThreadNum()
   {
        unique_lock<mutex> lock(_mutex);
        return _threads.size();
   }

   /*
    *  @brief 获取当前线程池的任务数
    * @return size_t 线程池的任务数 
    */
   size_t getJobNum()
   {
        unique_lock<mutex> lock(_mutex);
        return _tasks.size();
   }

   /*
   * @brief 用线程池启用任务(F是function, Args是参数) ** 
   * @param ParentFunctor 
   * @param tf 
   * @return 返回任务的future对象, 可以通过这个对象来获取返回值 
   */
    template <class F,class... Args>
    auto exec(F&& f, Args&&... args)->future<decltype(f(args...))>
    {
        return exec(0,f,args...);
    }

    /* 
    * unused.
    *
    * @brief 用线程池启用任务(F是function, Args是参数) 
    * @param 超时时间 ,单位ms (为0时不做超时控制) ;若任务超时,此任务将被丢弃 
    * @param bind function 
    * @return 返回任务的future对象, 可以通过这个对象来获取返回值 
    *
    * template <class F, class... Args> 
    * 它是c++里新增的最强大的特性之一,它对参数进行了高度泛化,它能表示0到任意个数、任意类型的参数 
    * auto exec(F &&f, Args &&... args) -> std::future<decltype(f(args...))> 
    * std::future<decltype(f(args...))>:返回future,调用者可以通过future获取返回值 
    * 返回值后置
    */
    template<class F,class... Args>
    auto exec(int64_t timeoutMs,F&& f,Args&&... args) -> future<decltype(f(args...))>
    {
        //获取现在时间
        int64_t expireTime=(timeoutMs==0)?0:TNOWMS+timeoutMs;
        // 定义返回值类型
        using retType=decltype(f(args...));
        // 封装任务
        auto task=make_shared<packaged_task<retType()>>(bind(forward<F>(f),forward<Args>(args)...));
        // 封装任务指针,设置过期时间
        TaskFuncPtr fPtr=make_shared<TaskFunc>(expireTime);
        fPtr->_func=[task](){
            (*task)();
        };

        unique_lock<mutex> lock(_mutex);
        // 插入任务
        _tasks.push(fPtr);
        // 唤醒阻塞的线程,可以考虑只有任务队列为空的情 况再去notify
        _condition.notify_one();

        return task->get_future();
    }

protected:
    size_t  _threadNum;//线程数量
    bool    _bTerminate;//判定是否终止线程池
    mutex   _mutex;     //唯一锁
    vector<thread*> _threads;   //工作线程数组
    queue<TaskFuncPtr> _tasks;  //任务队列
    condition_variable _condition;//条件变量
    atomic<int>         _atomic{0};//原子变量
};


#endif

使用示例:

CPP_ThreadPool tpool; 
tpool.init(5); //初始化线程池线程数 
//启动线程方式 
tpool.start(); 
//将任务丢到线程池中* 
tpool.exec(testFunction, 10); //参数和start相同 
//等待线程池结束 
tpool.waitForAllDone(1000); //参数<0时, 表示无限等待(注意有人调用stop也会推出) 
//此时: 外部需要结束线程池是调用 
tpool.stop(); 

注意:ZERO_ThreadPool::exec执行任务返回的是个future, 因此可以通过future异步获取结果, 比如:

int testInt(int i) 
{ 
    return i; 
} 
auto f = tpool.exec(testInt, 5); 
cout << f.get() << endl; //当testInt在线程池中执行后, f.get()会返回数值5 

class Test 
{ 
public: 
    int test(int i); 
}; 
Test t; 
auto f = tpool.exec(std::bind(&Test::test, &t, std::placeholders::_1), 10); 
//返回的future对象, 可以检查是否执行 
cout << f.get() << endl; 

2.2、线程池的初始化

主要是设置线程池中线程的数量,如果线程池已经存在则直接返回,防止重复初始化。

bool CPP_ThreadPool::init(size_t num)
{
    unique_lock<mutex> lock(_mutex);
    if(!_threads.empty())
        return false;
    _threadNum=num;
    return true;
}

2.3、线程池的启动

根据设置的线程数量,创建线程并保存在一个数组中。如果线程池已经存在则直接返回,防止重复启动。

bool CPP_ThreadPool::start()
{
    unique_lock<mutex> lock(_mutex);
    if(!_threads.empty())
        return false;
    
    for(size_t i=0;i<_threadNum;i++)
    {
        _threads.push_back(new thread(&CPP_ThreadPool::run,this));
    }
    return true;
}

2.4、线程池的停止

设置线程退出条件,并通知所有线程。停止时,要等待所有线程都执行完任务,再销毁线程。
需要注意锁的粒度。

void CPP_ThreadPool::stop()
{
    // 注意要有这个{},不然会死锁。
    {
        unique_lock<mutex> lock(_mutex);
        _bTerminate=true;
        _condition.notify_all();
    }
    size_t thdCount=_threads.size();
    for(size_t i=0;i<thdCount;i++)
    {
        if(_threads[i]->joinable())
        {
            _threads[i]->join();
        }
        delete _threads[i];
        _threads[i]=NULL;
    }
    unique_lock<mutex> lock(_mutex);
    _threads.clear();
}

2.5、线程的执行函数run()

读取任务:判断任务是否存在,如果任务队列为空,则进入等待状态直到任务队列不为空或退出线程池(这里需要两次判断,因为可能存在虚假唤醒)。
执行任务:调用匿名函数。
检测所有任务都是否执行完毕:这里使用了原子变量来检测任务是否都执行完,原因在于任务队列为空不代表任务已经执行完(任务可能还在运行中、也可能是任务刚弹出还没运行),使用原子变量来计数就更严谨。

bool CPP_ThreadPool::get(TaskFuncPtr& task)
{
    unique_lock<mutex> lock(_mutex);
    if(_tasks.empty())//判断任务是否存在
    {
        _condition.wait(lock,[this]{
            return _bTerminate || !_tasks.empty();//唤醒条件
        });
    }

    if(_bTerminate)
        return false;

    if(!_tasks.empty())//判断任务是否存在
    {
        task=move(_tasks.front());// 使用移动语义
        _tasks.pop();//弹出一个任务
        return true;
    }
    return false;
}

// 执行任务的线程
void CPP_ThreadPool::run()
{
    while(!isTerminate())
    {
        TaskFuncPtr task;
        // 读取任务
        bool ok=get(task);
        if(ok)
        {
            ++_atomic;
            try
            {
                if(task->_expireTime!=0 && task->_expireTime < TNOWMS)
                {
                    // 处理超时任务
                }
                else
                    task->_func();//执行任务
            }
            catch(...)
            {}
            --_atomic;
            // 任务执行完毕,这里只是为了通知waitForAllDone
            unique_lock<mutex> lock(_mutex);
            if(_atomic==0 && _tasks.empty())
                _condition.notify_all();
        }
    }
}

2.6、任务的运行函数

这里使用了可变模块参数、智能指针、bind、function、捕获列表的相关技术知识。
返回任务的future对象, 可以通过这个对象来获取返回值。
超时时间 ,单位ms (为0时不做超时控制) ;若任务超时,此任务将被丢弃。
可变模块参数对参数进行了高度泛化,它能表示0到任意个数、任意类型的参数。

  /*
   * @brief 用线程池启用任务(F是function, Args是参数) ** 
   * @param ParentFunctor 
   * @param tf 
   * @return 返回任务的future对象, 可以通过这个对象来获取返回值 
   */
    template <class F,class... Args>
    auto exec(F&& f, Args&&... args)->future<decltype(f(args...))>
    {
        return exec(0,f,args...);
    }

    /* 
    * unused.
    *
    * @brief 用线程池启用任务(F是function, Args是参数) 
    * @param 超时时间 ,单位ms (为0时不做超时控制) ;若任务超时,此任务将被丢弃 
    * @param bind function 
    * @return 返回任务的future对象, 可以通过这个对象来获取返回值 
    *
    * template <class F, class... Args> 
    * 它是c++里新增的最强大的特性之一,它对参数进行了高度泛化,它能表示0到任意个数、任意类型的参数 
    * auto exec(F &&f, Args &&... args) -> std::future<decltype(f(args...))> 
    * std::future<decltype(f(args...))>:返回future,调用者可以通过future获取返回值 
    * 返回值后置
    */
    template<class F,class... Args>
    auto exec(int64_t timeoutMs,F&& f,Args&&... args) -> future<decltype(f(args...))>
    {
        //获取现在时间
        int64_t expireTime=(timeoutMs==0)?0:TNOWMS+timeoutMs;
        // 定义返回值类型
        using retType=decltype(f(args...));
        // 封装任务
        auto task=make_shared<packaged_task<retType()>>(bind(forward<F>(f),forward<Args>(args)...));
        // 封装任务指针,设置过期时间
        TaskFuncPtr fPtr=make_shared<TaskFunc>(expireTime);
        fPtr->_func=[task](){
            (*task)();
        };

        unique_lock<mutex> lock(_mutex);
        // 插入任务
        _tasks.push(fPtr);
        // 唤醒阻塞的线程,可以考虑只有任务队列为空的情 况再去notify
        _condition.notify_one();

        return task->get_future();
    }

2.7、等待所有线程结束

bool CPP_ThreadPool::waitForAllDone(int millsecond)
{
    unique_lock<mutex> lock(_mutex);
    if(_tasks.empty())
        return true;
    if(millsecond<0)
    {
        _condition.wait(lock,[this]{ return _tasks.empty();});
        return true;
    }
    else
    {
        return _condition.wait_for(lock,chrono::milliseconds(millsecond),[this]{ return _tasks.empty();});
    }
}

三、测试线程池

#include <iostream>
#include "cppThreadPool.h"

using namespace std;

void func1(int a) 
{ 
    cout << "func1() a=" << a << endl; 
}
void func2(int a, string b) 
{ 
    cout << "func2() a=" << a << ", b=" << b<< endl; 
}

void func3()
{
    cout<<"func3"<<endl;
}

void test01()
{
    cout<<"test 01"<<endl;
    CPP_ThreadPool threadpool;
    threadpool.init(2);
    threadpool.start();//启动线程池
    // 执行任务
    threadpool.exec(func1,10);
    threadpool.exec(func2,20,"FLY.");
    threadpool.exec(1000,func3);


    threadpool.waitForAllDone();
    threadpool.stop();
}

int func1_future(int a) 
{ 
    cout << "func1() a=" << a << endl; 
    return a; 
}

string func2_future(int a, string b) 
{ 
    cout << "func2() a=" << a << ", b=" << b<< endl; 
    return b; 
}

void test02()
{
    cout<<"test 02"<<endl;
    CPP_ThreadPool threadpool;
    threadpool.init(2);
    threadpool.start();//启动线程池

    future<decltype(func1_future(0))> ret01=threadpool.exec(func1_future,10);
    future<string> ret02=threadpool.exec(func2_future,20,"FLY.");

    threadpool.waitForAllDone();

    cout<<"ret01 = "<<ret01.get()<<endl;
    cout<<"ret02 = "<<ret02.get()<<endl;

    
    threadpool.stop();

}

class Test{
public:
    int test(int a)
    {
        cout<<_name<<": a = "<<a<<endl;
        return a+1;
    }
    void setname(string name)
    {
        _name=name;
    }
    string _name;

};

void test03()
{
    cout<<"test 03"<<endl;
    CPP_ThreadPool threadpool;
    threadpool.init(2);
    threadpool.start();//启动线程池
    Test t1;
    Test t2;
    t1.setname("Test 1");
    t2.setname("Test 2");
    auto f1=threadpool.exec(bind(&Test::test,&t1,placeholders::_1),10);
    auto f2=threadpool.exec(bind(&Test::test,&t2,placeholders::_1),20);

    threadpool.waitForAllDone();
    cout<<"f1 = "<<f1.get()<<endl;
    cout<<"f2 = "<<f2.get()<<endl;
    threadpool.stop();

}

int main(int argc,char **argv)
{
    // 简单测试线程池
    test01();
    // 测试任务函数返回值
    test02();
    // 测试类对象函数的绑定
    test03();

    return 0;
}

执行结果:

test 01
func1() a=10
func2() a=20, b=FLY.
func3
test 02
func1() a=10
func2() a=20, b=FLY.
ret01 = 10
ret02 = FLY.
test 03
Test 1: a = 10
Test 2: a = 20
f1 = 11
f2 = 21

四、源码地址

源码已经上传github。

总结

线程池的核心:初始化、线程启动、执行函数、线程停止。

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/346363.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

JVM - 垃圾回收

目录 垃圾回收基础和根搜索算法 垃圾回收概述 根搜索算法 引用分类 垃圾回收基础(跨代引用、记忆集、写屏障、判断垃圾的步骤、STW) 跨代引用 记忆集(Remembered Set) 写屏障 判断是否垃圾的步骤 GC类型 Stop-The-World 垃圾收集类型 判断类无用的条件 垃圾回收算…

Mybatis源码(2) - SqlSessionTemplate的介绍及创建过程

0. 前言1. Spring对SqlSessionTemplate的管理1.1. SqlSessionTemplate的创建&#xff1a;1.2. MapperProxy中sqlSession的来源&#xff1a;2. SqlSessionInterceptor中的getSqlSession0. 前言 众所周知&#x1f60f;:MyBatis通过SqlSessionFactory 创建SqlSession去调用Executo…

在VMware17 Pro中设置创建虚拟机Ubuntu 20

在VMware17 Pro中设置创建虚拟机Ubuntu 200 前言1 安装Ubuntu 20步骤0 前言 书接上回&#xff0c;安装好了VMware17 Pro之后&#xff0c;就是安装虚拟机了&#xff0c;前提是下好了系统安装包&#xff0c;以Ubuntu 20为例 1 安装Ubuntu 20步骤 首先点击创建新的虚拟机 新建…

利用NAS免费部署动态解析实现内网穿透

‍ 想要从外网访问家中的NAS等设备&#xff0c;一般来说我们需要知道家中路由器的公网IP。 现在固定的公网IP基本上很难免费申请到了&#xff0c;但是一般来说运营商可以免费提供一个动态变化的公网IP&#xff1a;当路由设备重启时&#xff0c;运营商会给你重新分配一个新的I…

PHP加载3D模型【WebGL】

这是另一篇关于如何使用 PHP加载 3D 模型的文章。 在这里&#xff0c;我使用 Laravel 作为后端及其存储。 我在前端使用 Three.Js 库来渲染和显示 3D 模型。 我将向您展示如何上传 3D 模型以及如何从 Laravel 存储加载 3D 模型。 请仔细完成以下步骤。 大家可以在评论区提出任何…

8Manage PPM项目管理系统独特的功能:项目完整性保护

项目有其内在复杂性&#xff08;项目管理的科学部分&#xff09;&#xff0c;这种复杂性可以进行划分和克服。项目也有人为的或偶然的复杂性&#xff08;项目管理的艺术部分&#xff09;&#xff0c;这种复杂性无法进行划分和克服。偶然的高复杂性会影响并使内在复杂性难以管理…

【系统架构设计师】计算机组成与体系结构 ① ( 计算机组成 | CPU | 存储器 | 总线 | IO 外设 | CPU 组成 | 运算器 | 控制器 )

文章目录一、计算机组成与体系结构二、计算机组成结构三、CPU 组成1、运算器2、控制器一、计算机组成与体系结构 计算机组成与体系结构 对应 大学的 计算机组成原理 课程 , 主要分为 : 计算机组成体系结构划分 两大知识板块 ; 在架构师考试时 , 平均分值是 3 分 ; 计算机组成…

三相可控全桥整流与DC Buck变换电路设计仿真问题汇总

目 录 问题 一、开关管没有打开的情况下&#xff0c;DC Buck输出负电压&#xff1f; 二、问题分析 1.输出端存在与母线电压反相的电压&#xff0c;因此可以确定为差模感应电压&#xff0c;如果输出端与母线端产生的是大小相等&#xff0c;方向相同的同相电压&#xff0c;则为共…

大数据框架之Hadoop:HDFS(六)DataNode(面试开发重点)

6.1DataNode工作机制 DataNode工作机制&#xff0c;如下图所示。 1&#xff09;一个数据块在DataNode上以文件形式存储在磁盘上&#xff0c;包括两个文件&#xff0c;一个是数据本身&#xff0c;一个是元数据包括数据块的长度&#xff0c;块数据的校验和&#xff0c;以及时间戳…

ROS笔记(4)——发布者Publisher与订阅者Subscribe的编程实现

发布者 以小海龟的话题消息为例,编程实现发布者通过/turtle1/cmd_vel 话题向 turtlesim节点发送消息&#xff0c;流程如图 步骤一 创建功能包&#xff08;工作空间为~/catkin_ws/src&#xff09; $ cd ~/catkin_ws/src $ catkin_create_pkg learning_topic roscpp rospy s…

FLStudio水果最新版本V21支持中文语言

FL Studio简称FL&#xff0c;全称&#xff1a;Fruity Loops Studio习惯叫它水果。软件现有版本是FLStudio21&#xff0c;已全面升级支持简体中文语言界面 。FL Studio 能让你的计算机就像是全功能的录音室一样&#xff0c;完成编曲、剪辑、录音、混音等工作&#xff0c;帮助爱好…

2023大厂高频软件测试面试真题(附答案)

一、接口测试面试题&#xff1a;1.接口测试是怎么做的&#xff0c;如何分析数据&#xff1f;接口测试实际跟一般测试不同就是测试用例的设计部分。获取接口规范。设计接口测试功能用例&#xff08;主要从用户角度出发看接口能否实现业务需求&#xff0c;用例设计就是黑盒用例那…

2022财年净新增1159家门店,百胜中国门店高速扩张背后有何阳谋?

中国最大餐饮企业百胜中国控股有限公司&#xff08;下称“百胜中国”&#xff09;&#xff0c;2月8日发布了2022年度第四季度及全年未经审核的财务业绩。 从财报数据来看&#xff0c;这家拥有肯德基、必胜客、黄记煌等诸多餐饮品牌的巨头&#xff0c;已经顺利渡过了疫情笼罩下…

计算机网络(第7版)第五章(物理层)知识点整理

计算机网络 参考书目&#xff1a;《计算机网络&#xff08;第7版&#xff09;》&#xff1a;谢希仁——电子工业出版社 《精通Windows Sockets网络开发--基于Visual C实现》&#xff1a;孙海民——人民邮电出版社 第五章&#xff1a;物理层计算机网络一、基本概念二、传输媒体…

和数集团打造《神念无界:源起山海》,诠释链游领域创新与责任

首先&#xff0c;根据网上资料显示&#xff0c;一部《传奇》&#xff0c;二十年热血依旧。 《传奇》所缔造的成绩&#xff0c;承载的是多少人的青春回忆&#xff0c;《传奇》无疑已经在游戏史上写下了浓墨重彩的一笔。 相比《传奇》及背后的研发运营公司娱美德名声大噪&#x…

uniapp上高德(百度)地图API的使用(APP安卓)

前言由于在app中没有document,window等对象&#xff0c;所以使用在pc端传统方法引入的方式&#xff0c;将会发现无法引用成功&#xff0c;会出现白屏现象。目前有两种解决方式&#xff1a;使用uniapp的web-view方式&#xff08;百度地图&#xff09;使用renderjs来调用document…

【51媒体网】媒体邀约行业诞生及其前景预测

传媒如春雨&#xff0c;润物细无声&#xff0c;大家好&#xff0c;我是51媒体网胡老师。一&#xff0c;媒体邀约行业的诞生媒体邀约行业是随着现代社会媒体的普及而逐渐形成的。随着互联网和社交媒体的快速发展&#xff0c;媒体作为信息传播的重要渠道之一&#xff0c;越来越成…

PPP协议实验及配置

PPP协议实验拓扑图PPP认证配置PAP认证CHAP认证接口地址不在一个网段&#xff1f;地址自动协商通过IPCP方式获取到默认路由拓扑图 首先在设备上增添两个Serial接口&#xff1a; PPP认证配置 PAP认证 AR1作为认证方&#xff0c;AR2作为被认证方&#xff1a; AR1&#xff1a…

Flask-SQLAlchemy的安装使用 一对多 多对多join查询

Flask-SQLAlchemy安装及设置 SQLALchemy 实际上是对数据库的抽象&#xff0c;让开发者不用直接和 SQL 语句打交道&#xff0c;而是通过 Python 对象来操作数据库&#xff0c;在舍弃一些性能开销的同时&#xff0c;换来的是开发效率的较大提升SQLAlchemy是一个关系型数据库框架…

yaml配置文件

最近在写代码&#xff0c;发现随着网络的增加&#xff0c;代码变得越来越冗余&#xff0c;所以就想着写一个网络的配置文件&#xff0c;把网络的配置放到一个文件中&#xff0c;而不再主函数中&#xff0c;这样代码开起来就好看了&#xff0c;调试的时候也方便了。之前写过一篇…