C++之深入解析如何实现一个线程池

news2024/11/24 1:55:19

一、基础概念

  • 当进行并行的任务作业操作时,线程的建立与销毁的开销是,阻碍性能进步的关键,因此线程池,由此产生。使用多个线程,无限制循环等待队列,进行计算和操作,帮助快速降低和减少性能损耗。
  • 线程池的组成:
    • 线程池管理器:初始化和创建线程,启动和停止线程,调配任务,管理线程池;
    • 工作线程:线程池中等待并执行分配的任务;
    • 任务接口:添加任务的接口,以提供工作线程调度任务的执行;
    • 任务队列:用于存放没有处理的任务,提供一种缓冲机制,同时具有调度功能,高优先级的任务放在队列前面。

二、线程池工作情况

① 没有任务要执行,缓冲队列为空

在这里插入图片描述

② 队列中任务数量,小于等于线程池中线程任务数量

在这里插入图片描述

③ 任务数量大于线程池数量,缓冲队列未满

在这里插入图片描述

④ 任务数量大于线程池数量,缓冲队列已满

在这里插入图片描述

三、线程池的 C++ 实现

  • 线程池的主要组成有如下三个部分:
    • 任务队列(Task Quene);
    • 线程池(Thread Pool);
    • 完成队列(Completed Tasks)。

在这里插入图片描述

① 队列

  • 可以使用队列来存储工作,因为它是更合理的数据结构,我们希望以与发送它相同的顺序启动工作。但是,这个队列有点特殊,线程是连续的查询队列要求工作,当有可用的工作时,线程从队列中获取工作并执行它。如果两个线程试图同时执行相同的工作会发生什么?不难知道,程序会崩溃,为了避免这种问题,在标准 C ++ Queue 上实现了一个包装器,它使用 mutex 来限制并发访问。
  • 来看一下 SafeQueue 类的一小部分示例:
void enqueue(T& t) {
    std::unique_lock<std::mutex> lock(m_mutex);
    m_queue.push(t);
}
  • 要排队做的第一件事就是锁定互斥锁以确保没有其它们人正在访问该资源,然后将元素推送到队列中,当锁超出范围时,它会自动释放,这样,使 Queue 线程安全,因此不必担心许多线程在相同的“时间”访问和/或修改它。

② 提交函数

  • 线程池最重要的方法是负责向队列添加工作的方法,不难理解它是如何工作的:
    • 接受任何参数的任何函数;
    • 立即返回“东西”以避免阻塞主线程,此返回的对象最终应包含操作的结果。
  • 完整的提交函数如下所示:
// Submit a function to be executed asynchronously by the pool template<typename F, typename...Args>

auto submit(F&& f, Args&&... args) -> std::future<decltype(f(args...))> {
    // Create a function with bounded parameters ready to execute

    std::function<decltype(f(args...))()> func = std::bind(std::forward<F>(f), std::forward<Args>(args)...);
    // Encapsulate it into a shared ptr in order to be able to copy construct // assign 

    auto task_ptr = std::make_shared<std::packaged_task<decltype(f(args...))()>>(func);
    // Wrap packaged task into void function

    std::function<void()> wrapper_func = [task_ptr]() {
      (*task_ptr)(); 
    };

    // Enqueue generic wrapper function

    m_queue.enqueue(wrapperfunc);
    // Wake up one thread if its waiting

    m_conditional_lock.notify_one();
    // Return future from promise

    return task_ptr->get_future();
}
  • 队列完整源代码:
// SafeQueue.h

#pragma once

#include <mutex>

#include <queue>

// Thread safe implementation of a Queue using a std::queue

template <typename T>
class SafeQueue {
private:
  std::queue<T> m_queue; //利用模板函数构造队列

  std::mutex m_mutex;//访问互斥信号量

public:
  SafeQueue() { //空构造函数


  }

  SafeQueue(SafeQueue& other) {//拷贝构造函数

    //TODO:
  }

  ~SafeQueue() { //析构函数

  }


  bool empty() {  //队列是否为空

    std::unique_lock<std::mutex> lock(m_mutex); //互斥信号变量加锁,防止m_queue被改变

    return m_queue.empty();
  }

  int size() {
    std::unique_lock<std::mutex> lock(m_mutex); //互斥信号变量加锁,防止m_queue被改变

    return m_queue.size();
  }
//队列添加元素

  void enqueue(T& t) {
    std::unique_lock<std::mutex> lock(m_mutex);
    m_queue.push(t);
  }
//队列取出元素

  bool dequeue(T& t) {
    std::unique_lock<std::mutex> lock(m_mutex); //队列加锁

    if (m_queue.empty()) {
      return false;
    }
    t = std::move(m_queue.front()); //取出队首元素,返回队首元素值,并进行右值引用

    m_queue.pop(); //弹出入队的第一个元素

    return true;
  }
};
  • 线程池完整代码:
// ThreadPool.h

#pragma once
#include <functional>
#include <future>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>
#include <vector>
#include "SafeQueue.h"

class ThreadPool {
private:
  class ThreadWorker {//内置线程工作类

  private:
    int m_id; //工作id

    ThreadPool * m_pool;//所属线程池

  public:
    //构造函数

    ThreadWorker(ThreadPool * pool, const int id) 
      : m_pool(pool), m_id(id) {
    }
    //重载`()`操作

    void operator()() {
      std::function<void()> func; //定义基础函数类func

      bool dequeued; //是否正在取出队列中元素

      //判断线程池是否关闭,没有关闭,循环提取

      while (!m_pool->m_shutdown) {
        {
          //为线程环境锁加锁,互访问工作线程的休眠和唤醒

          std::unique_lock<std::mutex> lock(m_pool->m_conditional_mutex);
          //如果任务队列为空,阻塞当前线程

          if (m_pool->m_queue.empty()) {
            m_pool->m_conditional_lock.wait(lock); //等待条件变量通知,开启线程

          }
          //取出任务队列中的元素

          dequeued = m_pool->m_queue.dequeue(func);
        }
        //如果成功取出,执行工作函数

        if (dequeued) {
          func();
        }
      }
    }
  };

  bool m_shutdown; //线程池是否关闭

  SafeQueue<std::function<void()>> m_queue;//执行函数安全队列,即任务队列

  std::vector<std::thread> m_threads; //工作线程队列

  std::mutex m_conditional_mutex;//线程休眠锁互斥变量

  std::condition_variable m_conditional_lock; //线程环境锁,让线程可以处于休眠或者唤醒状态

public:
    //线程池构造函数

  ThreadPool(const int n_threads)
    : m_threads(std::vector<std::thread>(n_threads)), m_shutdown(false) {
  }

  ThreadPool(const ThreadPool &) = delete; //拷贝构造函数,并且取消默认父类构造函数

  ThreadPool(ThreadPool &&) = delete; // 拷贝构造函数,允许右值引用

  ThreadPool & operator=(const ThreadPool &) = delete; // 赋值操作

  ThreadPool & operator=(ThreadPool &&) = delete; //赋值操作

  // Inits thread pool

  void init() {
    for (int i = 0; i < m_threads.size(); ++i) {
      m_threads[i] = std::thread(ThreadWorker(this, i));//分配工作线程

    }
  }

  // Waits until threads finish their current task and shutdowns the pool

  void shutdown() {
    m_shutdown = true;
    m_conditional_lock.notify_all(); //通知 唤醒所有工作线程

    for (int i = 0; i < m_threads.size(); ++i) {
      if(m_threads[i].joinable()) { //判断线程是否正在等待

        m_threads[i].join();  //将线程加入等待队列

      }
    }
  }

  // Submit a function to be executed asynchronously by the pool
  //线程的主要工作函数,使用了后置返回类型,自动判断函数返回值

  template<typename F, typename...Args>
  auto submit(F&& f, Args&&... args) -> std::future<decltype(f(args...))> {
    // Create a function with bounded parameters ready to execute
    // 

    std::function<decltype(f(args...))()> func = std::bind(std::forward<F>(f), std::forward<Args>(args)...);//连接函数和参数定义,特殊函数类型,避免左、右值错误

    // Encapsulate it into a shared ptr in order to be able to copy construct // assign 
    //封装获取任务对象,方便另外一个线程查看结果

    auto task_ptr = std::make_shared<std::packaged_task<decltype(f(args...))()>>(func);

    // Wrap packaged task into void function
    //利用正则表达式,返回一个函数对象

    std::function<void()> wrapper_func = [task_ptr]() {
      (*task_ptr)(); 
    };

    // 队列通用安全封包函数,并压入安全队列

    m_queue.enqueue(wrapper_func);

    // 唤醒一个等待中的线程

    m_conditional_lock.notify_one();

    // 返回先前注册的任务指针

    return task_ptr->get_future();
  }
};
  • 使用样例代码:
#include <iostream>

#include <random>

#include "../include/ThreadPool.h"

std::random_device rd; //真实随机数产生器

std::mt19937 mt(rd()); //生成计算随机数mt;

std::uniform_int_distribution<int> dist(-1000, 1000);//生成-1000到1000之间的离散均匀分部数

auto rnd = std::bind(dist, mt);

//设置线程睡眠时间

void simulate_hard_computation() {
  std::this_thread::sleep_for(std::chrono::milliseconds(2000 + rnd()));
}

// 添加两个数字的简单函数并打印结果

void multiply(const int a, const int b) {
  simulate_hard_computation();
  const int res = a * b;
  std::cout << a << " * " << b << " = " << res << std::endl;
}

//添加并输出结果

void multiply_output(int & out, const int a, const int b) {
  simulate_hard_computation();
  out = a * b;
  std::cout << a << " * " << b << " = " << out << std::endl;
}

// 结果返回

int multiply_return(const int a, const int b) {
  simulate_hard_computation();
  const int res = a * b;
  std::cout << a << " * " << b << " = " << res << std::endl;
  return res;
}


void example() {
  // 创建3个线程的线程池

  ThreadPool pool(3);

  // 初始化线程池

  pool.init();

  // 提交乘法操作,总共30个

  for (int i = 1; i < 3; ++i) {
    for (int j = 1; j < 10; ++j) {
      pool.submit(multiply, i, j);
    }
  }

  // 使用ref传递的输出参数提交函数

  int output_ref;
  auto future1 = pool.submit(multiply_output, std::ref(output_ref), 5, 6);

  // 等待乘法输出完成

  future1.get();
  std::cout << "Last operation result is equals to " << output_ref << std::endl;

  // 使用return参数提交函数

  auto future2 = pool.submit(multiply_return, 5, 3);

  // 等待乘法输出完成

  int res = future2.get();
  std::cout << "Last operation result is equals to " << res << std::endl;

  //关闭线程池
  pool.shutdown();
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/479500.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Linux-初学者系列4_rpm-yum软件包管理

Linux-初学者系列4_rpm-yum软件包管理 一、软件包管理 系统软件安装后默认目录路径&#xff1a; /user/local /opt这两个目录用来存放用户自编译安装软件的目录&#xff0c;对于通过源码包安装的软件&#xff0c;如果没有指定安装目录&#xff0c;一般会装在以上目录中。 使…

利用Python轻松实现视频合成!

&#x1f3a5; 利用Python轻松实现视频合成&#xff01;&#x1f4bb; 你是否曾经尝试过在一个视频中添加另一个小视频的场景呢&#xff1f;如果是&#xff0c;你一定会知道这是一项令人头疼的任务。但是&#xff0c;有了Python的 moviepy 库&#xff0c;这个任务将变得非常简单…

Java AIO(Java Asynchronous I/O:异步非阻塞IO)

1.基本介绍 1>.JDK7引入了Asynchronous I/O,即AIO.在进行I/O编程中,常用到两种模式:Reactor和Proactor; 2>.Java的NIO就是Reactor,当有事件触发时,服务器端得到通知,进行相应的处理; 3>.AIO即NIO2.0,叫做异步不阻塞的IO.AIO引入了异步通道的概念,采用了Proactor模式…

Java之类和对象

一、类和对象 C和Java都是面向对象编程的语言,而C和Go是面向过程编程的语言. 主要概述一下面向对象编程,也就是op.在面向对象的世界中,一切皆对象.解决问题的途径主要是靠对象之间的交互去完成. 类的模板 类就是对一种事物的概述,比如说猫类,狗类,人类等等,在这些类中,有成…

为什么WEB端通常采用3DTILES 格式进行发布?

为什么WEB端通常采用3DTILES 格式进行发布? 随着互联网技术的发展和普及&#xff0c;Web端三维数据可视化和呈现越来越受到关注和重视。在这个背景下&#xff0c;采用合适的标准格式进行数据发布和交换变得尤为重要。3DTILES是一种新型的三维数据标准格式&#xff0c;具有多种…

数据降维 | MATLAB实现基于LFDA基于局部费歇尔判别的分类数据降维可视化

数据降维 | MATLAB实现基于LFDA基于局部费歇尔判别的分类数据降维可视化 目录 数据降维 | MATLAB实现基于LFDA基于局部费歇尔判别的分类数据降维可视化基本介绍模型描述程序设计学习小结基本介绍 MATLAB实现基于LFDA基于局部费歇尔判别的分类数据降维可视化 模型描述 局部费歇尔…

Linux之【多线程】线程互斥(锁)线程同步(条件变量)

Linux之【多线程】线程互斥&#xff08;锁&#xff09;&线程同步&#xff08;条件变量&#xff09; 一、引入&#xff1a;线程安全问题二、浅谈""和"- -"非原子性操作三、Linux线程互斥3.1 互斥量-->mutex⚠️3.1.1 互斥锁的理解3.1.2 深入了解锁的…

倾斜摄影超大场景的三维模型的顶层合并的优势浅析

倾斜摄影超大场景的三维模型的顶层合并的优势浅析 倾斜摄影超大场景的三维模型的顶层合并具有以下优势&#xff1a; 1、三维可视化效果好&#xff1a;通过倾斜摄影技术可以获得高分辨率的地面影像&#xff0c;将其与三维建模相结合可以生成非常逼真的三维场景。这种高度可视化…

SpringBoot事务管理-5个面试核心类源码刨析

“简单的事重复做&#xff0c;你就是专家&#xff1b;重复的事用心做&#xff0c;你就是赢家。” 在开始讲解SpringBoot事务之前&#xff0c;我们先来整体回顾下事务的概念及特性&#xff0c;便于我们了解SpringBoot是如何解决事务相关问题的&#xff0c;另外这部分也是面试必…

Windows安装配置Tomcat服务器教程 - 外网远程访问

文章目录 前言1.本地Tomcat网页搭建1.1 Tomcat安装1.2 配置环境变量1.3 环境配置1.4 Tomcat运行测试1.5 Cpolar安装和注册 2.本地网页发布2.1.Cpolar云端设置2.2 Cpolar本地设置 3.公网访问测试4.结语 转载自cpolar文章&#xff1a;外网访问本地Tomcat服务器【cpolar内网穿透】…

Cadence (1) 手动制作SMD封装

前提&#xff1a;软件版本 焊盘设计 &#xff1a;Pad Designer16.6PCB设计 &#xff1a;PCB Editor16.6PCB参考&#xff1a;LP Viewer 10.2 文章目录 SMD封装制作(R0603)封装信息SMD焊盘制作新建工程添加焊盘库路径到PCB EditorPCB Editor设计预处理放置焊盘放置丝印放置1脚标识…

【10.HTML入门知识-CSS元素定位】

1 标准流&#xff08;Normal Flow&#xff09; 默认情况下&#xff0c;元素都是按照normal flow&#xff08;标准流、常规流、正常流、文档流【document flow】&#xff09;进行排布  从左到右、从上到下按顺序摆放好  默认情况下&#xff0c;互相之间不存在层叠现象 1.1…

【13.HTML-动画】

1 CSS属性 - transform 1.1 位移 - translate translate的百分比可以完成一个元素的水平和垂直居中&#xff1a; 1.2 缩放 - scale 1.3 旋转 - rotate 1.4 transform-origin 形变的原点 1.5 倾斜 - skew 1.6 transform设置多个值 2 transition动画 2.1 认识transition动画 2…

Java反射(原理剖析与使用)

一、反射机制是什么 1、Java反射机制的核心是在程序运行时动态加载类并获取类的详细信息&#xff0c;从而操作类或对象的属性和方法。本质是JVM得到class对象之后&#xff0c;再通过class对象进行反编译&#xff0c;从而获取对象的各种信息。 2、Java属于先编译再运行的语言&a…

2023年第二十届五一数学建模竞赛C题:“双碳”目标下低碳建筑研究-思路详解与代码答案

该题对于模型的考察难度较低&#xff0c;难度在于数据的搜集以及选取与处理。 这里推荐数据查询的网站&#xff1a;中国碳核算数据库&#xff08;CEADs&#xff09; https://www.ceads.net.cn/ 国家数据 国家数据​data.stats.gov.cn/easyquery.htm?cnC01 以及各省市《统…

第四届“长城杯”信息安全铁人三项赛决赛RE-obfuscating

这里主要是加了混淆 这里要用到IDA的一个插件D810和去混淆脚本deflat.py。值得注意的是deflat.py无法在主逻辑去混淆&#xff0c;这里可以参考这篇文章的脚本利用angr符号执行去除控制流平坦化 - 0x401RevTrain-Tools (bluesadi.github.io)。在使用deflat.py和这文章中的脚本轮…

【AI折腾录】stable web ui基础【sd安装、lora vae embedding hyperwork等基础概念】

目录 一 sd安装二 目标三 sd基础3.1 模型3.2 vae&#xff08;Variational autoencoder&#xff0c;变分自编码器&#xff09;3.3 embedding3.3.1 安装方式3.3.2 使用方式 3.4 Lora3.4.1 lora组成3.4.2 使用&#xff1a;3.4.3 效果3.4.4 测试不同CFG效果 3.5 hypernetworks 超网…

LeetCode_BFS_DFS_中等_1376.通知所有员工所需的时间

目录 1.题目2.思路3.代码实现&#xff08;Java&#xff09; 1.题目 公司里有 n 名员工&#xff0c;每个员工的 ID 都是独一无二的&#xff0c;编号从 0 到 n - 1。公司的总负责人通过 headID 进行标识。 在 manager 数组中&#xff0c;每个员工都有一个直属负责人&#xff0c…

UE5实现距离测量功能

文章目录 1.实现目标2.实现过程2.1 Widget2.2 蓝图实现3.参考资料1.实现目标 UE5在Runtime环境下测量两个空间点位之间的绝对距离,并支持多段线的距离测量,GIF动图如下所示: 2.实现过程 实现原理比较简单,首先是基于PDI绘制线,有关绘制点和绘制线的可以看本专栏之前的文章…

css弹性布局

目录 1、实现弹性布局的前提&#xff1a;给父元素设置display:flex; 2、flex-direction&#xff1a;确定主轴方向 3、flex-wrap&#xff1a;是否换行 4、justify-content&#xff1a;主轴对齐方式 5、align-items&#xff1a;交叉轴对齐方式 6、align-content&#xff1a…