C++ 线程池实现

news2025/1/4 20:04:56

思路

创建多个工作线程同时维护一个公共的任务队列, 任务队列非空时通过信号量唤醒阻塞等待的工作线程, 工作线程通过互斥锁互斥的从任务队列中取出任务, 然后执行任务

实现

信号量类

class sem {//封装信号量类
public:
    sem(int num = 0) {
        if (sem_init(&m_sem, 0, num) != 0) {
            throw std::exception();
        }
    }

    ~sem() {
        sem_destroy(&m_sem);
    }

    bool wait() {
        return sem_wait(&m_sem) == 0;
    }

    bool post() {
        return sem_post(&m_sem) == 0;
    }

private:
    sem_t m_sem;
};

互斥锁类

class locker {//封装互斥锁类
public:
    locker() {
        if (pthread_mutex_init(&m_mutex, NULL) != 0) {
            throw std::exception();
        }
    }

    ~locker() {
        pthread_mutex_destroy(&m_mutex);
    }

    bool lock() {
        return pthread_mutex_lock(&m_mutex) == 0;
    }

    bool unlock() {
        return pthread_mutex_unlock(&m_mutex) == 0;
    }

    pthread_mutex_t *get() {
        return &m_mutex;
    }

private:
    pthread_mutex_t m_mutex;
};

线程池类

template<class T>
class threadpool {//线程池类
public:
    threadpool(int thread_number, int max_request = 1000);

    ~threadpool();

    bool append(T *request);//向请求队列种添加请求任务

private:
    static void *worker(void *arg);//工作线程运行的函数: 不断从工作队列中取出任务并执行

    void run();

private:
    int m_thread_number;//线程池中的线程数
    int m_max_requests;//请求队列中允许的最大请求数
    pthread_t *m_threads;//线程ID数组,其大小为m_thread_number
    std::list<T *> m_workqueue;//存放请求任务的队列
    locker m_queuelocker;//保护请求队列的互斥锁
    sem m_queuestat;//是否有任务需要处理
    bool m_stop;//是否结束线程
};


template<class T>
threadpool<T>::threadpool(int thread_number, int max_request):m_thread_number(thread_number), m_max_requests(max_request), m_stop(false) {
    if (thread_number <= 0 || max_request <= 0)
        throw std::exception();
    m_threads = new pthread_t[m_thread_number];
    if (!m_threads)
        throw std::exception();
    for (int i = 0; i < thread_number; i++) {
        if (pthread_create(m_threads + i, nullptr, worker, this) != 0 || pthread_detach(m_threads[i]) != 0) {//创建子线程并将其从主线程种分离
            delete[]m_threads;
            throw std::exception();
        }
    }
}

template<class T>
threadpool<T>::~threadpool() {
    delete[] m_threads;
    m_stop = true;// 修改标记, 使工作线程退出
}

template<class T>
bool threadpool<T>::append(T *request) {
    m_queuelocker.lock();//互斥锁加锁
    if (m_workqueue.size() > m_max_requests) {
        m_queuelocker.unlock();//互斥锁解锁
        return false;
    }
    m_workqueue.push_back(request);
    m_queuelocker.unlock();//互斥锁解锁
    m_queuestat.post();//唤醒阻塞的工作线程
}

template<class T>
void *threadpool<T>::worker(void *arg) {
    threadpool<T> *pool = (threadpool<T> *) arg;
    pool->run();//线程池对象创建出的工作线程能访问该线程池对象的成员
    return pool;
}

template<class T>
void threadpool<T>::run() {//不断从工作队列中取出任务并执行
    while (!m_stop) {
        m_queuestat.wait();//等待唤醒
        m_queuelocker.lock();//互斥锁加锁
        if (m_workqueue.empty()) {
            m_queuelocker.unlock();//互斥锁解锁
            continue;
        }
        T *request = m_workqueue.front();
        m_workqueue.pop_front();
        m_queuelocker.unlock();//互斥锁解锁
        if (request)
            request->process();//执行当前请求任务
    }
}

测试任务类

class S {//用于测试的任务类
public:
    S(int v = 0) : id(v) {}

    void process() {//执行任务
        printf("cur thread_id: %d, cur task_id: %d\n", std::this_thread::get_id(), id);//输出当前线程id和当前任务对象的val成员
        fflush(stdout);//立即输出
        _sleep(500);//暂停0.5s
    }

private:
    int id;
};

测试代码

#include<bits/stdc++.h>
#include <semaphore.h>

using namespace std;


class sem {//封装信号量类
public:
    sem(int num = 0) {
        if (sem_init(&m_sem, 0, num) != 0) {
            throw std::exception();
        }
    }

    ~sem() {
        sem_destroy(&m_sem);
    }

    bool wait() {
        return sem_wait(&m_sem) == 0;
    }

    bool post() {
        return sem_post(&m_sem) == 0;
    }

private:
    sem_t m_sem;
};

class locker {//封装互斥锁类
public:
    locker() {
        if (pthread_mutex_init(&m_mutex, NULL) != 0) {
            throw std::exception();
        }
    }

    ~locker() {
        pthread_mutex_destroy(&m_mutex);
    }

    bool lock() {
        return pthread_mutex_lock(&m_mutex) == 0;
    }

    bool unlock() {
        return pthread_mutex_unlock(&m_mutex) == 0;
    }

    pthread_mutex_t *get() {
        return &m_mutex;
    }

private:
    pthread_mutex_t m_mutex;
};


template<class T>
class threadpool {//线程池类
public:
    threadpool(int thread_number, int max_request = 1000);

    ~threadpool();

    bool append(T *request);//向请求队列种添加请求任务

private:
    static void *worker(void *arg);//工作线程运行的函数: 不断从工作队列中取出任务并执行

    void run();

private:
    int m_thread_number;//线程池中的线程数
    int m_max_requests;//请求队列中允许的最大请求数
    pthread_t *m_threads;//线程ID数组,其大小为m_thread_number
    std::list<T *> m_workqueue;//存放请求任务的队列
    locker m_queuelocker;//保护请求队列的互斥锁
    sem m_queuestat;//是否有任务需要处理
    bool m_stop;//是否结束线程
};


template<class T>
threadpool<T>::threadpool(int thread_number, int max_request):m_thread_number(thread_number), m_max_requests(max_request), m_stop(false) {
    if (thread_number <= 0 || max_request <= 0)
        throw std::exception();
    m_threads = new pthread_t[m_thread_number];
    if (!m_threads)
        throw std::exception();
    for (int i = 0; i < thread_number; i++) {
        if (pthread_create(m_threads + i, nullptr, worker, this) != 0 || pthread_detach(m_threads[i]) != 0) {//创建子线程并将其从主线程种分离
            delete[]m_threads;
            throw std::exception();
        }
    }
}

template<class T>
threadpool<T>::~threadpool() {
    delete[] m_threads;
    m_stop = true;// 修改标记, 使工作线程退出
}

template<class T>
bool threadpool<T>::append(T *request) {
    m_queuelocker.lock();//互斥锁加锁
    if (m_workqueue.size() > m_max_requests) {
        m_queuelocker.unlock();//互斥锁解锁
        return false;
    }
    m_workqueue.push_back(request);
    m_queuelocker.unlock();//互斥锁解锁
    m_queuestat.post();//唤醒阻塞的工作线程
}

template<class T>
void *threadpool<T>::worker(void *arg) {
    threadpool<T> *pool = (threadpool<T> *) arg;
    pool->run();//线程池对象创建出的工作线程能访问该线程池对象的成员
    return pool;
}

template<class T>
void threadpool<T>::run() {//不断从工作队列中取出任务并执行
    while (!m_stop) {
        m_queuestat.wait();//等待唤醒
        m_queuelocker.lock();//互斥锁加锁
        if (m_workqueue.empty()) {
            m_queuelocker.unlock();//互斥锁解锁
            continue;
        }
        T *request = m_workqueue.front();
        m_workqueue.pop_front();
        m_queuelocker.unlock();//互斥锁解锁
        if (request)
            request->process();//执行当前请求任务
    }
}

class S {//用于测试的任务类
public:
    S(int v = 0) : id(v) {}

    void process() {//执行任务
        printf("cur thread_id: %d, cur task_id: %d\n", std::this_thread::get_id(), id);//输出当前线程id和当前任务对象的val成员
        fflush(stdout);//立即输出
        _sleep(500);//暂停0.5s
    }

private:
    int id;
};


int main() {
    threadpool<S> tp(3);//创建最大工作线程数为3的线程池
    for (int i = 0; i < 9; i++) {//执行9个测试任务
        tp.append(new S(i));
    }
    _sleep(1500);
}

运行结果如下(输出顺序非固定), 可以看出所有任务都被执行且3个工作线程都能执行任务

在这里插入图片描述

参考:
最新版Web服务器项目详解 - 03 半同步半反应堆线程池(下)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/736730.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Kernel-Pwn-FGKASLR保护绕过

FGKASLR FGASLR&#xff08;Function Granular KASLR&#xff09;是KASLR的加强版&#xff0c;增加了更细粒度的地址随机化。因此在开启了FGASLR的内核中&#xff0c;即使泄露了内核的程序基地址也不能调用任意的内核函数。 layout_randomized_image 在fgkaslr.c文件中存在着…

支持中文手写和多画布的Handraw

什么是 Handraw ? Handraw 是支持中文手写和多画布的 Excalidraw 白板工具。 官网上项目名称还是 Excalidraw-CN&#xff0c;所以 Handraw 应该是基于 Excalidraw 二开的&#xff0c;特点是支持中文手写字体和多画布 官方也提供了免费使用的站点&#xff1a;https://handraw.t…

ModaHub魔搭社区:向量数据库Zilliz Cloud插入 Entity教程

目录 开始前 插入单个 Entity 批量插入 Entity 准备数据 插入数据 写入操作 本文介绍如何将 Entity 插入到 Zilliz Cloud 集群中的 Collection。 Entity 是 Collection 中的基本数据单元。同一个 Collection 中的 Entity 具有相同的属性,这些属性共同定义在 Schema 中…

低代码开发平台助力解决企业开发效率问题

编者按&#xff1a;随着企业应用需求的不断增加&#xff0c;提高企业开发效率已经成为许多企业的目标。传统的开发方法显然不适用&#xff0c;开发平台通过可视化拖拉拽搭建等易用性和高扩展性可以帮助企业解决这个问题。 关键词&#xff1a;可视化开发、私有化部署、前后端分离…

设计模式--------行为型模式

行为型模式 行为型模式用于描述程序在运行时复杂的流程控制&#xff0c;即描述多个类或对象之间怎样相互协作共同完成单个对象都无法单独完成的任务&#xff0c;它涉及算法与对象间职责的分配。 行为型模式分为类行为模式和对象行为模式&#xff0c;前者采用继承机制来在类间…

飞控学习笔记-IMU姿态算法

扩展卡尔曼滤波算法 传感器融合算法 卡尔曼滤波算法 最小二乘法 毕卡逼近法 对上式进行泰勒展开 得到四元数各阶近似算法&#xff1a; 梯度下降算法 梯度下降 互补滤波算法 chatgpt解释&#xff1a; 互补滤波&#xff08;Complementary Filter&#xff09;算法是一种常用…

zk-IMG:对抗虚假信息

1. 引言 前序博客&#xff1a; ZKP图片授权——PhotoProof&#xff1a;proofs of permissible photo edits Daniel Kang等人2022年论文《ZK-IMG: Attested Images via Zero-Knowledge Proofs to Fight Disinformation》&#xff0c;在该论文中提供了一个简单的deep fake ima…

高级编程技巧之Python装饰器详解

概要 装饰器是Python中一种强大而灵活的编程技巧&#xff0c;它可以用于修改或扩展函数的行为&#xff0c;同时又不需要修改函数的源代码。本文将介绍Python中的装饰器的基本概念、使用方法以及高级技巧&#xff0c;帮助你从入门到精通装饰器的使用。 一、基本概念 在深入学习…

【面试必考点】这一次带你彻底学会this的指向问题

文章目录 前言一、this的指向问题1.1 全局中的this1.2 普通函数中的this1.3 定时器中的this1.4 事件处理函数中的this1.5 构造函数中的this1.6 构造函数静态方法中的this1.7 箭头函数中的this 二、修改函数中的this指向2.1 call2.2 apply2.3 bind 三、 this指向练习3.1 某小游戏…

【软件分析/静态分析】chapter6 课程08 指针分析(Pointer Analysis)

&#x1f517; 课程链接&#xff1a;李樾老师和谭天老师的&#xff1a; 南京大学《软件分析》课程08&#xff08;Pointer Analysis&#xff09;_哔哩哔哩_bilibili 目录 第六章 指针分析&#xff08;Pointer Analysis&#xff09; 6.1 为什么需要指针分析 6.2 指针分析的基本…

AMAT 工业输入输出模块0100-77037

W;① ⑧ 0 ③ 0 ① 7 7 7 ⑤ 9 AMAT 工业输入输出模块0100-77037 0100-76124 0100-71313 0100-71311 0100-71309 0100-71278 0100-71267 0100-71229 0100-71224 0100-20100 IGBT 和 IGCT 是四层器件&#xff0c;乍一看并没有什么不同。但是&#xff0c;当您“ 深入了解…

Spring Boot原理分析(三):IoC容器的继承层次

文章目录 一、Spring Ioc容器的继承层次1.BeanFactory2.ListableBeanFactory3.HierarchicalBeanFactory4.ApplicationContext 二、常用的ApplicationContext的实现类1.ClassPathXmlApplicationContext&#xff08;基于XML配置&#xff09;2.AnnotationConfigApplicationContext…

[Android]使用jni实现高斯模糊

1.高斯模糊的原理&#xff1a; 根据周边的像素值来确定自己的像素值&#xff0c;平均值&#xff0c;最大值&#xff0c;最小值&#xff0c;正太分布值 2.均值模糊blur 函数声明&#xff1a; CV_EXPORTS_W void blur( InputArray src, OutputArray dst,Size ksize, Point anc…

python绘制分组条形图

文章目录 数据导入多组条形图堆叠条形图 数据导入 我们经常会遇到对比多个统计量随时间变化的图像&#xff0c;比如想知道中国、美国以及欧盟最近几年GDP变化&#xff0c;如下表所示&#xff0c;单位是万亿美元。 中国美国欧盟201813.8920.5315.98201914.2821.3815.69202014.…

转换或是克隆的虚拟机无法联网,网络服务无法启动

新转换的虚拟机&#xff0c;无法联网&#xff0c;启动网络服务&#xff0c;报错&#xff1a; systemctl start network.service job for network.service failed because the control process exited with error code. 查看网络服务状态&#xff0c;systemctl status network…

SpringMVC 中的控制器如何处理文件上传

SpringMVC 中的控制器如何处理文件上传 Spring MVC 是一个基于 Java 的 Web 框架&#xff0c;它是 Spring 框架的一部分&#xff0c;提供了一系列的组件和工具&#xff0c;帮助开发人员构建 Web 应用程序。其中&#xff0c;控制器是 Spring MVC 中的核心组件之一&#xff0c;它…

SpringMVC 中的控制器如何返回 JSON 数据

SpringMVC 中的控制器如何返回 JSON 数据 SpringMVC 是一个基于 Spring 框架的 Web 框架&#xff0c;它提供了一种方便的方式来处理 HTTP 请求和响应。在 SpringMVC 中&#xff0c;控制器是用来处理请求的组件&#xff0c;它们负责接收请求、处理请求并返回响应。在本文中&…

三大城市分会场精彩呈现—2023架构·可持续未来峰会圆满收官!

2023年6月30日&#xff0c;由The Open Group主办的2023架构可持续未来峰会三大城市分会场成功举办&#xff0c;也代表着本次The Open Group半年度架构峰会圆满收官&#xff01; 本次大会以“可持续未来”为主题&#xff0c;采用“13”&#xff0c;即北京主会场上海/成都/深圳三…

svg修改图标颜色

对于svg图标&#xff0c;想通过hover或者active 添加颜色&#xff0c;没有办法修改&#xff0c;解决办法&#xff1a; 1. 修改svg图片源 最开始的svg图标&#xff1a; 修改这个fill"currentColor" 要是要修改线条颜色就修改stroke属性&#xff1a; fill属性设置对象…

青岛大学_王卓老师【数据结构与算法】Week05_02_栈的定义和特点_学习笔记

本文是个人学习笔记&#xff0c;素材来自青岛大学王卓老师的教学视频。 一方面用于学习记录与分享&#xff0c; 另一方面是想让更多的人看到这么好的《数据结构与算法》的学习视频。 如有侵权&#xff0c;请留言作删文处理。 课程视频链接&#xff1a; 数据结构与算法基础…