C++ 面向对象技术实战:实现基于 POSIX 线程标准封装的线程池

news2025/1/10 5:39:13

线程池基础概述

为什么要有线程池?假设没有使用线程池时,一个请求用一个子线程来处理。每来一个请求,都得创建子线程,子线程执行请求,关闭子线程。当请求量(并发)比较大的时候,频繁地创建和关闭子线程,也是有开销的。

因此提出线程池,提前开辟好N个子线程,当有任务过来的时候,先放到任务队列中,之后N个子线程从任务队列中获取任务并执行,这样能大大提高程序的执行效率。

其实当任务数大于线程池中子线程的数目的时候,就需要将任务放到缓冲区(队列)里面,所以本质上还是一个生产者消费者模型。

补充一个查看线程的状态的命令:

ps -elLf | grep xxx 1

面向对象的线程池封装

UML 类图设计

在这里插入图片描述

而 TaskQueue 类的类图设计是在之前的文章里就介绍过的,这里只添加了一点改动,将原来的 int 类型数据给改成了一个 Elem 类型,关于 TaskQueue 类可以参考我之前的文章:C++ 面向对象技术实战:实现基于 POSIX 线程标准的线程封装并解决生产者消费者问题;

代码实现

首先本文的代码内容全部建立在之前的解决生产者消费者问题的基础之上,因此代码会复用之前的,如果还没看过的话可以先去学习一下上一篇文章:C++ 面向对象技术实战:实现基于 POSIX 线程标准的线程封装并解决生产者消费者问题;

那么接下来我们就正式开始线程池的代码编写!

首先将之前的代码拷贝进当前目录:

在这里插入图片描述

现在我们先对 TaskQueue.h 进行修改:

#ifndef _TASKQUEUE_H__
#define _TASKQUEUE_H__

#include <queue>
#include "MutexLock.h"
#include "Condition.h"
#include "Task.h"

using std::queue;
//先重定义一个任务类型Task,泛化接收各种数据
using Elem = Task*;

class TaskQueue{
public:
  TaskQueue(size_t _queSize);
  ~TaskQueue();
  bool empty() const;
  bool full() const;
  void push(const Elem &value);
  Elem pop();
  
  //用来唤醒 ThreadPool 中阻塞线程使用的函数
  //将所有等待在_notEmpty上的线程唤醒
  void wakeup();

private:
  size_t _queSize;
  queue<Elem> _que;
  MutexLock _mutex;
  Condition _notEmpty;
  Condition _notFull;
  bool _flag;
};

#endif

因为引入了 Task 类型,因此我们现在补一下 Task.h :

#ifndef _TASK_H__
#define _TASK_H__

//对于 Task 类只需要有一个虚方法process即可
//因此 Task 实际上是个接口
class Task{
  public:
    //其子类当实现这个接口以提供给线程池要执行的任务
    virtual void process() = 0;
    
    //如果一个类里面含有虚函数,那么这个时候将它的
    //析构函数也给设为虚函数是一种最佳实践
    virtual ~Task(){

    }
};

#endif

现在我们就可以去写一个 TaskQueue.cc 实现文件了:

#include "TaskQueue.h"

TaskQueue::TaskQueue(size_t queSize)
:_queSize(queSize)
  ,_que()
  ,_mutex()
  ,_notEmpty(_mutex)
  ,_notFull(_mutex)
  ,_flag(true)
{

}

TaskQueue::~TaskQueue(){

}

bool TaskQueue::empty() const{
  return _que.size() == 0;
}

bool TaskQueue::full() const{
  return _que.size() == _queSize;
}

void TaskQueue::push(const Elem &value){
  MutexLockGuard autoLock(_mutex);
  while(full()){
    _notFull.wait();
  }
  _que.push(value);

  _notEmpty.notify();
}

Elem TaskQueue::pop(){
  MutexLockGuard autoLock(_mutex);
  while(_flag && empty()){
    _notEmpty.wait();
  }
  if(_flag){
    Elem res = _que.front();
    _que.pop();
  
    _notFull.notify();
  
    return res;
  } 
  else{
    return nullptr;
  }
}

void TaskQueue::wakeup(){
  _flag = false;
  _notEmpty.notifyAll();
}

接下来就开始实现重头戏,也就是线程池这个类 ThreadPool.h :

#ifndef __THREADPOOL_H__
#define __THREADPOOL_H__

#include "Thread.h"
#include "TaskQueue.h"
#include <vector>
#include <memory>

using std::vector;
using std::unique_ptr;

class ThreadPool{
  //这样WorkThread才能访问私有的 threadFunc 方法
  friend class WorkThread;
  public:
    ThreadPool(size_t threadNum,size_t queSize);
    ~ThreadPool();

    void start(); //线程池启动,也就是初始化所有子线程
    void stop(); //线程池关闭,也就是join掉所有子线程
    void addTask(Task*); //往线程池中的任务队列里添加任务
    Task* getTask(); //从任务队列中获取任务
  
  private:
    void threadFunc(); //子线程真正执行的操作

  private:
    size_t _threadNum; //线程池大小,也就是线程数量
    size_t _queSize; //任务队列大小
    vector<unique_ptr<Thread>> _threads; //用来存放线程的容器
    TaskQueue _taskQue; //任务队列
    bool _isExit; //线程池是否退出
};

#endif

然后是实现文件 ThreadPool.cc:

#include "ThreadPool.h"
#include "WorkThread.h"
#include <unistd.h>

ThreadPool::ThreadPool(size_t threadNum,size_t queSize)
: _threadNum(threadNum)
  ,_queSize(queSize)
  ,_taskQue(_queSize)
  ,_isExit(false)
{
  //初始化 vector 的空间大小
  _threads.reserve(_threadNum);
}

ThreadPool::~ThreadPool(){
  if(!_isExit){
    stop();
    _isExit = true;
  }
}

//线程池开始执行的时候,其实就是工作线程已经开启
void ThreadPool::start(){
  //先将全部的线程创建并保存起来
  for(size_t idx=0; idx<_threadNum; ++idx){
    //创建线程
    unique_ptr<Thread> up(new WorkThread(*this));
    //存入线程容器中
    //注意对于 unique_ptr而言是没有拷贝构造函数的
    //因此要用move将其转为右值调用移动语义来完成push_back
    _threads.push_back(std::move(up));
  }

  //然后再全部启动起来
  for(auto& th : _threads){
    th->start();//创建工作线程的id,让工作线程开始运行
  }
} 

//线程池关闭,也就是join掉所有子线程
//子线程也就是工作线程WorkThread就是从Thread继承过来的
//所以每个工作线程执行join方法即可
void ThreadPool::stop(){
  //确保_taskQue任务队列中的任务都被做完了才开始关闭线程池
  while(!_taskQue.empty()){
    sleep(1);
  }
  //表示线程池将要退出了
  _isExit = true;
  //唤醒一下可能正在沉睡的线程以进行join
  _taskQue.wakeup();
  //进行线程回收
  for(auto& th : _threads){
    th->join();
  }
} 

//往线程池中的任务队列里添加任务
//等同于生产者消费者问题中的生产者角色
void ThreadPool::addTask(Task* ptask){
  if(ptask){
    _taskQue.push(ptask);
  }
} 

//从任务队列中获取任务
Task* ThreadPool::getTask(){
  return _taskQue.pop();
} 
  
//在线程池中封装的任务,这个任务的实际执行者就是WorkThread
//这个threadFunc方法也就是子线程真正所要执行的操作
void ThreadPool::threadFunc(){
  //只要线程池还没有退出,那么就应该一直获取并执行任务
  while(!_isExit){
    //先获取任务
    //但如果getTask()执行的过快的话,会来不及将_isExit设置为 true
    //会一直卡在getTask这里,也就是卡在Condition的wait上面,
    //那这样的话就没法正常退出线程池了
    //因此我们要让getTask()执行的慢一点,让线程池的stop也有可能先执行,
    //让stop函数将_isExit 设置为 true,这样就能够顺利退出线程池
    Task* ptask = getTask();
    //再执行任务
    if(ptask){
      ptask->process();
    }
  }
} 

在编写 ThreadPool.cc 的过程中我们用到了 WorkThread 类,因此我们现在去将 WorkThread 类实现一下,依然是先实现头文件 WorkThread.h:

#ifndef __WORKTHREAD_H__
#define __WORKTHREAD_H__

#include "Thread.h"
#include "ThreadPool.h"

class WorkThread: public Thread{
public:
  WorkThread(ThreadPool& threadPool): _threadPool(threadPool){

  }

  void run() override{
    _threadPool.threadFunc();
  }

  ~WorkThread(){

  }

private:
  ThreadPool& _threadPool;

};

#endif

现在只剩 MyTask 文件还没实现了,不过为了方便起见,我们可以将这个类直接写在 TestThreadPool.cc 测试文件中即可,因为毕竟测试也要用到这个嘛,来写一下 TestThreadPool.cc 测试文件:

#include "Task.h"
#include "ThreadPool.h"
#include <stdlib.h>
#include <unistd.h>
#include <iostream>

using namespace std;

class MyTask: public Task{
private:
  void process() override{
    ::srand(clock());
    int number = ::rand()%100;
    cout << "MyTask number = " << number << endl;
    //sleep(1);
  }
};

int main(){
  //创建一个线程池,线程数量为5,任务队列大小为10
  ThreadPool threadPool(5, 10);
  //创建任务
  unique_ptr<Task> task(new MyTask());
  //启动线程池,五个子线程创建并运行
  threadPool.start();
  
  //向线程池中投放任务,放20个任务
  int cnt = 20;
  while(cnt-- > 0){
    //因为task是智能指针,是个对象,而addTask函数的参数是Task*
    //因此我们可以使用unique_ptr的get函数获得其所托管内存资源的原始裸指针
    threadPool.addTask(task.get());
    cout << "cnt = " << cnt << endl;
    //sleep(1);
  }

  //关闭线程池,五个子线程进行退出操作
  threadPool.stop();
  return 0;
}

运行结果如下:

在这里插入图片描述

总结

最后的线程池项目目录结构如下:

在这里插入图片描述

最后如果有需要源码的朋友,可以直接访问我的仓库获取源码:基于面向对象的方式实现的线程池;

以上就是本文的全部内容。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2039118.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

新能源遇“秋老虎”,8月第二周销量集体下滑,问界惨遭腰斩

文/王俣祺 导语&#xff1a;随着日前7月份乘用车销量的公布&#xff0c;我们发现7月并没有因6月各车企的“冲量”行为迎来反噬&#xff0c;对于这种“淡季不淡”的现象市场上一片看好。但从近日公布的8月销量数据来看&#xff0c;人们对于“秋老虎”的恐怖可以说是一无所知。随…

MySQL学习[5] ——MySQL日志

五、MySQL日志 5.1 MySQL中有哪些日志&#xff1f; MySQL中主要有三种日志&#xff1a;undo log&#xff08;回滚日志&#xff09;、redo log&#xff08;重做日志&#xff09;、binlog&#xff08;归档日志&#xff09;&#xff0c;简单介绍&#xff1a; undo log&#xff…

Redis 高级篇(分布式缓存)

一、Redis分布式缓存 单点Redis问题&#xff1a; 数据丢失&#xff08;实现Redis数据持久化&#xff09;并发能力&#xff08;搭建主从集群&#xff0c;实现读写分离&#xff09;存储能力&#xff08;搭建分片集群&#xff0c;利用插槽机制实现动态扩容&#xff09;故障恢复能…

张宇36讲重点勾划+30天保底120带刷计划

先说结论&#xff0c;张宇36讲不适合目标100分的同学去用&#xff01; 张宇36讲&#xff0c;有一个问题&#xff0c;就是内容太多了&#xff1a; 页数达到了1200多页。这个恐怖的内容量&#xff0c;恐怕没有人可以看完。 但是张宇老师一开始说&#xff0c;不会删减任何内容&…

TinyWebserver的复现与改进(5):HTTP报文的解析与响应

GitHub - yzfzzz/MyWebServer: Linux高并发服务器项目&#xff0c;参考了TinyWebServer&#xff0c;将在此基础上进行性能改进与功能增加。为方便读者学习&#xff0c;附带详细注释和博客&#xff01; TinyWebserver的复现与改进&#xff08;1&#xff09;&#xff1a;服务器环…

模型训练与验证minicpm-v

minicpm-v 模型进行微调并进行验证 训练使用混合数据集进行训练&#xff0c;对minicpm-V进行lora微调&#xff0c;微调后使用llama3_1对输出结果与标签值进行比对&#xff0c;计算准确率。 验证代码为: # URL https://swift.readthedocs.io/zh-cn/latest/LLM/VLLM%E6%8E%A8%…

PMP到底有什么用?

PMP 就是项目管理证书&#xff0c;全称是项目管理专业人士资格认证&#xff0c;对于一个在项目管理岗位混迹五年的老油条来说&#xff0c;PMP 证书是敲开项目管理岗位的第一块砖&#xff0c;每年考 PMP 的人都很多&#xff0c;要是 PMP 证书没有价值&#xff0c;还会有那么多人…

Tomcat下载安装文档

简介 Tomcat服务器软件是一个免费的开源的web应用服务器。是Apache软件基金会的一个核心项目。由Apache&#xff0c;Sun和其他一些公司及个人共同开发而成。 由于Tomcat只支持Servlet/JSP少量JavaEE规范&#xff0c;所以是一个开源免费的轻量级Web服务器。 JavaEE规范&#x…

Java IO流使用方法 (常见方法)

Java系列文章目录 补充内容 Windows通过SSH连接Linux 第一章 Linux基本命令的学习与Linux历史 文章目录 Java系列文章目录一、前言二、学习内容&#xff1a;三、问题描述四、解决方案&#xff1a;4.1 File 的使用4.2 防止乱码问题 五、总结&#xff1a;5.1 学习总结&#xff1…

IPFS、IPNS 网站部署

目录 概念IPFS 网站IPNS 网站网站迁移到 IPFS/IPNS1. 连接 Github2. 选择仓库3. 配置 Build4. 绑定域名5. 绑定 IPNS 域名6. 检查 DNSLink概念 以 https://bhitdao.com/ 为例 IPFS 网站 链接为 Hash: ipfs://bafybeifxwlnnvuhbxiszvs2kkckxkxfy36chzoy2f7nrempkpznxrudbsm/…

开源AI智能名片微信小程序:以人性洞察与资源优化为驱动的社群营销新策略

摘要&#xff1a;随着科技的飞速发展&#xff0c;特别是人工智能&#xff08;AI&#xff09;技术的广泛应用&#xff0c;传统营销模式正经历着前所未有的变革。本文旨在探讨开源AI智能名片微信小程序如何凭借其独特的功能特性&#xff0c;结合人性洞察、需求解决、资源优化以及…

CLAMP-1

一、信息收集 1、主机发现 nmap 192.168.236.0/24 2、端口扫描 nmap 192.168.236.173 -p- -A 3、目录扫描 dirb http://192.168.236.173 二、漏洞探测 访问80端口 访问 /nt4stopc/ 下面有一些问题&#xff0c;提示必须收集答案 都是一些判断题&#xff0c;对与错对应1与0&…

SQL注入(原理、分类、union、POST注入)

目录 【学习目标、重难点知识】 【学习目标】 【重难点知识】 SQL注入简介 SQL注入原理 SQL注入类型 MySQL与SQL注入的相关知识 information_schema 数据库的结构 数据库查询语句 limit的用法 需要记住的几个函数 注释符号 SQL注入探测方法 SQL注入漏洞攻击流程…

gerrit的使用

配置SSH密钥 用记事本打开电脑里以下文件&#xff0c;复制内容 在gerrit代码库设置里找到菜单 SSH Keys&#xff0c;将以上复制的内容粘贴到New SSH Key处&#xff0c;点击ADD NEW SSH KEY即可。 克隆代码 git clone ssh://..... 下载commit-msg文件 复制代码下载地址里的…

java之校验QQ号是否正确以及如何用正则表达式进行优化

public class RegexDemo {public static void main(String[] args) {String qq"123456789";System.out.println(checkQQ(qq));}public static boolean checkQQ(String qq){//规则:6位到20位之内,0不能在开头,必须全部是数字//核心思想://先把异常数据过滤//下面的…

外部排序(败者树、置换-选择排序、最佳归并树)

外部排序可能会考查相关概念、方法和排序过程&#xff0c;外部排序的算法比较复杂&#xff0c;不会在算法设计上进行考查。 一、外部排序的基本概念与方法 外部排序指待排序文件较大&#xff0c;内存一次放不下&#xff0c;需存放在外存的文件的排序。 1. 基本概念 在许多应用…

python入门之命令提示符和文本创建.py文件

1.命令提示符 程序 快捷键&#xff1a;windowsR 在安装完python以后&#xff0c;可以直接在命令提示符程序上敲代码进行初步尝试。 python解释器 计算机是不认识python代码的&#xff0c;计算机只能识别0和1这个二进制的数&#xff0c;所以需要一个翻译官“python翻译器”。 …

【Linux系列】known_hosts详解

&#x1f49d;&#x1f49d;&#x1f49d;欢迎来到我的博客&#xff0c;很高兴能够在这里和您见面&#xff01;希望您在这里可以感受到一份轻松愉快的氛围&#xff0c;不仅可以获得有趣的内容和知识&#xff0c;也可以畅所欲言、分享您的想法和见解。 推荐:kwan 的首页,持续学…

【Linux网络(一)】Socket编程

文章目录 1. 预备知识1.1 认识端口号1.2 初识TCP协议1.3 初识UDP协议1.4 网络字节序1.5 socket编程接口1.5.1 套接字编程的种类1.5.2 sockaddr结构体1.5.3 socket 常见API1.5.4 地址转换函数 2. 编写UDP服务器与客户端2.1 UDP服务器的创建2.2 UDP服务器接收/发送数据2.3 补充知…

动态规划——背包问题(01背包、完全背包,分组背包与二进制优化)

本蒟蒻写二进制优化开始的时候写昏了&#xff0c;并且昏了一下午。但好在有神犇救命&#xff0c;这篇博客才得以面世——躲着人群 一、01背包 概述&#xff1a; 其常见的问题形式为&#xff1a;给出n个物品&#xff0c;每个物品有对应的价值和体积。给出背包容量后求不超过背…