一.什么是线程池?
线程池就是创建若干个可执行的线程放到容器中,有任务处理时,会提交到线程池中的任务队列中,线程处理完不是销毁,而是阻塞等待下一个任务。
二.为何要使用线程池?
- 降低资源消耗。重复利用创建好的线程减少线程创建和销毁造成的损耗。
- 提高响应速度。当任务到来时,不用等待创建就可以立即执行
三.线程池的设计实现
1、线程池的内部结构主要由四部分组成
- 1、第一部分是线程池管理器,它主要负责管理线程池的创建、销毁、添加任务等管理操作,它是整个线程池的管家。
- 2、第二部分是工作线程,也就是图中的线程 t0~t9,这些线程勤勤恳恳地从任务队列中获取任务并执行。
- 3、第三部分是任务队列,作为一种缓冲机制,线程池会把当下没有处理的任务放入任务队列中,由于多线程同时从任务队列中获取任务是并发场景,此时就需要任务队列满足线程安全的要求,所以线程池中任务队列采用 BlockingQueue 来保障线程安全。
- 4、 第四部分是任务,任务要求实现统一的接口,以便工作线程可以处理和执行。
C++实现一:
pthread_pool.hpp
#pragma once
#include <iostream>
#include <queue>
#include <pthread.h>
#include <sys/types.h>
#include <unistd.h>
using std::endl;
using std::cin;
using std::cout;
using std::queue;
template<class T>
class PthreadPool {
public:
PthreadPool(int num = 8)
:_num(num) {
pthread_mutex_init(&_lock, NULL);
pthread_cond_init(&_cond, NULL);
}
~PthreadPool() {
pthread_mutex_destroy(&_lock);
pthread_cond_destroy(&_cond);
}
void Lock() {
pthread_mutex_lock(&_lock); //上锁
}
void Unlock() {
pthread_mutex_unlock(&_lock); //解锁
}
void WakeUp() { //唤醒进程, 通知消费者线程有任务了
pthread_cond_signal(&_cond); //唤醒单个进程
}
void Wait() { //没有任务, 消费者线程先阻塞起来等待任务
pthread_cond_wait(&_cond, &_lock); //条件变量要和一个锁关联起来才可以
}
bool IsEmptyQueue() {
return _taskqueue.empty();
}
static void* Routine(void* args)
{
PthreadPool* self = (PthreadPool*)args;
while (1) {
self->Lock();
while (self->IsEmptyQueue()) { //while 防止伪唤醒,
//wait
self->Wait();
}
//说明存在 task了/
T t;
self->PopTask(t);
self->Unlock();
t.Run(); //解锁后处理任务.... 做啥???
}
}
void PushTask(const T& in) { //传入参数, push task
Lock();
_taskqueue.push(in); //操作临界资源加锁
Unlock();
WakeUp();
}
void PopTask(T& out) { //传出参数 拿取任务
out = _taskqueue.front(); //拿取pop任务
_taskqueue.pop(); //任务队列pop 任务
}
void InitPthreadPool() {
pthread_t tid;
for (int i = 0; i < _num; ++i) {
pthread_create(&tid, NULL, Routine, this);
pthread_detach(tid); //回收线程资源
}
}
private:
int _num; //工作线程的数目
queue<T> _taskqueue; //任务队列
pthread_mutex_t _lock; //锁保证临界资源的互斥访问
pthread_cond_t _cond; //条件变量控制资源到来时候的唤醒工作
};
task.hpp
#pragma once
#include <iostream>
#include <pthread.h>
using std::endl;
using std::cout;
using std::cerr;
//typedef int (*handler_t ) (int, int, char);
class Task {
public:
Task(int x = 1, int y = 1, char op = '+') //默认构造
: _x(x)
, _y(y)
, _op(op)
{}
~Task(){}
void Run()
{
int z = 0;
switch(_op)
{
case '+':
{
z = _x + _y;
break;
}
case '-':
{
z = _x - _y;
break;
}
default:
{
cerr << "not support operator!" << endl;
break;
}
}
cout << "thread: [" << pthread_self() << "]: "<< _x << _op << _y << "=" << z << endl;
}
private:
int _x;
int _y;
char _op;
};
main.cpp
PthreadPool<Task> * tp = new PthreadPool<Task>();
tp->InitPthreadPool();
srand((unsigned long)time(nullptr));
const char* ops = "+-*/%";
while (1) {
int x = rand() % 123 + 1;
int y = rand() % 123 + 1;
Task t(x, y, ops[rand() % 5]);
tp->PushTask(t);
sleep(1);
}