C++写一个线程池

news2024/11/13 16:21:59

C++写一个线程池

文章目录

  • C++写一个线程池
    • 设计思路
    • 测试数据的实现
    • 任务类的实现
    • 线程池类的实现
      • 线程池构造函数
      • 线程池入口函数
      • 队列中取任务
      • 添加任务函数
      • 线程池终止函数
    • 源码

之前用C语言写了一个线程池,详情请见:
C语言写一个线程池

这次换成C++了!由于C++支持泛型编程,所以代码的灵活性提高了不知道多少倍!!!!!

设计思路

线程池的原理就不解释了,这次主要来讲一下我使用C++进行面向过程、面向对象、泛型设计时的思想。

线程池的工作原理是存在一个具有多个线程的空间,我们对这些线程进行一个统一的管理(利用C++提供的线程类)。然后具有一个存放任务的队列,这些线程依次从中取出任务然后执行。

从上面的过程中发现可以将线程池作为一个对象来进行设计。这个对象中的成员至少有:

  • 存放n个线程对象的空间,可以使用 vector<std::thread> 进行管理。
  • 标记每一个线程的工作状态的容器,这里可以使用 unordered_map<<std::thread::id, bool>来进行管理。
  • 存放任务的队列
  • 等等…(在设计的过程中会发现)

我希望设计一个很万能的线程池,即可以接受任何任务,我只需要将对应的函数和参数传入进队列中,就可以让线程自动执行。

因此,设计一个任务类是必不可少的。因为,我们从函数外部传进去的函数和参数不一定相同,而且不同功能的任务之间没有一个合适的管理方式,因此我们需要设计一个任务类来兼容不同参数,并且将参数和工作函数绑定到一块的情况。

测试数据的实现

我个人比较喜欢在设计代码之前假设他已经设计好,然后先写出他的测试方法和数据,之后一点点来实现,这次我选择的测试方法是求1~50000000中的素数个数,不使用素数筛:

先实现判断素数的功能函数:

// 判断素数n是不是素数
bool is_prime(int n) {
		for (int i = 2; i * i < n; ++i) {
				if (n % i == 0) return false;
		}
		return true;
}

// 求出 start 到 end 返回的素数个数
int prime_count(int start, int end) {
		int ans = 0;
		for (int i = start; i <= end; ++i) {
				ans += is_prime(i);
		}
		return ans;
}

// 需要传入到线程池内的函数,求出 l 到 r 的素数个数然后保存在 ret 中
void worker(int l, int r, int &ret) {
		cout << this_thread::get_id() << ": begin" << endl;
		ret = prime_count(l, r);
		cout << this_thread::get_id() << ": end" << endl;
		return ;
}

下面是主函数:

int main() {
		#define MAX_N 5000000  // 这里假设需要处理10次,因此每次处理五百万的数据
		thread_pool tp(5);     // 假设传入的参数是线程池中的线程个数
		int ret[10];           // 存放10次结果
		for (int i = 0. j = 1; i < 10; ++i, j += MAX_N) {
				tp.add_task(worker, j, j + MAX_N - 1, ref(ret[i]));  // ref是用来传入引用的
		}
		tp.stop();              // 停止线程池的运转
		int ans = 0;            // 计算出结果
		for (int i = 0; i < 10; ++i) {
				ans += ret[i];
		}
		cout << "ans = " << ans << endl;
		return 0;
}

任务类的实现

明确一下目的:
实现一个类,第一个参数是一个函数地址,后面为变参列表,该类会将函数与参数打包成一个新的函数,作为任务队列中的一个元素,当空闲线程将其取出之后,可以执行这个新的函数。

这个需要用到模板:

template <typename FUNC_T, typename ...ARGS>
class Task {
		Task(FUNC_T func, ARGS... args) {
				...
		}
private:
		...
};

绑定之后我们需要一个变量来存放这个函数,因此需要添加一个函数指针对象 function<void()>
使用 bind 函数进行绑定。
在给bind函数传入参数列表时,需要维持左右值原型,因此需要工具类 std::forward<ARGS>(args)... 来解析参数类型。

template <typename FUNC_T, typename ...ARGS>
class Task {
		Task(FUNC_T func, ARGS... args) {
				this->_func = bind(functionn, std::forward<ARGS>(args)...);
		}
private:
		std::function<void()> _func;
};

最后需要一个方法来运行这个函数:
别忘了析构函数。

template <typename FUNC_T, typename ...ARGS>
class Task {
		Task(FUNC_T func, ARGS... args) {
				this->_func = bind(functionn, std::forward<ARGS>(args)...);
		}
		~Task() {
				delete _func;
		}
		void run() {
				_func();
				return ;
		}
private:
		std::function<void()> _func;
};

线程池类的实现

根据一开始的测试数据,发现线程池的操作对外就支持两个操作:

  • 压入任务
  • 停止

根据一开始所分析的:

  • 存放n个线程对象的空间,可以使用 vector<std::thread> 进行管理。
  • 标记每一个线程的工作状态的容器,这里可以使用 unordered_map<std::thread::id, bool>来进行管理。
  • 存放任务的队列
  • 等等…(在设计的过程中会发现)

我们可以先将成员和已知的方法写上:

template <typename FUNC_T, typename ...ARGS>
class thread_pool {
public:
		thread_pool() {}
		template <typename FUNC_T, typename ...ARGS>
		void add_task(FUNC_T func, ARGS... args) {...}
		void stop() {...}
		
private:
		std::vector<std::thread *> _threads;
		unordered_map<std::thread::id, bool> _running;
		std::queue<Task *> _tasks;
};

线程池构造函数

先来尝试完善一下构造函数,我们使用参数来控制线程池中的线程个数,默认线程数量我们可以设置成为1

创建出的线程空间放在堆区,因此使用 new 关键字来创建:

thread_pool(int n = 1) {
		for(int i = 0; i < n; ++i) {
				_threads[i] = new thread(&thread_pool::worker, this); // 别忘了内部方法的第一个隐藏参数this传入进去
		}
}

线程池入口函数

这个时候我们发现,由于thread的构造函数需要传入一个需要运行的函数,因此发现又多了一个函数就是工作函数 worker

简单来说,这个函数的功能规定了所有线程的行为——从队列中取出任务并执行。

在工作函数内部,我们需要将该线程 id 记录下来(表示他是否存活),然后不断判断这个线程是否存活,如果存活就继续等待队列中的任务

这个工作函数的作用就是不断检查队列中是否有可以取出的任务,然后执行。

void worker() {
		auto id = this_thread::get_id();
		_running[id] = true; // 表示这个线程被记录下来,在运行状态
		while (_running[id]) {
				Task *t = get_task();   // 从队列中取任务,这里又诞生出一个新函数
				t->run();               // 运行任务
				delete t;
		}
}

队列中取任务

可以发现又有了新的函数需求就是从队列中取出一个任务。

这个函数并不对外表现,所以应该设置为私有成员方法

主要逻辑就是检查队列头部是否有任务对象,如果有的话就返回这个任务的地址。

由于队列是临界资源,所以需要上锁,此时不免又多了两个成员变量

std::mutex m_mutex;
std::condition_variable m_cond;

Task *get_task() {
		unique_task<mutex> (m_mutex); // 上锁
		while (_tasks.empty()) {      // 如果队列为空则释放锁并等待队列被放入任务的条件
				m_cond.wait(lock)
		}
		Task *t = _tasks.front();
		_tasks.pop();
		return t;
}

这样一来,我们就实现了线程的初始化以及任务的取出

接下来,还剩下任务的压入,这个操作由 add_task() 实现,因此我们先来实现这个函数

添加任务函数

由于他也是访问临界资源,因此,也需要上锁,同时在添加成功之后释放一个信号。

同样的,这个函数需要在外部调用,因此设置成为共有成员方法:

template <typename FUNC_T, typename ...ARGS>
void add_task(FUNC_T func, ARGS... args) {
		_tasks.push(new Task(func, std:forward<ARGS>(args)...));
		lock.unlock();
		m_cond.notify_one();
		return ;
}

线程池终止函数

线程不再运行之后可以选择终止他们来节省计算机资源,因此这个函数是必不可少的,主要的操作方式如下,我们想队列中压入等于同于线程数量的特殊任务这个特殊任务会把线程标记为非活动的,然后后等到他们全部执行完任务后,再依次释放掉他们的资源。

void stop() {
		for (int i = 0; i < _thread.size(); ++i) {
				this->add_task(&thread_pool::stop_running, this);   // 毒药方法
		}
		for(int i = 0; i < _thread_size(); ++i) {
					_threads[i]->join();
		}
		for(int i = 0; i < _thread.size(); ++i) {
				delete _threads[i];
				_threads[i] = nullptr;
		}
		return ;
}

其中涉及到了 stop_running() 这个毒药方法,这个方法只在函数内部调用,因此我们把他设计成为私有成员方法。

这个函数的逻辑就是将当前线程标记为非活动的状态。

void stop_running() {
		auto id = this_thread::get_id();
		_running[id] = false;
		return ;
}

到此为止,线程池的大部分功能就设计的差不多了,之后我又进行了一下细微的调整,相信读者应该自己也能读懂,这里就不过多解释了。

源码

#include <condition_variable>
#include <iostream>
#include <algorithm>
#include <vector>
#include <queue>
#include <stack>
#include <set>
#include <map>
#include <cstdio>
#include <cstdlib>
#include <ctype.h>
#include <cmath>
#include <string>
#include <sstream>
#include <functional>
#include <thread>
#include <mutex>
#include <condition_variable>

#define TEST_BEGINS(x) namespace x {
#define TEST_ENDS(x) } // end of namespace x
using namespace std;

TEST_BEGINS(thread_pool)

bool is_prime(int n) {
    for (int i = 2; i * i <= n; ++i) {
        if (n % i == 0) return false;
    }
    return true;
}

int prime_count(int start, int end) {
    int ans = 0;
    for (int i = start; i <= end; ++i) {
        ans += is_prime(i);
    }
    return ans;
}

// 多线程入口函数
void worker(int l, int r, int &ret) {
    cout << this_thread::get_id() << " : begin" << endl;
    ret = prime_count(l, r);
    cout << this_thread::get_id() << " : done" << endl;
    return ;
}

class Task {
public:
    template <typename FUNC_T, typename ...ARGS>
    Task(FUNC_T function, ARGS ...args) {
        this->func = bind(function, std::forward<ARGS>(args)...);
    }
    void run() {
        func();
        return ;
    }
private:
    function<void()> func;
};

class thread_pool {
public:
    thread_pool(int n = 1) : _thread_size(n), _threads(n), starting(false) {
        this->start();
    }

    ~thread_pool() {
        this->stop();
        while (!_tasks.empty()) {
            delete _tasks.front();
            _tasks.pop();
        }
        return ;
    }

    /*
    * 入口函数:不断从队列中取任务,然后运行,最后释放资源。
    */
    void worker() {
        auto id = this_thread::get_id();
        _running[id] = true;
        while (_running[id]) {
            Task *t = get_task();
            t->run();
            delete t;
        }
        return ;
    }

    void start() {
        if (starting == true) return ;
        for (int i = 0; i < _thread_size; ++i) {
            _threads[i] = new thread(&thread_pool::worker, this);
        }
        starting = true;          // 标记线程池运行
		return ;
    }

    void stop() {
        if (starting == false) return ;
        for (int i = 0; i < _threads.size(); ++i) {
            this->add_task(&thread_pool::stop_running, this);
        }
        for (int i = 0; i < _threads.size(); ++i) {
            _threads[i]->join();
        }
        for (int i = 0; i < _threads.size(); ++i) {
            delete _threads[i];
            _threads[i] = nullptr;
        }
        starting = false;
        return ;
    }

    template <typename FUNC_T, typename ...ARGS>
    void add_task(FUNC_T func, ARGS... args) {
        unique_lock<mutex> lock(m_mutex);
        _tasks.push(new Task(func, std::forward<ARGS>(args)...));
        lock.unlock();
        m_cond.notify_one();
        return ;
    }

private:
    Task *get_task() {
        unique_lock<mutex> lock(m_mutex);
        while (_tasks.empty()) {          // 避免虚假唤醒
            m_cond.wait(lock);
        }
        Task *t = _tasks.front();
        _tasks.pop();
        return t;
    }
    void stop_running() {
        auto id = this_thread::get_id();
        _running[id] = false;                  // 毒药方法
        return ;
    }

    bool starting;
    int _thread_size;  													// 记录线程个数
    std::mutex m_mutex;													// 互斥锁
    std::condition_variable m_cond; 									// 条件变量
    vector<thread *> _threads;  										// 线程区
    unordered_map<std::thread::id, bool> _running;   					// 线程活动标记  
    queue<Task *> _tasks; 												// 任务队列
};

int main() {
    #define MAX_N 5000000
    thread_pool tp(10);
    int ret[10];
    for (int i = 0, j = 1; i < 10; ++i, j += batch) {
        tp.add_task(worker, j, j + MAX_N - 1, ref(ret[i]));
    }
    tp.stop();
    int ans = 0;
    for (auto x : ret) ans += x;
    cout << ans << endl;
    return 0;
}

TEST_ENDS(thread_pool)

int main() {
    thread_pool::main();
    return 0;
}

运行结果:
1

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1935725.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

C#知识|账号管理系统-账号信息管理界面[1]:账号分类选择框、Panel面板设置

哈喽,你好啊,我是雷工! 前一节实现了多条件查询后端代码的编写, 接下来继续学习账号信息管理界面的功能编写,本节主要记录账号分类选择框和Panel的设置, 以下为学习笔记。 01 功能说明 本节实现以下功能: ①:账号分类选择框只能选择,无法自由输入; ②:账号分类框默认…

大语言模型与扩散模型的“爱恨情仇”:Kolors和Auraflow的技术解析

近年来&#xff0c;随着深度学习技术的发展&#xff0c;生成模型在多个领域取得了显著进展。特别是大语言模型&#xff08;LLM&#xff09;和扩散模型&#xff08;Diffusion Model&#xff09;这两类模型&#xff0c;在自然语言处理&#xff08;NLP&#xff09;和图像生成任务中…

找国内API,用哪家API平台?

随着人工智能技术的飞速发展&#xff0c;AI已经成为推动各行各业创新和转型的重要力量。在中国&#xff0c;API平台的发展尤为迅速&#xff0c;涌现出许多优秀的API服务提供商。这些平台不仅提供了丰富的API资源&#xff0c;还通过创新的技术和服务&#xff0c;帮助开发者和企业…

VXLAN到底强在哪?网络虚拟化的彻底突破?

号主&#xff1a;老杨丨11年资深网络工程师&#xff0c;更多网工提升干货&#xff0c;请关注公众号&#xff1a;网络工程师俱乐部 你们好&#xff0c;我的网工朋友。 网络虚拟化作为一项关键技术&#xff0c;不仅提高了资源的利用效率&#xff0c;还增强了业务的敏捷性。然而&a…

gemini-pro-vision 看图说话

一、安装 pip install -U langchain-google-vertexai 二、设置访问权限 申请服务账号json格式key 三、完整代码 import gradio as gr import json import base64 from pathlib import Path import os import time import requests from fastapi import FastAPI, UploadFile,…

使用崖山YMP 迁移 Oracle/MySQL 至YashanDB 23.2 验证测试

前言 首届YashanDB「迁移体验官」开放后&#xff0c;陆续收到「体验官」们的投稿&#xff0c;小崖在此把优秀的投稿文章分享给大家~今天分享的用户文章是《使用崖山YMP 迁移 Oracle/MySQL 至YashanDB 23.2 验证测试》&#xff08;作者&#xff1a;尚雷&#xff09;&#xff0c…

独立游戏《星尘异变》UE5 C++程序开发日志5——实现物流系统

目录 一、进出口清单 二、路径计算 三、包裹 1.包裹的数据结构 2.包裹在场景中的运动 四、道路 1.道路的数据结构 2.道路的建造 3.道路的销毁 4.某个有道路连接的建筑被删除 作为一个工厂类模拟经营游戏&#xff0c;各个工厂之间的运输必不可少&#xff0c;本游戏采用的…

Java语言程序设计基础篇_编程练习题15.7(使用鼠标改变颜色)

15.7(使用鼠标改变颜色) 编写一个程序&#xff0c;显示一个圆的颜色&#xff0c;当按下鼠标键时颜色为黑色&#xff0c;释放鼠标时颜色为白色 代码展示&#xff1a;编程练习题15_7CircleColor.java package chapter_15;import javafx.application.Application; import javafx.…

STM32之八:IIC通信协议

目录 1. IIC协议简介 1.1 主从模式 1.2 2根通信线 2. IIC协议时序 2.1 起始条件和终止条件 2.2 发送一个字节 2.3 接收一个字节 2.4 应答信号 1. IIC协议简介 IIC协议是一个半双工、同步、一主多从、多主多从的串行通用数据总线。该通信模式需要2根线&#xff1a;SCL、…

数据监控电商平台价格心得分享

一、引言 在当今竞争激烈的电商环境中&#xff0c;价格是影响消费者购买决策的重要因素之一。对于电商从业者和商家来说&#xff0c;有效地监控电商平台的价格变动至关重要。通过数据监控&#xff0c;我们可以及时了解市场动态、调整策略&#xff0c;以保持竞争力并实现利润最大…

泰迪科技2024年高校(本科/职业院校)大数据实验室建设及大数据实训平台整体解决方案

高校大数据应用人才培养目标 大数据专业是面向信息技术行业&#xff0c;培养德智体美劳全面发展的大数据领域的高素质管理型专门人才&#xff0c;毕业生具备扎实的管理学、经济学、自然科学、技术应用、人文社科的基本理论, 系统深入的大数据管理专业知识和实践能力&#xff0c…

04 Git与远程仓库

第4章&#xff1a;Git与远程仓库 一、Gitee介绍及创建仓库 一&#xff09;获取远程仓库 ​ 使用在线的代码托管平台&#xff0c;如Gitee&#xff08;码云&#xff09;、GitHub等 ​ 自行搭建Git代码托管平台&#xff0c;如GitLab 二&#xff09;Gitee创建仓库 ​ gitee官…

四种垃圾收集算法详解(JVM)

一、标记清除 1、原理 从根集合节点进行扫描&#xff0c;标记出所有的存活对象&#xff0c;最后扫描整个内存空间并清除没有标记的对象&#xff08;即死亡对象) 标记后 &#xff08;黑色&#xff1a;可回收 | 灰色&#xff1a;存活对象 | 白色&#xff1a;未使用 &#xff0…

HarmonyOS鸿蒙- 跳转系统应用能力

一、通过弹窗点击设置跳转系统应用能力 1、 自定义弹窗效果图 2、 自定义弹窗代码 import { common, Want } from kit.AbilityKit; import { BusinessError } from kit.BasicServicesKit;export function alertDialog() {AlertDialog.show({title: ,message: 当前功能依赖定位…

算法力扣刷题记录 五十一【654.最大二叉树】

前言 二叉树篇&#xff0c;继续。 记录 五十一【654.最大二叉树】 一、题目阅读 给定一个不重复的整数数组 nums 。 最大二叉树 可以用下面的算法从 nums 递归地构建: 创建一个根节点&#xff0c;其值为 nums 中的最大值。递归地在最大值 左边 的 子数组前缀上 构建左子树。…

【Linux】安装PHP扩展-Swoole

说明 本文档是在centos7.6的环境下&#xff0c;安装PHP7.4之后&#xff0c;安装对应的PHP扩展Swoole。 一、swoole简述 Swoole 是一个为 PHP 设计的高性能的异步并行网络通信引擎&#xff0c;它以扩展&#xff08;extension&#xff09;的形式存在&#xff0c;极大地提升了 …

Linux--YUM仓库部署及NFS共享存储

目录 一、YUM仓库服务 1.1 YUM介绍 1.2 yum 常用的命令 1.3 YUM 源的提供方式 1.3.1 配置本地 yum 源仓库 1.3.2 配置 ftp 源 1.3.3 配置http服务源 二、NFS 共享存储 2.1 NFS基本概述 2.2 为什么使用 NFS 共享存储 2.3 NFS 应用场景 2.4 NFS 实现原理 2.5 NFS文件…

【python学习】爬虫中常使用的urllib和requests库的的背景、定义、特点、功能、代码示例以及两者的区别

引言 urllib是Python标准库中的一个模块&#xff0c;它提供了一系列用于操作URL的功能 requests是一个Python第三方库&#xff0c;由Kenneth Reitz创建&#xff0c;用于简化HTTP客户端的编程 一、urllib的定义 urllib可以操作url&#xff0c;主要分为以下几个子模块&#xff1…

Nginx详解(超级详细)

目录 Nginx简介 1. 为什么使用Nginx 2. 安装Nginx Nginx的核心功能 1. Nginx反向代理功能 2. Nginx的负载均衡 3 Nginx动静分离 Nginx简介 Nginx是一款轻量级的Web 服务器/反向代理服务器及电子邮件&#xff08;IMAP/POP3&#xff09;代理服务器&#xff0c;在BSD-like 协…

深入Redis集群部署:从安装配置到测试验证的完整指南

&#x1f3e1;作者主页&#xff1a;点击&#xff01; &#x1f427;Linux基础知识(初学)&#xff1a;点击&#xff01; &#x1f427;Linux高级管理防护和群集专栏&#xff1a;点击&#xff01; &#x1f510;Linux中firewalld防火墙&#xff1a;点击&#xff01; ⏰️创作…