代码地址
5.5、线程池同步机制类封装及线程池实现
- 1.线程池
- 2.代码实现
- ①锁
- Ⅰ、locker.h
- Ⅱ、locker.cpp
- ②条件变量
- Ⅰ、cond.h
- Ⅱ、cond.cpp
- ③信号量
- Ⅰ、sem.h
- Ⅱ、sem.cpp
- ④线程池
- Ⅰ、threadpool.h
- Ⅱ、threadpool.cpp
1.线程池
线程池是由服务器预先创建的一组子线程,线程池中的线程数量应该和 CPU
数量差不多。线程池中的所有子线程都运行着相同的代码。当有新的任务到来时,主线程将通过某种方式选择线程池中的某一个子线程来为之服务。相比与动态的创建子线程,选择一个已经存在的子线程的代价显然要小得多。至于主线程选择哪个子线程来为新任务服务,则有多种方式:
- 主线程使用某种算法来主动选择子线程。最简单、最常用的算法是随机算法和
Round Robin(轮流选取)
算法,但更优秀、更智能的算法将使任务在各个工作线程中更均匀地分配,从而减轻服务器的整体压力。 - 主线程和所有子线程通过一个共享的工作队列来同步,子线程都睡眠在该工作队列上。当有新的任务到来时,主线程将任务添加到工作队列中。这将唤醒正在等待任务的子线程,不过只有一个子线程将获得新任务的”接管权“,它可以从工作队列中取出任务并执行之,而其他子线程将继续睡眠在工作队列上。
线程池中的线程数量最直接的限制因素是中央处理器
(CPU)
的处理器(processors/cores)
的数量N
:如果你的CPU
是4-cores
的,对于CPU密集型
的任务(如视频剪辑等消耗CPU
计算资源的任务)来说,那线程池中的线程数量最好也设置为4
(或者+1
防止其他因素造成的线程阻塞);对于IO密集型
的任务,一般要多于CPU
的核数,因为线程间竞争的不是CPU
的计算资源而是IO
,IO
的处理一
般较慢,多于cores
数的线程将为CPU
争取更多的任务,不至在线程处理IO
的过程造成CPU
空闲导致资源浪费。
- 空间换时间,浪费服务器的硬件资源,换取运行效率。
- 池是一组资源的集合,这组资源在服务器启动之初就被完全创建好并初始化,这称为静态资源。
- 当服务器进入正式运行阶段,开始处理客户请求的时候,如果它需要相关的资源,可以直接从池中获取,无需动态分配。
- 当服务器处理完一个客户连接后,可以把相关的资源放回池中,无需执行系统调用释放资源。
2.代码实现
①锁
Ⅰ、locker.h
#ifndef LOCK_H
#define LOCK_H
#include <pthread.h>
class Locker {
private:
// 互斥锁
pthread_mutex_t m_mutex;
public:
Locker();
~Locker();
// 加锁
bool lock();
// 解锁
bool unlock();
// 获取锁;
pthread_mutex_t * get();
};
#endif
Ⅱ、locker.cpp
#include "locker.h"
#include <exception>
// 构造函数,初始化 m_mutex
Locker::Locker() {
if (pthread_mutex_init(&m_mutex, NULL) != 0) {
throw std::exception();
}
}
// 析构函数,销毁 m_mutex
Locker::~Locker() {
pthread_mutex_destroy(&m_mutex);
}
// 给 m_mutex 加锁
bool Locker::lock() {
return pthread_mutex_lock(&m_mutex) == 0;
}
// 解开 m_mutex 的锁
bool Locker::unlock() {
return pthread_mutex_unlock(&m_mutex) == 0;
}
// 获取 m_mutex
pthread_mutex_t * Locker::get() {
return &m_mutex;
}
②条件变量
Ⅰ、cond.h
#ifndef COND_H
#define COND_H
#include <pthread.h>
#include <time.h>
class Cond {
private:
// 条件变量
pthread_cond_t m_cond;
public:
Cond();
~Cond();
// 阻塞等待唤醒
bool wait(pthread_mutex_t *);
// 超时等待
bool timedwait(pthread_mutex_t *, timespec);
// 唤醒单个等待条件
bool signal();
// 唤醒全部等待条件
bool broadcast();
};
#endif
Ⅱ、cond.cpp
#include "cond.h"
#include <exception>
// 构造函数,初始化 m_cond
Cond::Cond() {
if (pthread_cond_init(&m_cond, NULL) != 0) {
throw std::exception();
}
}
// 销毁 m_cond
Cond::~Cond() {
pthread_cond_destroy(&m_cond);
}
// 条件阻塞等待唤醒 m_cond
bool Cond::wait(pthread_mutex_t * mutex) {
return pthread_cond_wait(&m_cond, mutex) == 0;
}
// 超时等待条件阻塞
bool Cond::timedwait(pthread_mutex_t * mutex, timespec t) {
return pthread_cond_timedwait(&m_cond, mutex, &t) == 0;
}
// 唤醒等待进程
bool Cond::signal() {
return pthread_cond_signal(&m_cond) == 0;
}
// 唤醒全部进程
bool Cond::broadcast() {
return pthread_cond_broadcast(&m_cond) == 0;
}
③信号量
Ⅰ、sem.h
#ifndef SEM_H
#define SEM_H
#include <semaphore.h>
class Sem {
private:
// 信号量
sem_t m_sem;
public:
Sem();
Sem(int);
~Sem();
// 信号量的加锁,如果为0,就阻塞,调用一个减 1
bool wait();
// 调用一次 + 1
bool post();
};
#endif
Ⅱ、sem.cpp
#include "sem.h"
#include <exception>
// 构造函数
Sem::Sem() {
if (sem_init(&m_sem, 0, 0) != 0) {
throw std::exception();
}
}
// 构造函数带初始值 num
Sem::Sem(int num) {
if (sem_init(&m_sem, NULL, num) != 0) {
throw std::exception();
}
}
// 析构函数,销毁信号量 m_sem
Sem::~Sem() {
sem_destroy(&m_sem);
}
// 阻塞信号量,数值 -1
bool Sem::wait() {
return sem_wait(&m_sem) == 0;
}
// 增加信号的值,+1
bool Sem::post() {
return sem_post(&m_sem) == 0;
}
④线程池
Ⅰ、threadpool.h
#ifndef THREADPOOL_H
#define THREADPOOL_H
#include <list>
#include "locker.h"
#include "sem.h"
#include "cond.h"
// 线程池
template<class T>
class ThreadPool {
private:
// 线程的数量
int m_thread_number;
// 线程数组,大小为m_thread_number
pthread_t * m_threads;
// 允许等待的最大数量
int m_max_requests;
// 请求队列,需要处理的任务
std::list<T*> m_workqueue;
// 请求队列的互斥锁
locker m_queuelocker;
// 是否有任务需要处理,信号量的数量
sem m_queuestat;
// 是否结束进程
bool m_stop;
public:
//thread_number是线程池中线程的数量
//max_requests是请求队列中最多允许的、等待处理的请求的数量
ThreadPool(int thraed_number = 8, int max_requests = 10000);
// 析构函数,销毁线程的数据
~ThreadPool();
// 增加任务进工作队列中
bool append(T *);
private:
// 创建线程之后的运行函数
void * worker(void * arg);
// 取出队列中任务,不断的运行线程处理任务
void run();
};
#endif
Ⅱ、threadpool.cpp
#include "threadpool.h"
#include <iostream>
// 线程池的构造函数
template<class T>
ThreadPool<T>::ThreadPool(int thread_numebr, int max_requests) {
// 传入错误的参数
if (thread_number <= 0 || max_requests <= 0) {
throw std::exception();
}
// 属性赋值
m_thread_number = thread_numebr;
m_max_requests = max_requests;
m_stop = false;
m_queuelocker = locker();
m_queuestat = sem();
m_workqueue.clear();
// 创建线程数组
m_threads = new pthread_t[m_thread_number];
if (m_threads == nullptr) {
throw std::exception();
}
// 创建线程
for (int i = 0; i < m_thread_number; i ++ ) {
printf("create the %dth thread\n", i);
// 如果创建失败返回
if (pthread_create(m_threads[i], nullptr, worker, this) != 0) {
delete[] m_threads;
throw std::exception();
}
// 创建线程分离
if (pthread_detach(m_threads[i]) != 0) {
delete[] m_threads;
throw std::exception();
}
}
}
// 线程池的析构函数,销毁一些变量
template<class T>
ThreadPool<T>::~ThreadPool() {
delete[] m_threads;
m_stop = true;
m_workqueue.clear();
}
template<class T>
bool ThreadPool<T>::append(T * requests) {
// 操作工作队列时一定要加锁,因为它被所有线程共享。
m_queuelocker.lock();
if (m_workqueue.size() >= m_max_requests) {
m_queuelocker.unlock();
return false;
}
m_workqueue.push_back(requests);
m_queuelocker.unlock();
m_queuestat.post();
return true;
}
// 创建的线程需要运行的函数
template<class T>
void * ThreadPool<T>::worker(void * arg) {
ThreadPool * p = (ThreadPool *) arg;
p->run();
return p;
}
// 线程池处理函数
template<class T>
void ThreadPool<T>::run() {
while (!m_stop) {
// 信号量不为0,可以运行
m_queuestat.wait();
m_queuelocker.lock();
// 获取队列第一个请求
T * requests = m_workqueue.front();
// 第一个请求已经取出执行,可以pop掉
m_workqueue.pop_front();
// 解锁
m_queuelocker.unlock();
if (requests == nullptr) {
continue;
}
// 请求的处理
requests->process();
}
}