思路
创建多个工作线程同时维护一个公共的任务队列, 任务队列非空时通过信号量唤醒阻塞等待的工作线程, 工作线程通过互斥锁互斥的从任务队列中取出任务, 然后执行任务
实现
信号量类
class sem {//封装信号量类
public:
sem(int num = 0) {
if (sem_init(&m_sem, 0, num) != 0) {
throw std::exception();
}
}
~sem() {
sem_destroy(&m_sem);
}
bool wait() {
return sem_wait(&m_sem) == 0;
}
bool post() {
return sem_post(&m_sem) == 0;
}
private:
sem_t m_sem;
};
互斥锁类
class locker {//封装互斥锁类
public:
locker() {
if (pthread_mutex_init(&m_mutex, NULL) != 0) {
throw std::exception();
}
}
~locker() {
pthread_mutex_destroy(&m_mutex);
}
bool lock() {
return pthread_mutex_lock(&m_mutex) == 0;
}
bool unlock() {
return pthread_mutex_unlock(&m_mutex) == 0;
}
pthread_mutex_t *get() {
return &m_mutex;
}
private:
pthread_mutex_t m_mutex;
};
线程池类
template<class T>
class threadpool {//线程池类
public:
threadpool(int thread_number, int max_request = 1000);
~threadpool();
bool append(T *request);//向请求队列种添加请求任务
private:
static void *worker(void *arg);//工作线程运行的函数: 不断从工作队列中取出任务并执行
void run();
private:
int m_thread_number;//线程池中的线程数
int m_max_requests;//请求队列中允许的最大请求数
pthread_t *m_threads;//线程ID数组,其大小为m_thread_number
std::list<T *> m_workqueue;//存放请求任务的队列
locker m_queuelocker;//保护请求队列的互斥锁
sem m_queuestat;//是否有任务需要处理
bool m_stop;//是否结束线程
};
template<class T>
threadpool<T>::threadpool(int thread_number, int max_request):m_thread_number(thread_number), m_max_requests(max_request), m_stop(false) {
if (thread_number <= 0 || max_request <= 0)
throw std::exception();
m_threads = new pthread_t[m_thread_number];
if (!m_threads)
throw std::exception();
for (int i = 0; i < thread_number; i++) {
if (pthread_create(m_threads + i, nullptr, worker, this) != 0 || pthread_detach(m_threads[i]) != 0) {//创建子线程并将其从主线程种分离
delete[]m_threads;
throw std::exception();
}
}
}
template<class T>
threadpool<T>::~threadpool() {
delete[] m_threads;
m_stop = true;// 修改标记, 使工作线程退出
}
template<class T>
bool threadpool<T>::append(T *request) {
m_queuelocker.lock();//互斥锁加锁
if (m_workqueue.size() > m_max_requests) {
m_queuelocker.unlock();//互斥锁解锁
return false;
}
m_workqueue.push_back(request);
m_queuelocker.unlock();//互斥锁解锁
m_queuestat.post();//唤醒阻塞的工作线程
}
template<class T>
void *threadpool<T>::worker(void *arg) {
threadpool<T> *pool = (threadpool<T> *) arg;
pool->run();//线程池对象创建出的工作线程能访问该线程池对象的成员
return pool;
}
template<class T>
void threadpool<T>::run() {//不断从工作队列中取出任务并执行
while (!m_stop) {
m_queuestat.wait();//等待唤醒
m_queuelocker.lock();//互斥锁加锁
if (m_workqueue.empty()) {
m_queuelocker.unlock();//互斥锁解锁
continue;
}
T *request = m_workqueue.front();
m_workqueue.pop_front();
m_queuelocker.unlock();//互斥锁解锁
if (request)
request->process();//执行当前请求任务
}
}
测试任务类
class S {//用于测试的任务类
public:
S(int v = 0) : id(v) {}
void process() {//执行任务
printf("cur thread_id: %d, cur task_id: %d\n", std::this_thread::get_id(), id);//输出当前线程id和当前任务对象的val成员
fflush(stdout);//立即输出
_sleep(500);//暂停0.5s
}
private:
int id;
};
测试代码
#include<bits/stdc++.h>
#include <semaphore.h>
using namespace std;
class sem {//封装信号量类
public:
sem(int num = 0) {
if (sem_init(&m_sem, 0, num) != 0) {
throw std::exception();
}
}
~sem() {
sem_destroy(&m_sem);
}
bool wait() {
return sem_wait(&m_sem) == 0;
}
bool post() {
return sem_post(&m_sem) == 0;
}
private:
sem_t m_sem;
};
class locker {//封装互斥锁类
public:
locker() {
if (pthread_mutex_init(&m_mutex, NULL) != 0) {
throw std::exception();
}
}
~locker() {
pthread_mutex_destroy(&m_mutex);
}
bool lock() {
return pthread_mutex_lock(&m_mutex) == 0;
}
bool unlock() {
return pthread_mutex_unlock(&m_mutex) == 0;
}
pthread_mutex_t *get() {
return &m_mutex;
}
private:
pthread_mutex_t m_mutex;
};
template<class T>
class threadpool {//线程池类
public:
threadpool(int thread_number, int max_request = 1000);
~threadpool();
bool append(T *request);//向请求队列种添加请求任务
private:
static void *worker(void *arg);//工作线程运行的函数: 不断从工作队列中取出任务并执行
void run();
private:
int m_thread_number;//线程池中的线程数
int m_max_requests;//请求队列中允许的最大请求数
pthread_t *m_threads;//线程ID数组,其大小为m_thread_number
std::list<T *> m_workqueue;//存放请求任务的队列
locker m_queuelocker;//保护请求队列的互斥锁
sem m_queuestat;//是否有任务需要处理
bool m_stop;//是否结束线程
};
template<class T>
threadpool<T>::threadpool(int thread_number, int max_request):m_thread_number(thread_number), m_max_requests(max_request), m_stop(false) {
if (thread_number <= 0 || max_request <= 0)
throw std::exception();
m_threads = new pthread_t[m_thread_number];
if (!m_threads)
throw std::exception();
for (int i = 0; i < thread_number; i++) {
if (pthread_create(m_threads + i, nullptr, worker, this) != 0 || pthread_detach(m_threads[i]) != 0) {//创建子线程并将其从主线程种分离
delete[]m_threads;
throw std::exception();
}
}
}
template<class T>
threadpool<T>::~threadpool() {
delete[] m_threads;
m_stop = true;// 修改标记, 使工作线程退出
}
template<class T>
bool threadpool<T>::append(T *request) {
m_queuelocker.lock();//互斥锁加锁
if (m_workqueue.size() > m_max_requests) {
m_queuelocker.unlock();//互斥锁解锁
return false;
}
m_workqueue.push_back(request);
m_queuelocker.unlock();//互斥锁解锁
m_queuestat.post();//唤醒阻塞的工作线程
}
template<class T>
void *threadpool<T>::worker(void *arg) {
threadpool<T> *pool = (threadpool<T> *) arg;
pool->run();//线程池对象创建出的工作线程能访问该线程池对象的成员
return pool;
}
template<class T>
void threadpool<T>::run() {//不断从工作队列中取出任务并执行
while (!m_stop) {
m_queuestat.wait();//等待唤醒
m_queuelocker.lock();//互斥锁加锁
if (m_workqueue.empty()) {
m_queuelocker.unlock();//互斥锁解锁
continue;
}
T *request = m_workqueue.front();
m_workqueue.pop_front();
m_queuelocker.unlock();//互斥锁解锁
if (request)
request->process();//执行当前请求任务
}
}
class S {//用于测试的任务类
public:
S(int v = 0) : id(v) {}
void process() {//执行任务
printf("cur thread_id: %d, cur task_id: %d\n", std::this_thread::get_id(), id);//输出当前线程id和当前任务对象的val成员
fflush(stdout);//立即输出
_sleep(500);//暂停0.5s
}
private:
int id;
};
int main() {
threadpool<S> tp(3);//创建最大工作线程数为3的线程池
for (int i = 0; i < 9; i++) {//执行9个测试任务
tp.append(new S(i));
}
_sleep(1500);
}
运行结果如下(输出顺序非固定), 可以看出所有任务都被执行且3个工作线程都能执行任务
参考:
最新版Web服务器项目详解 - 03 半同步半反应堆线程池(下)