Linux:线程池
- 线程池概念
- 封装线程
- 基本结构
- 构造函数
- 相关接口
- 线程类总代码
- 封装线程池
- 基本结构
- 构造与析构
- 初始化
- 启动与回收
- 主线程放任务
- 其他线程读取任务
- 终止线程池
- 测试
- 线程池总代码
线程池概念
线程池是一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度。
线程池的应用场景:
- 需要大量的线程来完成任务,且完成任务的时间比较短。 WEB服务器完成网页请求这样的任务,使用线程池技术是非常合适的。因为单个任务小,而任务数量巨大,你可以想象一个热门网站的点击次数。 但对于长时间的任务,线程池的优点就不明显了
- 对性能要求苛刻的应用,比如要求服务器迅速响应客户请求
- 接受突发性的大量请求,但不至于使服务器因此产生大量线程的应用。突发性大量客户请求,在没有线程池情况下,将产生大量线程,虽然理论上大部分操作系统线程数目最大值不是问题,但是短时间内产生大量线程可能使内存到达极限,出现错误
接下来本博客就在Linux
上实现一个线程池。
封装线程
线程池本质是把多个线程组织起来,然后统一使用这些线程,给它们派发任务,每个线程拿到任务后各自执行。
基本结构
既然要将线程组织起来,我们就要先用一个类来描述一个线程,比如线程的TID
,线程的名字等等。
首先定义一个类Thread
,其包含以下成员:
template <typename T>
using func_t = std::function<void(T&)>;
template <typename T>
class Thread
{
public:
private:
pthread_t _tid; // 线程TID
std::string _threadName; // 线程名
func_t<T> _func; // 线程执行的函数
T _data; // 执行函数要传入的参数
};
第一个成员_tid
,就是该线程的TID
,第二个参数_threadName
就是线程的名字。
创建线程的目的,是为了让线程去执行函数,那么当然要有一个成员来记录这个线程执行什么函数。此处第三个成员_func
就是被线程执行的函数,其类型为func_t<T>
:
template <typename T>
using func_t = std::function<void(T&)>;
也即是说func_t<T>
类型,是一个void (T&)
类型的函数,返回值为空,可以传入一个T
类型的参数。而线程的最后一个成员_data
就是被传入的参数。
当用户使用这个线程类时,需要给出线程要执行的函数
,该函数的参数
,另外的还要线程的名字
。
构造函数
弄清需求后,我们就可以很好写出该线程的构造函数了:
Thread(func_t<T> func, const T& data, std::string threadName = "none")
: _func(func)
, _data(data)
, _threadName(threadName)
{}
构造函数有三个参数,第一个func
用于初始化线程调用的函数,第二个data
用于初始化要给函数传入的参数,第三个用于指定线程的名字,默认值为none
。
相关接口
那么我们的线程类又要提供哪些接口?
目前为止我们还没有真正创建一个线程,而是通过类成员保存了线程的相关信息,那么我们就要通过这些线程的相关信息,来创建线程了。
第一个问题便是:函数pthread_create
用于创建线程,要指定一个void* (*)(void*)
类型的函数指针,但是初始化线程是,用户传入的函数是void (T&)
类型,这要咋办?
很简单:先调用一个void* (void*)
类型的中间函数threadEntry
,pthread_create
先传入该函数,随后线程就会去执行threadEntry
,再在threadEntry
内部调用用户指定的函数并传入数据:_func(_data)
。
我先写一个版本:
template <typename T>
class Thread
{
public:
void* threadEntry(void* args)
{
_func(_data);
return nullptr;
}
bool start()
{
int ret = pthread_create(&_tid, nullptr, threadEntry, nullptr);
return ret == 0;
}
};
在start
函数中,通过pthread_create
创建了线程,线程的TID
交给类成员_tid
,随后线程去调用threadEntry
,在threadEntry
内部调用_func(_data)
,即调用用户传入的函数。
这可行吗?我尝试编译一下:
编译报错了,报错为:invalid use of non-static member function
,简单来说就是:错误的调用了非静态成员函数。
为什么呢?不妨再仔细想想threadEntry
的类型真的是void* (*)(void*)
吗?该函数处于类的内部,属于非静态成员函数,第一个参数为this
指针,因此我们要把这个函数用static
修饰,让其变为静态成员函数,此时它的类型才是void* (*)(void*)
。
static void* threadEntry(void* args)
{
_func(_data);
return nullptr;
}
现在问题又来了,由于没有this
指针,该函数是得不到_func
和_data
这两个成员的,这该怎么办?
别忘了,pthread_create
是可以给函数传参的,我们只需要把this
指针作为threadEntry
的参数传入,随后通过this
指针访问_func
和_data
。
template <typename T>
class Thread
{
public:
static void* threadEntry(void* args)
{
Thread* self = static_cast<Thread*>(args);
self->_func(self->_data);
return nullptr;
}
bool start()
{
int ret = pthread_create(&_tid, nullptr, threadEntry, this);
return ret == 0;
}
};
在pthread_create
中,第四个参数传入this
,那么函数threadEntry
的第一个参数args
就是this
指针了,通过 static_cast<Thread*>(args)
将其转化为Thread*
类型,赋值给self
变量。此时self->_func(self->_data);
就可以调用函数了。
最后再支持一下datach
线程分离,join
线程等待:
void deatch()
{
pthread_detach(_tid);
}
void join()
{
pthread_join(_tid, nullptr);
}
现在我们封装好了一个线程类
。
实验一下这个线程类是否有效:
void test(int args)
{
while(true)
{
cout << args << endl;
sleep(1);
}
}
int main()
{
Thread<int> t(test, 2024, "thread-1");
t.start();
t.join();
return 0;
}
在main
函数中,Thread<int> t(test, 5, "thread-1")
定义了一个线程对象,执行的函数为test
,给test
传入的参数为2024
,线程名为thread-1
。
如果创建成功,那么线程就会去执行test
函数,并且循环输出2024
。
输出结果:
输出正确,说明我们的线程类没有问题。
线程类总代码
我把这个Thread
类放进头文件thread.hpp
中,方便后续使用。
thread.hpp
代码如下:
#pragma once
#include <iostream>
#include <functional>
#include <unistd.h>
#include <pthread.h>
template <typename T>
using func_t = std::function<void(T&)>;
template <typename T>
class Thread
{
public:
Thread(func_t<T> func, const T& data, std::string threadName = "none")
: _func(func)
, _data(data)
, _threadName(threadName)
{}
static void* threadEntry(void* args)
{
Thread* self = static_cast<Thread*>(args);
self->_func(self->_data);
return nullptr;
}
bool start()
{
int ret = pthread_create(&_tid, nullptr, threadEntry, this);
return ret == 0;
}
void deatch()
{
pthread_detach(_tid);
}
void join()
{
pthread_join(_tid, nullptr);
}
private:
pthread_t _tid; // 线程TID
std::string _threadName; // 线程名
func_t<T> _func; // 线程执行的函数
T _data; // 执行函数要传入的参数
};
封装线程池
现在我们通过类Thread
描述了一个线程
,那么就可以用线程池来组织这些线程了。
当前目录结构如下:
内部有三个文件,第一个文件是主程序main.cpp
,以及两个自己的头文件,Thread.hpp
是刚刚封装的线程类,我们将在ThreadPool.hpp
内部实现线程池。
基本结构
线程池的运行模式如下:
线程池内部维护多个线程和一个任务队列,主线程往任务队列中放任务,线程池内部的线程则执行任务队列中的任务。
那么毫无疑问的就是:线程池内部至少要有一个数组
管理多个线程,以及一个队列
来放任务!
线程池threadPool
内部的成员如下:
template <typename T>
class threadPool
{
private:
int _threadNum; // 线程总数
int _waitNum; // 正在等待任务的线程数目
bool _isRunning; // 当前线程池是否运行
std::vector<Thread<std::string>> _threads; // 用数组管理多个线程
std::queue<T> _taskQueue; // 任务队列
};
在threadPool
中,有两个成员:数组_threads
,任务队列_task_queue
。
我们先前封装的Thread
中,模板参数T
用于给线程执行的函数指定参数类型。在此我固定其为string
类型,后续线程执行函数时,该参数用于传入线程的名字。
另外的,我还额外指定了三个成员:
_threadNum
:标识当前线程池的线程总数_waitNum
:当前有几个线程在等待任务_isRunning
:用于终止线程池
这三个成员都是对线程池本身的描述。
但是我们目前忽略了一个问题,也是多线程编程最重要的问题:线程的互斥与同步
。
我们的任务是:主线程往队列放任务,其它线程从队列拿任务。那么就要考虑以下几个问题:
- 多个线程可以同时拿任务吗?不能,任务队列是临界资源,线程与线程之间要互斥。
- 可以主线程放任务时,其他线程拿任务吗?不能,主线程与执行任务的线程也要互斥。
由于它们都在竞争任务队列这一个资源,我们只要用一把互斥锁即可完成以上的所有互斥。主线程和执行任务的线程都去争夺一把锁,争到锁的线程才可以访问任务队列。
接下来就是同步问题:
毫无疑问的是:只有任务队列里面有任务时,线程才能去任务队列中拿任务。因此要主线程先放任务,其他线程后拿任务,这就要一个条件变量来维护。
因此我们还要两个成员:
template <typename T>
class threadPool
{
private:
int _threadNum; // 线程总数
int _waitNum; // 正在等待任务的线程数目
bool _isRunning; // 当前线程池是否运行
std::vector<Thread<std::string>> _threads; // 用数组管理多个线程
std::queue<T> _taskQueue; // 任务队列
pthread_mutex_t _mutex; // 互斥锁,维护任务队列
pthread_cond_t _cond; // 条件变量,保证主线程与其他线程之间的同步
};
构造与析构
接下来先写线程池的构造函数和析构函数,在构造函数内部要完成的自然就包括:_mutex
的初始化,_cond
的初始化。而析构函数的任务自然是销毁它们。
template <typename T>
class threadPool
{
public:
threadPool(int threadNum = 5)
: _threadNum(threadNum)
, _waitNum(0)
, _isRunning(false)
{
pthread_mutex_init(&_mutex, nullptr);
pthread_cond_init(&_cond, nullptr);
}
~threadPool()
{
pthread_mutex_destroy(&_mutex);
pthread_cond_destroy(&_cond);
}
};
用户创建线程池的时候,只要输入一个数字,表名该线程池内部要有几个线程即可。一开始_isRunning
是false
,表示线程还没有开始运行。
构造函数只是创建了锁,条件变量,以及各个线程内部的基本信息而已,此时数组_threads
,还没有任何元素。也就是说我们目前连线程对象都没创建出来。
初始化
我们在此用一个init
函数来初始化线程,创建出线程对象,创建Thread
对象时,要传入三个参数:
Thread(func_t<T> func, const T& data, std::string threadName = "none")
这是我们刚刚写的Thread
构造函数,第一个参数传线程要调用的函数,第二个参数传func
的第一个参数,第三个参数传线程名。
在此我们让线程去执行一个叫做handlerTask
的函数,这个函数内部实现线程的到任务队列拿任务的过程。
而handlerTask
的第一个参数也是线程的名字,以便在handlerTask
内部识别是哪一个线程执行了任务。
如下:
template <typename T>
class threadPool
{
public:
void handlerTask(std::string)
{
//执行任务队列的任务
}
void init()
{
for (int i = 1; i <= _threadNum; i++)
{
std::string name = "Thread-" + std::to_string(i);
_threads.emplace_back(handlerTask, name, name);
}
_isRunning = true;
}
};
init
中,一个for
循环创建_threadNum
个线程,第i
号线程的名字是Thread-i
。
数组尾插时,(handlerTask, name, name)
三个参数,分别是:线程要执行的函数
,线程名
,线程名
。我们在此传入了两个线程名,但是作用不一样。一个是handlerTask
的参数,一个是Thread
内部的成员。
所以线程构建完毕后,_isRunning = true
,表示线程开始运作了。
但是以上代码还是犯了一个相同的错误,Thread
的函数类型要求是void (T&)
,我们限制了T = string
,那就是void (string&)
。但是handlerTask
是非静态成员函数,所以要加static
。
在此我用C++中的包装器bind
来实现:
void init()
{
auto func = bind(&threadPool::handlerTask, this, std::placeholders::_1);
for (int i = 1; i <= _threadNum; i++)
{
std::string name = "Thread - " + std::to_string(i);
_threads.emplace_back(func, name, name);
}
_isRunning = true;
}
通过包装器,我把handlerTask
的第一个参数绑定为了this
,使得类型变为function<void(string&)>
,从而符合Thread
的构造函数。可以理解为,此时变量func
就是函数handlerTask
,不过类型变为了function<void(string&)>
,原先的第一个参数固定为this
指针。
这个handlerTask
函数我们稍后实现。
启动与回收
到目前为止,我们已经创建好了一批线程,并且指定了指向handlerTask
函数,但是限制线程还没有被启动。当时我们封装线程类时,给Therad
一个start
来启动线程。此处的线程池也要一个allStart
来调用所有线程start
。另外的,也要一个allJoin
来调用所有的join
,回收线程。
代码:
template <typename T>
class threadPool
{
public:
void allStart()
{
for (auto& th : _threads)
th.start();
}
void allJoin()
{
for (auto& th : _threads)
th.join();
}
};
主线程放任务
现在先写一个enQueue
接口,让主线程往任务队列中投放任务。
投放任务的要求是:
- 访问队列要与其他线程互斥,即对
_mutex
加锁 - 添加任务后,此时一个线程就可以去访问任务队列了,也就是线程同步
代码:
template <typename T>
class threadPool
{
public:
void enQueue(const T& task)
{
pthread_mutex_lock(&_mutex);
if (_isRunning)
{
_taskQueue.push(task);
if (_waitNum > 0)
pthread_cond_signal(&_cond);
}
pthread_mutex_unlock(&_mutex);
}
};
函数的参数为const T& task
,即我们的任务类型是T
,这个T
最好是一个可调用对象,后续其它线程从任务队列拿任务时,就可以调用这个函数。
首先对_mutex
加锁,确保主线程投放任务时,没有其他线程正在访问队列。随后通过push
把这个任务放进队列中。如果waitNum > 0
,说明当前有线程在等待任务,通过pthread_cond_signal
唤醒一个线程,让他来执行任务。
一切完毕后,释放自己的锁。
其他线程读取任务
现在就到了线程池最复杂的一部分,那就是其他线程读取任务的过程。
- 线程要保持互斥,从任务队列拿任务时,要对
_mutex
加锁 - 其它线程要与主线程同步,当任务队列为空,就去
_cond
下面等待
先写一个雏形:
template <typename T>
class threadPool
{
public:
void handlerTask(std::string name)
{
while (true)
{
pthread_mutex_lock(&_mutex);
while (_taskQueue.empty())
{
_waitNum++;
pthread_cond_wait(&_cond, &_mutex);
_waitNum--;
}
T task = _taskQueue.front();
_taskQueue.pop();
std::cout << name << " get a task..." << std::endl;
pthread_mutex_unlock(&_mutex);
task();
}
}
};
访问队列前,首先对_mutex
加锁,保证互斥。随后进行条件判断,taskQueue
是否有任务,如果有任务,就直接拿走任务然后执行。如果没有任务,就去_cond
下面等待。此时_waitNum++
表示等待的线程多了一个,当从pthread_cond_wait
等待结束后,就要_waitNum--
。
这个地方套了一个while
循环,而不是if
语句,这是因为哪怕当前线程被主线程唤醒了,也有可能发生伪唤醒
,其实_taskQueue
内部根本没有任务。所以还要进入下一次while
判断,确保访问_taskQueue
时一定是有任务的。
当从while
出来后,此时任务队列一定有任务,所以可以放心调用front
和pop
接口。拿到任务后,赋值给task
。这里要先解锁,后调度task
。因为调度task
时,已经不算访问临界资源了,而调度函数的时间可能很长,此时先把锁释放掉,让其他线程拿任务,而不是自己执行完任务后才让别的线程拿任务,这样和单线程就没有区别了。
但是目前还有一个问题:如果线程访问任务队列时,线程池已经被终止了咋办?
线程池的终止与否,是通过成员_isRunning
来判定的,在执行任务时判断一下_isRunning
的值:
- 如果当前没终止:正常运行
- 如果当前终止了:
- 如果任务队列还有任务:把任务执行完
- 如果任务队列没任务:当前线程退出
那么我们的代码就变成下面这样:
template <typename T>
class threadPool
{
public:
while (true)
{
pthread_mutex_lock(&_mutex);
while (_taskQueue.empty() && _isRunning)
{
_waitNum++;
pthread_cond_wait(&_cond, &_mutex);
_waitNum--;
}
//线程池终止了,并且队列中没有任务了 -> 线程退出
if (_taskQueue.empty() && !_isRunning)
{
pthread_mutex_unlock(&_mutex);
std::cout << name << " quit..." << std::endl;
break;//线程离开while循环,同时线程退出
}
//走到这一步:一定还有任务要执行,不论线程池有没有终止,都先把任务做完
T task = _taskQueue.front();
_taskQueue.pop();
std::cout << name << " get a task..." << std::endl;
pthread_mutex_unlock(&_mutex);
task();
}
};
以上代码大致分为三个区域:第一个while
判断_taskQueue.empty() && _isRunning
,如果进条件变量等待,那么必须是:线程池还没终止,并且当且队列为空。
如果线程池终止了,那么此时要么去拿任务,要么直接退出。如果队列不为空,毫无疑问就去拿任务。
随后进入第二个判断语句if (_taskQueue.empty() && !_isRunning)
,即判断刚刚的while
循环是哪一种情况结束的。如果是线程池结束,并且任务队列为空
,那么就终止这个线程。剩下的情况,就是任务队列有任务
,此时不论线程有没有退出,都要把任务拿走执行掉。
终止线程池
终止线程池也不仅仅是直接_isRunning = false
这么简单,要考虑一下问题:
- 如果在
stop
时,有线程正在调用handlerTask
函数怎么办?
此时多个线程访问变量_isRunning
,就有可能会造成线程安全问题,所以访问_isRunning
时也要加锁,由于之前所有的访问_siRuiing
的操作,都在_mutex
锁中,所以和之前共用一把锁即可。
- 如果
stop
后,还有线程在_cond
下面等待怎么办?
如果线程一直在_cond
下面等待,就会导致无法退出,此时在_isRunning = false
之后,还要通过pthread_cond_broadcast
唤醒所有等待的线程,让它们重新执行handlerTask
的逻辑,从而正常退出。
代码:
template <typename T>
class threadPool
{
public:
void stop()
{
pthread_mutex_lock(&_mutex);
_isRunning = false; //终止线程池
pthread_cond_broadcast(&_cond); //唤醒所有等待的线程
pthread_mutex_unlock(&_mutex);
}
};
测试
现在我们已经有一个比较完整的线程池代码了,我们用以下代码测试一下:
int test()
{
int a = rand() % 100 + 1;
int b = rand() % 100 + 1;
std::cout << a << " + " << b << " = " << a + b << std::endl;
return a + b;
}
int main()
{
srand(static_cast<unsigned int>(time(nullptr)));
threadPool<int(*)(void)> tp(3);
tp.init();
tp.allStart();
for (int i = 0; i < 10; i++)
{
tp.enQueue(test);
sleep(1);
}
tp.stop();
tp.allJoin();
return 0;
}
通过threadPool<int(*)(void)> tp(3);
创建有三个线程的线程池,执行的任务类型为int(void)
,但是要注意,此处要传入可调用对象
,C++的可调用对象有:函数指针
,仿函数,lambda表达式
。此处我用了函数指针int(*)(void)
。
接着init
初始化线程池,此时线程对象Thread
已经创建出来了,但是还有没创建线程。随后调用allStart
,此时才真正创建了线程。
然后进入一个for
循环,给任务队列派发任务,总共派发十个任务,都是函数test
,其中生成两个随机数的加法。
最后调用stop
终止退出线程池,此时线程也会一个个退出,然后调用allJoin
回收所有线程。
输出结果:
最后可以看到,我们创建了三个线程,每个线程都依次拿到了任务,并且执行后计算出了结果。十个任务结束后,三个线程依次退出。
线程池总代码
我将线程池封装在文件ThreadPool.hpp
中:
#pragma once
#include <iostream>
#include <vector>
#include <queue>
#include <string>
#include <unistd.h>
#include <pthread.h>
#include "Thread.hpp"
template <typename T>
class threadPool
{
public:
threadPool(int threadNum = 5)
: _threadNum(threadNum)
, _waitNum(0)
, _isRunning(false)
{
pthread_mutex_init(&_mutex, nullptr);
pthread_cond_init(&_cond, nullptr);
}
~threadPool()
{
pthread_mutex_destroy(&_mutex);
pthread_cond_destroy(&_cond);
}
void enQueue(const T& task)
{
pthread_mutex_lock(&_mutex);
if (_isRunning)
{
_taskQueue.push(task);
if (_waitNum)
pthread_cond_signal(&_cond);
}
pthread_mutex_unlock(&_mutex);
}
void handlerTask(std::string name)
{
while (true)
{
pthread_mutex_lock(&_mutex);
while (_taskQueue.empty() && _isRunning)
{
_waitNum++;
pthread_cond_wait(&_cond, &_mutex);
_waitNum--;
}
//线程池终止了,并且队列中没有任务了 -> 线程退出
if (_taskQueue.empty() && !_isRunning)
{
pthread_mutex_unlock(&_mutex);
std::cout << name << " quit..." << std::endl;
break;//线程离开while循环,同时线程退出
}
//走到这一步:一定还有任务要执行,不论线程池有没有终止,都先把任务做完
T task = _taskQueue.front();
_taskQueue.pop();
std::cout << name << " get a task..." << std::endl;
pthread_mutex_unlock(&_mutex);
task();
}
}
void init()
{
auto func = bind(&threadPool::handlerTask, this, std::placeholders::_1);
for (int i = 1; i <= _threadNum; i++)
{
std::string name = "Thread - " + std::to_string(i);
_threads.emplace_back(func, name, name);
}
_isRunning = true;
}
void stop()
{
pthread_mutex_lock(&_mutex);
_isRunning = false; //终止线程池
pthread_cond_broadcast(&_cond); //唤醒所有等待的线程
pthread_mutex_unlock(&_mutex);
}
void allStart()
{
for (auto& th : _threads)
th.start();
}
void allJoin()
{
for (auto& th : _threads)
th.join();
}
private:
int _threadNum; // 线程总数
int _waitNum; // 正在等待任务的线程数目
bool _isRunning; // 当前线程池是否运行
std::vector<Thread<std::string>> _threads; // 用数组管理多个线程
std::queue<T> _taskQueue; // 任务队列
pthread_mutex_t _mutex; // 互斥锁,维护任务队列
pthread_cond_t _cond; // 条件变量,保证主线程与其他线程之间的同步
};
测试代码main.cpp
:
#include <iostream>
#include <vector>
#include <string>
#include <ctime>
#include <cstdlib>
#include <unistd.h>
#include <pthread.h>
#include "ThreadPool.hpp"
int test()
{
int a = rand() % 100 + 1;
int b = rand() % 100 + 1;
std::cout << a << " + " << b << " = " << a + b << std::endl;
return a + b;
}
int main()
{
srand(static_cast<unsigned int>(time(nullptr)));
threadPool<int(*)(void)> tp(3);
tp.init();
tp.allStart();
for (int i = 0; i < 10; i++)
{
tp.enQueue(test);
sleep(1);
}
tp.stop();
tp.allJoin();
return 0;
}