【Linux】简易线程池项目

news2024/11/16 9:33:07

线程池是一个可以巩固一些线程相关接口 && 加强理解的一个小项目。

注意:这里的线程池使用的线程并不是Linux原生接口,而是经过封装的,具体请看线程封装,为什么不使用原生接口?

因为原生接口一旦进行pthread_create直接就去执行目标函数了,我们就不能很好的控制线程;
而封装过后的接口可以进行start,在适当的时机去执行。

这里具体说一下封装后的线程,构造时需要传入名字与将要执行的可调用对象(函数指针,Lambda,仿函数),启动时start即可。

目录

  • 目标图画展示 && 设计思想
  • 设计框架
    • main的大纲
    • ThreadPool.hpp大纲
  • 代码实现细节
  • 全部代码

目标图画展示 && 设计思想

在这里插入图片描述
每当我们有任务时直接将任务push到线程池中即可,新线程会帮助我们处理任务。

那么该如何设计?
从图中可以看出线程池本质上就是一个生产者消费者模型。
而这个模型考虑到3个点足以
其一:一个用来存放数据的数据结构
其二:参与这个模型有生产者与消费者的两个角色
其三:注意生产者与生产者,消费者与消费者,生产者与消费者之间的关系即可。

设计框架

main的大纲

我们一般都是先从主函数进行一个框架的大概设计,并逐步进行填充
当然,以下的main函数肯定不是最终版本。

int main()
{
    std::unique_ptr<ThreadPool> tp = std::make_unique<ThreadPool>();
    tp->Init();
    tp->Start();

    while (true)
    {
        // 生产数据
        tp->Equeue(/*push数据*/);
        sleep(1);
    }

    tp->Stop();

    return 0;
}

ThreadPool.hpp大纲

将T设置为模板,其中T为可调用对象,为我们的任务
下段代码中比较详细的介绍了各个部分需要做的事情

template <class T>
class ThreadPool
{
public:
    void HandlerTask() 
    {
        // 不断循环,直到_isrunning变为false && queue中没有任务后再退出
        while (true)
        {
            // 在这段区域中进行对queue的读取,并进行处理数据                
            // 这里本质上就是属于消费者部分的临界区了
            // 于是需要进行加锁,并且需要处理一下生产者与消费者的互斥关系
        }
    }

public:
    ThreadPool()
    {}
    ~ThreadPool()
    {}
    void Init()
    {
        // 创建一批线程(未启动),使用vector管理起来
        // 注意创建时需要将 线程名与HandlerTask 都传入
    }
    void Start()
    {
        // 启动线程
    }
    void Equeue()
    {
        // 这里是生产者进行放入数据的区域
        // 与消费者那里类似,同样需要注意对临界区的保护与处理 生产者与消费者的关系
    }
    void Stop()
    {
        // 将线程池运行标志位置为false,注意一些细节
    }

private:                            // 可以根据自己的需要进行增删,我这里只是一个实例

    std::vector<MyThread> _thds;    // 管理线程的数据结构
    int _cap;                       // 线程池容量
    std::queue<T> _q;               // 生产消费模型中的交易场所
    bool _isrunning;                // 线程池运行标志位
    int _sleep_num;                 // 睡眠线程个数
    pthread_mutex_t _mutex;         // 锁
    pthread_cond_t _cond;           // 条件变量
};

代码实现细节

我们先给出一个任务hpp,没什么好说的,没有一点细节

#include <iostream>

class Task
{
public:
    Task(int x, int y) : _x(x), _y(y)
    {
    }
    void operator()()
    {
        _result = _x + _y;
    }
    void Debug()
    {
        std::cout << _x << " + " << _y << " = ?" << std::endl;
    }
    void Result()
    {
        std::cout << _x << " + " << _y << " = " << _result << std::endl;
    }
private:
    int _x;
    int _y;
    int _result;
};

线程池部分完整代码

template <class T>
class ThreadPool
{
private:
    void Lock()
    {
        pthread_mutex_lock(&_mutex);
    }
    void Unlock()
    {
        pthread_mutex_unlock(&_mutex);
    }
    void WakeUp()
    {
        pthread_cond_signal(&_cond);
    }
    void WakeUpAll()
    {
        pthread_cond_broadcast(&_cond);
    }
    bool Empty()
    {
        return _q.empty();
    }
    void Sleep()
    {
        pthread_cond_wait(&_cond, &_mutex);
    }

public:
    void HandlerTask(const std::string &name)
    {
        while (true)
        {
            Lock();
            while (Empty() && _isrunning)
            {
                _sleep_num++;
                Sleep();
                _sleep_num--;
            }
            if (Empty() && !_isrunning)
            {
                Unlock();
                break;
            }
            // 有任务
            T t = _q.front();
            _q.pop();
            Unlock();
            // 在自己独立的栈帧中并发执行任务。
            t();
            std::cout << name << " :";
            t.Result();
        }
    }

public:
    ThreadPool(int cap = defaultCap) : _cap(cap), _sleep_num(0), _isrunning(false)
    {
        pthread_mutex_init(&_mutex, nullptr);
        pthread_cond_init(&_cond, nullptr);
    }
    ~ThreadPool()
    {
        pthread_mutex_destroy(&_mutex);
        pthread_cond_destroy(&_cond);
    }
    void Init()
    {
        std::function<void(const std::string &)> f = std::bind(&ThreadPool::HandlerTask, this, std::placeholders::_1);
        for (int i = 0; i < _cap; i++)
        {
            std::string name = "thread-" + std::to_string(i + 1);
            _thds.emplace_back(name, f);
        }
    }
    void Start()
    {
        _isrunning = true;
        for (auto &thd : _thds)
        {
            thd.Start();
        }
    }
    void Equeue(const T &in)
    {
        Lock();
        if (_isrunning)
        {
	        _q.emplace(in);
	        if (_sleep_num > 0)
	        {
	            WakeUp();
	        }
        }
        Unlock();
    }
    void Stop()
    {
        Lock();
        _isrunning = false;
        WakeUpAll();
        Unlock();
    }

private:
    std::vector<MyThread> _thds;
    int _cap;
    std::queue<T> _q;
    int _sleep_num;
    bool _isrunning;

    pthread_mutex_t _mutex;
    pthread_cond_t _cond;
};

其中这里有很关键的几个点

一.、HandlerTask中关于生产者与消费者关系的维护
我们的新线程没有任务时直接去休眠即可,但是由于我们还要控制线程池(_isrunning),于是这里需要增加一些逻辑,只有queue中没有任务并且_isrunning为true时才能休眠;
同样退出时需要_isrunning为false并且没有任务才能退出

二、生产者处理数据时要在锁外进行处理,否则在锁内的话就失去了多线程意义了。

三、init中,构造vector时,我们需要HandlerTask传过去,但是由于隐含的this指针,所以利用std::bind就会很舒服

四:stop中,我们除了设为false,还要进行wakeUpAll,原因在于 可能当你的queue中没有任务并且_isrunning还为true都在条件变量下休眠了,这样的话如果stop没有wakeUpAll,那么新线程就会永远sleep。

全部代码

模拟线程的代码:

#pragma once

#include <iostream>
#include <string>
#include <unistd.h>
#include <cstdio>
#include <pthread.h>
#include <functional>

namespace cyc
{

    class MyThread
    {
    public:
        // typedef void (*func_t)();
        using func_t = std::function<void(const std::string&)>;
        MyThread(const std::string &name, func_t func) : _func(func), _isRunning(false), _name(name)
        {
        }
        ~MyThread()
        {
        }

        void Excute()
        {
            _isRunning = true;
            _func(_name);
            _isRunning = false;
        }
        static void *routine(void *arg)
        {
            MyThread *self = static_cast<MyThread*>(arg);
            self->Excute();
            return nullptr;
        }

        void Start()
        {
            int n = ::pthread_create(&_tid, nullptr, routine, this);
            if (n != 0)
            {
                perror("pthread_create fail");
                exit(1);
            }
            std::cout << _name << " start..." << std::endl;
        }
        void Stop()
        {
            if (_isRunning)
            {
                pthread_cancel(_tid);
                _isRunning = false;
            }
        }
        void Join()
        {
            int n = pthread_join(_tid, nullptr);
            if (n != 0)
            {
                perror("pthread_join fail");
                exit(1);
            }
            std::cout << _name << " Join sucess..." << std::endl;
        }
        std::string GetStatus()
        {
            if (_isRunning)
                return "Running";
            else
                return "sleeping";
        }

    private:
        pthread_t _tid;
        func_t _func;
        std::string _name;
        bool _isRunning;
    };
}

任务代码:

#include <iostream>

class Task
{
public:
    Task(int x, int y) : _x(x), _y(y)
    {
    }
    void operator()()
    {
        _result = _x + _y;
    }
    void Debug()
    {
        std::cout << _x << " + " << _y << " = ?" << std::endl;
    }
    void Result()
    {
        std::cout << _x << " + " << _y << " = " << _result << std::endl;
    }
private:
    int _x;
    int _y;
    int _result;
};

线程池代码:

#pragma once

#include "MyThread.hpp"
#include <vector>
#include <queue>
#include <string>
#include <pthread.h>

using namespace cyc;

const int defaultCap = 5;

void test()
{
    while (true)
    {
        std::cout << "hello world" << std::endl;
    }
}

template <class T>
class ThreadPool
{
private:
    void Lock()
    {
        pthread_mutex_lock(&_mutex);
    }
    void Unlock()
    {
        pthread_mutex_unlock(&_mutex);
    }
    void WakeUp()
    {
        pthread_cond_signal(&_cond);
    }
    void WakeUpAll()
    {
        pthread_cond_broadcast(&_cond);
    }
    bool Empty()
    {
        return _q.empty();
    }
    void Sleep()
    {
        pthread_cond_wait(&_cond, &_mutex);
    }

public:
    void HandlerTask(const std::string &name)
    {
        while (true)
        {
            Lock();
            while (Empty() && _isrunning)
            {
                _sleep_num++;
                Sleep();
                _sleep_num--;
            }
            if (Empty() && !_isrunning)
            {
                Unlock();
                break;
            }
            // 有任务
            T t = _q.front();
            _q.pop();
            Unlock();
            // 在自己独立的栈帧中并发执行任务。
            t();
            std::cout << name << " :";
            t.Result();
        }
    }

public:
    ThreadPool(int cap = defaultCap) : _cap(cap), _sleep_num(0), _isrunning(false)
    {
        pthread_mutex_init(&_mutex, nullptr);
        pthread_cond_init(&_cond, nullptr);
    }
    ~ThreadPool()
    {
        pthread_mutex_destroy(&_mutex);
        pthread_cond_destroy(&_cond);
    }
    void Init()
    {
        std::function<void(const std::string &)> f = std::bind(&ThreadPool::HandlerTask, this, std::placeholders::_1);
        for (int i = 0; i < _cap; i++)
        {
            std::string name = "thread-" + std::to_string(i + 1);
            _thds.emplace_back(name, f);
        }
    }
    void Start()
    {
        _isrunning = true;
        for (auto &thd : _thds)
        {
            thd.Start();
        }
    }
    void Equeue(const T &in)
    {
        Lock();
        if (_isrunning)
        {
	        _q.emplace(in);
	        if (_sleep_num > 0)
	        {
	            WakeUp();
	        }
        }
        Unlock();
    }
    void Stop()
    {
        Lock();
        _isrunning = false;
        WakeUpAll();
        Unlock();
    }

private:
    std::vector<MyThread> _thds;
    int _cap;
    std::queue<T> _q;
    int _sleep_num;
    bool _isrunning;

    pthread_mutex_t _mutex;
    pthread_cond_t _cond;
};

main代码:

#include <unistd.h>
#include <memory>

#include "ThreadPool.hpp"
#include "Task.hpp"


int main()
{
    ThreadPool<Task> *tp = new ThreadPool<Task>;

    tp->Init();
    tp->Start();
    // sleep(1);
    int count = 3;
    while (count--)
    {
        // 生产数据
        Task t(1, 2);
        tp->Equeue(t);
        sleep(1);
    }

    tp->Stop();

    delete tp;

    return 0;
}

有问随时找博主~

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2043167.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

2024最新easyrecovery 14中文破解版图文教程

使用EasyRecovery易恢复进行数据恢复非常简单。首先&#xff0c;用户需要选择需要恢复的数据类型&#xff0c;如文档、图片、视频等。然后&#xff0c;软件会对选定的存储设备进行全面扫描&#xff0c;以寻找可恢复的数据。在扫描过程中&#xff0c;用户可以预览部分已找到的文…

成化瓷器“制字衣横少越刀”--还有例外

孙瀛洲先生关于成化款瓷器的名言非常经典&#xff0c;但是&#xff0c;凡事总有以外。 图1&#xff0c;本人收藏成化斗彩鸡缸杯底款&#xff0c;制字的衣横越过双勾刀。 下面是两件台北故宫成化瓷器底款&#xff0c;制字下面的衣横也是越过刀了。 所以&#xff0c;凡事总有例外…

mysql5.7安装

1.创建一个software文件 2.先下载mysql的repo源 wget http://repo.mysql.com/mysql-community-release-el7-5.noarch.rpm 3安装源包 rpm -ivh mysql-community-release-el7-5.noarch.rpm 可能会报错 改成命令 rpm -ivh mysql-community-release-el7-5.noarch.rpm --nodeps…

优化图像处理:从旋转与缩放到水印添加

1. 旋转与缩放的仿射变换 在 OpenCV 中&#xff0c;cv2.getRotationMatrix2D() 函数可以生成旋转矩阵&#xff0c;该矩阵用于对图像进行旋转和缩放变换。旋转矩阵的主要参数是&#xff1a; Center&#xff1a;旋转中心点的坐标 (x, y)。 Angle&#xff1a;旋转角度&#xff0…

数据结构与算法--图的应用

文章目录 回顾提要连通图生成树最小生成树构造最小生成树的算法普里姆(Prim)算法克鲁斯卡尔(Kruskal)算法 最短路径狄杰斯特拉 (Dijkstra) 算法当前最短路径的更新拓扑排序拓扑排序方法拓扑排序示例总结 回顾 图的遍历方法&#xff1a; 深度优先遍历 (DFS)&#xff1a;从任意…

在centos7安装mysql

1.卸载旧环境 ps axj | grep mysql ps axj | grep mariabd 如果是这样就什么都不需要做。 如果不是 2.检查并卸载系统安装包 //检查安装包 rpm -qa | grep mysql//卸载安装包 rpm -qa | grep mysql | xargs yum -y remove 3.安装官方yum源 先查看系统的版本 比如我是7.9版…

力扣高频SQL 50题(基础版)第四十题之1164. 指定日期的产品价格

文章目录 力扣高频SQL 50题&#xff08;基础版&#xff09;第四十题1164. 指定日期的产品价格题目说明实现过程准备数据实现方式结果截图总结FIRST_VALUE()函数LAST_VALUE()函数NTH_VALUE()函数 LAST_VALUE()函数NTH_VALUE()函数 力扣高频SQL 50题&#xff08;基础版&#xff0…

YJ0043定制版抖音电商卷抢购系统带回收商城抖音电商优惠卷投资理财系统

系统是基于逍遥商城二开的系统&#xff0c;pc手机端都新增了邀请码验证 手机端重新定制的UI&#xff0c;前端产品不至于抖音卷也可以自行更改其他产品 用户前端下单&#xff0c;后台订单可以直接回收&#xff0c;后台支持设置默认邀请码和抢卷时间限制

动手学深度学习(pytorch)学习记录10-从零开始实现softmax回归[学习记录]

注&#xff1a;本代码在jupyter notebook上运行 封面图片来源 导包 import torch from IPython import display import torchvision from torchvision import transforms from torch.utils import data设置加载数据的线程数 def get_dataloader_workers(): ""&qu…

《学会 SpringBoot 系列 · spring.factories 详解》

&#x1f4e2; 大家好&#xff0c;我是 【战神刘玉栋】&#xff0c;有10多年的研发经验&#xff0c;致力于前后端技术栈的知识沉淀和传播。 &#x1f497; &#x1f33b; CSDN入驻不久&#xff0c;希望大家多多支持&#xff0c;后续会继续提升文章质量&#xff0c;绝不滥竽充数…

机器人阻抗控制之设计方法

机器人阻抗控制的设计方法主要围绕调整机器人与环境之间的动态关系&#xff0c;使其等效为由弹簧-阻尼-质量组成的二阶系统。这一控制策略不是直接控制机器人的运动或其与外界的接触力&#xff0c;而是控制这二者之间的动态关系。以下是机器人阻抗控制设计方法的详细阐述&#…

Centos7系统上安装docker

centos7安装docker 安装之前&#xff0c;一定查看是否安装docker&#xff0c;如果有&#xff0c;卸载老版本 我是虚拟机装的Centos7&#xff0c;linux 3.10 内核&#xff0c;docker官方说至少3.8以上&#xff0c;建议3.10以上&#xff08;ubuntu下要linux内核3.8以上&#xff…

LVS详细配置

目录 LVS简介 LVS集群体系结构 LVS相关术语 lvs集群的类型 1、NAT模式 NAT简介 NAT模式数据逻辑 2、DR模式 DR模式简介 DR模式数据逻辑 DR模式的特点 3、TUN模式 TUN模式简介 TUN模式数据传输过程 TUN模式特点 4、fullnet模式 LVS模式总结 LVS调度算法 LVS静…

python从入门到精通:函数

目录 1、函数介绍 2、函数的定义 3、函数的传入参数 4、函数的返回值 5、函数说明文档 6、函数的嵌套调用 7、变量的作用域 1、函数介绍 函数是组织好的&#xff0c;可重复使用的&#xff0c;用来实现特定功能的代码段。 name "zhangsan"; length len(nam…

机器学习(1)--数据可视化

文章目录 数据可视化作用可视化方法实现可视化 总结 数据可视化 数据可视化是将数据以图形、图像、动画等视觉形式表示出来&#xff0c;以便人们能够更直观地理解、分析和交流数据中的信息。 作用 一个整理的好好的数据&#xff0c;我们为什么要将其可视化呢&#xff1f;将它…

深入理解指针

前言&#xff1a;对于指针我们已经有了初步的了解&#xff0c;并已能够简单使用。今天我们来深入理解指针。让我们的指针功力更上一层楼。 1 使用指针访问数组 有了指针的知识&#xff0c;再结合数组的特点&#xff0c;我们就可以使用指针来访问数组了。 #include<stdio.…

线程的进阶学习

线程结束方式: 1.pthread_exit //pthread_join 2.从线程执行函数中return //此时效果等价于pthread_exit 3.pthread_cancel //线程可以被取消 4.任何一个线程调用了exit 或者 主线程 (main函数) return都会造成 进程结束 线程资源回收 ---pthread_join int pthread_ca…

汤姆·克鲁斯对妮可·基德曼经常对粉丝提起他们以前的事感到恼火

妮可基德曼最近回忆了她与前夫汤姆克鲁斯和导演斯坦利库布里克在 1999 年的电影《大开眼戒》中合作的时光。这似乎是对她职业生涯中某个时刻的无伤大雅的回顾&#xff0c;但据报道&#xff0c;有一个人对她在纪念该电影上映 25 周年时的谈话感到不满。 据报道&#xff0c;克鲁…

Polars简明基础教程十四:可视化(四)

数据帧交换协议 数据帧互换协议&#xff08;Dataframe Interchange Protocol&#xff09;&#xff0c;是为了提高不同数据帧库之间的互操作性而设计的。 想象一下&#xff0c;你有很多不同类型的储物箱&#xff08;在这里比喻为不同的数据帧库&#xff0c;如 Pandas、Polars、…

ArkTs基础语法-声明式UI-基本概念

声明式UI语法 基本概念声明式UI描述创建组件无参数有参数 配置属性配置事件 配置子组件 基本概念 装饰器&#xff1a;用于装饰类、结构、方法及变量&#xff0c;并赋予其特殊的含义。 例如&#xff1a; Entry 有该装饰器的自定义组件&#xff0c;可以在UIAbility中使用&#xf…