文章目录
- 线程池的定义
- 使用线程池的原因
- 基于POSIX实现的线程池
- 基于block队列的线程池实现
- 基于ring队列的线程池实现
- 设计单例模式线程池
线程池的定义
线程池就一堆已经创建好的任务线程,初始它们都处于空闲等待状态,当有新的任务需要处理的时候,就从这线程池里取一个空闲的线程来处理任务,当任务处理完成后再次把线程返回线程池里(把线程置于空闲等待状态),以供后面线程继续使用。当线程池里所有的线程都处于忙碌状态时,可以根据情况进行等待或创建一个新的线程放入线程池里。
使用线程池的原因
线程的创建和销毁相对于进程的创建和销毁来说是轻量级的(即开销小),但是当我们的任务需要进行大量线程的创建和销毁时,这些开销合在一起就比较大了。比如,当设计一个压力性能测试框架时,需要连续产生大量的并发操作。线程池在这种场合是非常适用的。线程池的好处就在于线程复用,某个线程在处理完一个任务后,可以继续处理下一个任务,不用重新创建和销毁,避免了无谓的开销,因此线程池适用于连续产生大量并发任务的场合。
基于POSIX实现的线程池
在了解线程池的基本原理,下面我们用c++传统的方式也就POSIX来实现一个基本的线程池,该线程池虽然简单,但能体现线程池的基本工作原理。线程池的实现千变万化,有时候要根据实际的场合来定制,当原理都是一样的,
基于block队列的线程池实现
- 实现block队列
- 创建一堆线程,线程从队列拿取任务
#pragma once
#include <pthread.h>
#include <iostream>
#include <queue>
#include <semaphore.h>
#include<unistd.h>
// 总结:我们应该清楚生产者消费者模型的目的是为了让生产和消费解耦,解耦的好处在于,生产和消费能并发执行,
// 在多生产者多消费者模型中,目的就为了能合理控制生产消费线程的个数达到合理利用资源。
namespace ns_block_queue_pthread_pool
{
template <class T>
class pthread_pool
{
private:
int num_; // 线程数量
std::queue<T> task_queue_; // 任务队列,供给线程池使用
pthread_cond_t task_queue_cond_; // 任务队列的条件变量
// 拿取数据的过程要消费者间互斥,消费者与生产者互斥同步,所以我们需要添加锁来进行保护,并且加入条件变同步
pthread_mutex_t mtx_; // 多生产者多消费者维护关系的共有锁资源,
private:
// 对成员变量的访问目的为了让静态函数通过接收this参数访问成员函数并且访问成员变量。
void Lock()
{
pthread_mutex_lock(&mtx_);
}
void UnLock()
{
pthread_mutex_unlock(&mtx_);
}
bool IsEmpty()
{
return task_queue_.empty();
}
void WakeUpThread()
{
pthread_cond_signal(&task_queue_cond_);
}
public:
// 构造函数
pthread_pool(int num /*线程数量*/)
: num_(num)
{
pthread_cond_init(&task_queue_cond_, nullptr);
pthread_mutex_init(&mtx_, nullptr);
}
// 析构函数
~pthread_pool()
{
pthread_cond_destroy(&task_queue_cond_);
pthread_mutex_destroy(&mtx_);
}
void TaskPop(T &out)
{
// 拿取数据的过程要消费者间互斥,消费者与生产者互斥同步,所以我们需要添加锁来进行保护,并且加入条件变同步
Lock();
// 线程在满拿取条件时拿取数据处理,否则等待条件变量
while (IsEmpty())
{
// 线程池的线程充当消费者
pthread_cond_wait(&task_queue_cond_, &mtx_);
}
out = task_queue_.front();
task_queue_.pop();
UnLock();
}
void TaskPush(T &in)
{
Lock();
task_queue_.push(in);
UnLock();
WakeUpThread();
}
// 线程池线程
// 我们必须设置成静态的函数,原因是成员函数有this参数
static void *routine(void *agrs)
{
//
pthread_pool<T> *pp = (pthread_pool<T> *)agrs;
while (true)
{
T task;
pp->TaskPop(task);
// 执行任务;
std::cout << "线程:" << pthread_self() << "执行数据:" << task << "执行任务完成" << std::endl;
}
}
// 初始化线程池
void PthreadPoolInit()
{
pthread_t id;
for (int i = 0; i < num_; i++)
{
pthread_create(&id, nullptr, routine, this /*线程池对象this指针*/);
}
}
};
}
基于ring队列的线程池实现
- 实现ring队列
- 创建一堆线程
namespace ns_ring_queue_pthread_pool
{
template <class T>
class pthread_pool
{
const int queue_cap = 5;
private:
int num_; // 线程数量
std::vector<T> task_queue; // 任务队列,供给线程池使用
size_t p_pos;
size_t c_pos;
sem_t q_blank;
sem_t q_data;1
pthread_mutex_t mtx_; // 多生产者多消费者维护关系的共有锁资源,
pthread_mutex_t c_mutex;
pthread_mutex_t p_mutex;
private:
// 对成员变量的访问目的为了让静态函数通过接收this参数访问成员函数并且访问成员变量。
void Lock(pthread_mutex_t &mutex)
{
pthread_mutex_lock(&mutex);
}
void UnLock(pthread_mutex_t &mutex)
{
pthread_mutex_unlock(&mutex);
}
public:
// 构造函数
pthread_pool(int num /*线程数量*/)
: num_(num), c_pos(0), p_pos(0)
{
task_queue.resize(queue_cap);
pthread_mutex_init(&mtx_, nullptr);
pthread_mutex_init(&p_mutex,nullptr);
pthread_mutex_init(&c_mutex,nullptr);
sem_init(&q_blank, NULL, queue_cap);
sem_init(&q_data, NULL, 0);
std::cout<<"pthread_pool 初始化成功"<<std::endl;
}
// 析构函数
~pthread_pool()
{
sem_destroy(&q_blank);
sem_destroy(&q_data);
pthread_mutex_destroy(&mtx_);
pthread_mutex_destroy(&c_mutex);
pthread_mutex_destroy(&p_mutex);
}
void TaskPop(T &out)
{
sem_wait(&q_data);
Lock(c_mutex);
out = task_queue[c_pos];
sem_post(&q_blank);
c_pos++;
c_pos %= queue_cap;
UnLock(c_mutex);
}
void TaskPush(T &in)
{
sem_wait(&q_blank);
Lock(p_mutex);
task_queue[p_pos] = in;
sem_post(&q_data);
p_pos++;
p_pos %= queue_cap;
UnLock(p_mutex);
}
// 线程池线程
// 我们必须设置成静态的函数,原因是成员函数有this参数
static void *routine(void *agrs)
{
//
pthread_pool<T> *pp = (pthread_pool<T> *)agrs;
while (true)
{
T task;
pp->TaskPop(task);
// 执行任务;
std::cout << "线程:" << pthread_self() << "执行数据:" << task << "执行任务完成" << std::endl;
sleep(4);
}
}
// 初始化线程池
void PthreadPoolInit()
{
pthread_t id;
for (int i = 0; i < num_; i++)
{
pthread_create(&id, nullptr, routine, this /*线程池对象this指针*/);
}
}
};
}
注意:
这里没有对线程池销毁线程操作,声明周期随进程。
设计单例模式线程池
线程池本身会在任何场景,任何环境下被调用,因此我们可以设计成单例模式,由一个单例对线程池进行管理。
#pragma once
#include <pthread.h>
#include <iostream>
#include <queue>
// 总结:我们应该清楚生产者消费者模型的目的是为了让生产和消费解耦,解耦的好处在于,生产和消费能并发执行,
// 在多生产者多消费者模型中,目的就为了能合理控制生产消费线程的个数达到合理利用资源。
namespace ns_pthread_pool
{
const int g_default_val = 3;
template <class T>
class pthread_pool
{
private:
int num_; // 线程数量
std::queue<T> task_queue_; // 任务队列,供给线程池使用
pthread_cond_t task_queue_cond_; // 任务队列的条件变量
// 拿取数据的过程要消费者间互斥,消费者与生产者互斥同步,所以我们需要添加锁来进行保护,并且加入条件变同步
pthread_mutex_t mtx_; // 多生产者多消费者维护关系的共有锁资源,
static pthread_pool<T> *ins_;
// 私有构造函数
pthread_pool(int num = g_default_val /*线程数量*/)
: num_(num)
{
pthread_cond_init(&task_queue_cond_, nullptr);
pthread_mutex_init(&mtx_, nullptr);
}
private:
// 对成员变量的访问目的为了让静态函数通过接收this参数访问成员函数并且访问成员变量。
void Lock()
{
pthread_mutex_lock(&mtx_);
}
void UnLock()
{
pthread_mutex_unlock(&mtx_);
}
bool IsEmpty()
{
return task_queue_.empty();
}
void WakeUpThread()
{
pthread_cond_signal(&task_queue_cond_);
}
public:
static pthread_pool<T> *GetInstance()
{
static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
// 当前单例对象还没有被创建
if (ins_ == nullptr) // 双判定,减少锁的争用,提高获取单例的效率!
{
pthread_mutex_lock(&lock);
// ins为空说明该类没有创建过对象
if (ins_ == nullptr)
{
ins_ = new pthread_pool<T>();
ins_->PthreadPoolInit();
std::cout << "该类第一次创建对象" << std::endl;
}
}
return ins_;
}
pthread_pool(const pthread_pool<T> &pool) = delete;
pthread_pool<T> &operator=(const pthread_pool<T> &pool) = delete;
// 析构函数
~pthread_pool()
{
pthread_cond_destroy(&task_queue_cond_);
pthread_mutex_destroy(&mtx_);
}
void TaskPop(T &out)
{
// 拿取数据的过程要消费者间互斥,消费者与生产者互斥同步,所以我们需要添加锁来进行保护,并且加入条件变同步
Lock();
// 线程在满拿取条件时拿取数据处理,否则等待条件变量
while (IsEmpty())
{
// 线程池的线程充当消费者
pthread_cond_wait(&task_queue_cond_, &mtx_);
}
out = task_queue_.front();
task_queue_.pop();
UnLock();
}
void TaskPush(T &in)
{
Lock();
task_queue_.push(in);
UnLock();
WakeUpThread();
}
// 线程池线程
// 我们必须设置成静态的函数,原因是成员函数有this参数
static void *routine(void *agrs)
{
//
pthread_pool<T> *pp = (pthread_pool<T> *)agrs;
while (true)
{
T task;
pp->TaskPop(task);
// 执行任务;
std::cout << "线程:" << pthread_self() << "执行数据:" << task() << "执行任务完成" << std::endl;
}
}
// 初始化线程池
void PthreadPoolInit()
{
pthread_t id;
for (int i = 0; i < num_; i++)
{
pthread_create(&id, nullptr, routine, this /*线程池对象this指针*/);
}
}
};
template <class T>
pthread_pool<T> *pthread_pool<T>::ins_ = nullptr;
}