目录
- 一、线程池的概念
- 二、线程池的核心组件
- 三、数据结构设计
- 1、任务队列
- 2、线程池
- 四、接口设计
- 1、创建线程池
- 2、销毁线程池
- 3、抛出任务的接口
- 五、实现一个线程池及测试
- 1、测试单生成者——多消费者
- 2、测试多生产者——多消费者
- 3、thrd_pool.h
- 4、thrd_pool.c
- 5、main.c
- 6、thrdpool_test.cc
一、线程池的概念
业务开发过程中,如果遇到某些耗时特别严重的任务,我们会想着把它们抛给其他线程进行异步处理。但是线程频繁的创建与销毁,会造成大量的系统开销。因此,我们希望有一些备用线程,需要时候从中取出,不需要的时候等待休眠。这就引出了线程池的概念 —— 线程池是管理维持固定数量线程 的池式结构。
(1)总结一下,为什么需要线程池?
某些任务特别耗时,严重影响该线程处理其他任务,但又不想频繁创建销毁线程,就需要把这些任务抛给线程池进行异步处理。这样可以异步执行耗时任务,复用线程资源,充分利用系统资源。
(2)为什么是固定数量呢?
这是因此线程作为系统资源,需要系统进行调度,并不是越多越好。随着线程数量的增加,由于系统戏院的限制,不再带来性能的提升,反而是负担。
就好比一个老师能管理的学生数量也是有限的,超过了这个界限,效果反而会下降。
(3)如何决定数量呢?
这得根据任务进行区分。如果是CPU密集型,一般等于CPU核心数;如果是I/O密集型,一般是2倍的CPU核心数。
经验公式:
(
I
/
O
等待时间
+
C
P
U
运算时间
)
×
核心数
C
P
U
运算时间
\frac{(I/O等待时间+CPU运算时间)\times 核心数}{CPU运算时间}
CPU运算时间(I/O等待时间+CPU运算时间)×核心数
二、线程池的核心组件
首先,线程池是属于生产消费模型。因此,线程池运行环境构成:1)生产者线程:发布任务;2)消费者线程:取出任务,执行任务
其次,我们需要有一个任务队列,存储任务结点,其中包括异步执行任务的上下文、执行函数等,起到联系生产者线程和消费者线程的作用。
另外,生产者不一定时时刻刻都有任务,如果当生产者不发布任务时,消费者线程还在空转等待,那就特别浪费系统资源。因此,需要设计一个机制来调度消费者:
1)当有任务进来时,会唤醒消费者线程,取出并执行任务。
2)当没有任务时,让消费者线程进行休眠,让出执行权。
由于任务队列是生产者线程和消费者线程的桥梁,因此这个调度工作当仁不让安排给任务队列。
因此,就有了线程池的三个核心组件:
1)生产者线程:发布任务,通知一个消费者线程需要唤醒;
2)任务队列:存储任务结点,其中包括异步执行任务的上下文、执行函数等;调度线程池的状态(唤醒 or 休眠),通过锁的方式。
3)消费者线程:取出任务,执行任务。
线程池的运行流程
1)首先存在生产者线程。然后启动若干个消费者线程,交由线程池进行管理。一开始没有任务,消费者线程处于休眠状态。
2)出现一个耗时严重的任务,生产者将其加入到任务队列。
3)任务队列唤醒消费者线程,消费者线程从队列取出任务,并执行。
4)再次检查任务队列,有任务再次唤醒消费者线程取出执行。没有任务,就让消费者线程休眠。
三、数据结构设计
1、任务队列
//任务队列的结点
typedef struct task_s {
task_t *next; //指向下一个任务的指针
handler_pt func; //任务的执行函数
void *arg; //任务的上下文
} task_t;
//任务队列
//默认是阻塞类型的队列,谁来取任务,如果此时队列为空,谁应该阻塞休眠
typedef struct task_queue_s {
void *head; //头指针
void **tail; //指向队尾的指针的指针
int block; //设置当前是否阻塞类型
spinlock_t lock;
pthread_mutex_t mutex;
pthread_cond_t cond;
} task_queue_t;
这边需要着重介绍一下**tail,先看下面两个
task_t *p : p用于存储指向 task_t 对象的地址,因此p占8个字节,*p占24个字节。
task_t q:q用于存储指向 task_t 类型指针的地址,task_t 类型指针占8个字节.
也就是指向task_t结构体中前8个字节的区域,因此*q == task->next。
也就是说,p和q虽然都指向的是task_t,但是q是指针的指针,因此存储的是 task_t* 类型指针(task_t前8个字节的区域)的地址。
2、线程池
struct thrdpool_s {
task_queue_t *task_queue;
atomic_int quit; //标志是否让线程退出
int thrd_count; //线程的数量
pthread_t *thread;
};
四、接口设计
接口设计是暴露给用户使用的,但隐藏具体的实现细节。
接口设计的细节请看第五节代码thrd_pool.c部分的注释
1、创建线程池
thrdpool_t *thrdpool_create(int thrd_count);
2、销毁线程池
int thrdpool_post(thrdpool_t *pool, handler_pt func, void *arg);
3、抛出任务的接口
void thrdpool_terminate(thrdpool_t * pool);
void thrdpool_waitdone(thrdpool_t *pool);
五、实现一个线程池及测试
1、测试单生成者——多消费者
生成动态链接库
gcc -c -fPIC thrd_pool.c
gcc -shared thrd_pool.o -o libthrd_pool.so -I/. -lpthread
gcc -Wl,-rpath=./ main.c -o main -I./ -L./ -lthrd_pool -lpthread
./main
2、测试多生产者——多消费者
g++ -Wl,-rpath=./ thrdpool_test.cc -o thrdpool_test -I./ -L./ -lthrd_pool -lpthread
./thrdpool_test
3、thrd_pool.h
#ifndef _THREAD_POOL_H
#define _THREAD_POOL_H
typedef struct thrdpool_s thrdpool_t;
typedef void (*handler_pt)(void *);
#ifdef __cplusplus
extern "C"
{
#endif
// 对称处理
thrdpool_t *thrdpool_create(int thrd_count);
void thrdpool_terminate(thrdpool_t * pool);
int thrdpool_post(thrdpool_t *pool, handler_pt func, void *arg);
void thrdpool_waitdone(thrdpool_t *pool);
#ifdef __cplusplus
}
#endif
#endif
4、thrd_pool.c
#include <pthread.h>
#include <stdint.h>
#include <stdlib.h>
#include "thrd_pool.h"
#include "spinlock.h"
typedef struct spinlock spinlock_t;
//任务队列的结点
typedef struct task_s {
void *next; //指向下一个任务的指针
handler_pt func; //任务的执行函数
void *arg; //任务的上下文
} task_t;
//任务队列
//默认是阻塞类型的队列,谁来取任务,如果此时队列为空,谁应该阻塞休眠
typedef struct task_queue_s {
void *head; //头指针
void **tail; //指向队尾的指针的指针
int block; //设置当前是否阻塞类型
spinlock_t lock;
pthread_mutex_t mutex;
pthread_cond_t cond;
} task_queue_t;
//线程池
struct thrdpool_s {
task_queue_t *task_queue;
atomic_int quit; //标志是否让线程退出,原子操作
int thrd_count; //线程的数量
pthread_t *thread;
};
//创建任务队列
static task_queue_t * __taskqueue_create() {
task_queue_t *queue = (task_queue_t *)malloc(sizeof(task_queue_t));
if(!queue) return NULL;
int ret;
ret = pthread_mutex_init(&queue->mutex, NULL);
//若初始化成功
if (ret == 0){
ret = pthread_cond_init(&queue->cond, NULL);
if (ret == 0){
spinlock_init(&queue->lock);
queue->head = NULL;
queue->tail = &queue->head;
queue->block = 1; //设置为阻塞
return queue;
}
pthread_cond_destroy(&queue->cond);
}
//若初始化失败
pthread_mutex_destroy(&queue->mutex);
return NULL;
}
static void __nonblock(task_queue_t *queue){
pthread_mutex_lock(&queue->mutex);
queue->block = 0;
pthread_mutex_unlock(&queue->mutex);
pthread_cond_broadcast(&queue->cond);
}
//插入新的任务task
static inline void __add_task(task_queue_t *queue, void *task){
void **link = (void **)task;
*link = NULL; //等价于task->next = NULL
spinlock_lock(&queue->lock);
*queue->tail = link; //将最后一个结点的next指向task
queue->tail = link; //更新尾指针
spinlock_unlock(&queue->lock);
//生产了新的任务,通知消费者
pthread_cond_signal(&queue->cond);
}
//取出结点(先进先出)
static inline void *__pop_task(task_queue_t *queue){
spinlock_lock(&queue->lock);
if (queue->head == NULL){
spinlock_unlock(&queue->lock);
return NULL;
}
task_t *task;
task = queue->head;
queue->head = task->next;
if (queue->head == NULL){
queue->tail = &queue->head;
}
spinlock_unlock(&queue->lock);
return task;
}
// 消费者线程取出任务
static inline void *__get_task(task_queue_t *queue){
task_t *task;
/*虚假唤醒:当把一个线程唤醒之后,但是其他消费者线程提前把这个任务取走了。
此时__pop_task(queue)为 NULL。
因此,需要用while循环进行判断,如果任务队列为NULL,继续休眠*/
//若队列为空,休眠
while ((task = __pop_task(queue)) == NULL){
pthread_mutex_lock(&queue->mutex);
if (queue->block == 0){
pthread_mutex_unlock(&queue->mutex);
return NULL;
}
/*
1. unlock(&mutex),让出执行权
2. 在cond处休眠
3. 当生产者产生任务,发送信号signal
4. 在cond处唤醒
5. lonck(&mutx),接管执行权
*/
pthread_cond_wait(&queue->cond, &queue->mutex);
pthread_mutex_unlock(&queue->mutex);
}
return task;
}
//销毁任务队列
static void __taskqueue_destory(task_queue_t *queue){
task_t *task;
while ((task = __pop_task(queue))){
free(task);
}
spinlock_destroy(&queue->lock);
pthread_cond_destroy(&queue->cond);
pthread_mutex_destroy(&queue->mutex);
free(queue);
}
//消费者线程的工作——取出任务,执行任务
static void *__thrdpoll_worker(void *arg){
thrdpool_t *pool = (thrdpool_t *)arg;
task_t *task;
void *ctx;
while (atomic_load(&pool->quit) == 0 ){
task = (task_t *)__get_task(pool->task_queue);
if (!task) break;
handler_pt func = task->func;
ctx = task->arg;
free(task);
func(ctx);
}
return NULL;
}
//停止
static void __threads_terminate(thrdpool_t *pool){
atomic_store(&pool->quit, 1);
/*默认情况下,线程池中的线程在执行任务时可能会阻塞等待任务的到来。
如果在线程池退出时,仍有线程处于等待任务的阻塞状态,那么这些线程将无法在有新任务时重新启动并执行。
调用 __nonblock 函数可以将任务队列设置为非阻塞状态,确保所有线程都能正确地退出,
而不会被阻塞在等待任务的状态中。*/
__nonblock(pool->task_queue);
int i;
for (i=0; i<pool->thrd_count; i++){
pthread_join(pool->thread[i], NULL); //阻塞调用它的线程,直到指定的线程结束执行。
}
}
//创建线程池
static int __threads_create(thrdpool_t *pool, size_t thrd_count){
pthread_attr_t attr;
int ret;
ret = pthread_attr_init(&attr);
if (ret == 0){
pool->thread = (pthread_t *)malloc(sizeof(pthread_t) * thrd_count);
if (pool->thread){
int i = 0;
for (i=0; i < thrd_count; i++){
if (pthread_create(&pool->thread[i], &attr, __thrdpoll_worker, pool) != 0){
break;
}
}
pool->thrd_count = i;
pthread_attr_destroy(&attr);
if (i == thrd_count)
return 0;
//如果实际创建的线程数 != thrd_count,停止当前创建的线程
__threads_terminate(pool);
free(pool->thread);
}
ret = -1;
}
return ret;
}
//停止线程-------用户调用的接口
void thrdpool_terminate(thrdpool_t * pool) {
atomic_store(&pool->quit, 1);
__nonblock(pool->task_queue);
}
//创建线程-------用户调用的接口
thrdpool_t *thrdpool_create(int thrd_count){
thrdpool_t *pool;
pool = (thrdpool_t*)malloc(sizeof(thrdpool_t));
if (!pool) return NULL;
//创建任务队列
task_queue_t *queue = __taskqueue_create();
if (queue){
pool->task_queue = queue;
atomic_init(&pool->quit, 0);
if(__threads_create(pool, thrd_count) == 0){
return pool;
}
__taskqueue_destory(pool->task_queue);
}
free(pool);
return NULL;
}
//生产者抛出任务到线程池(加到任务队列)
int thrdpool_post(thrdpool_t *pool, handler_pt func, void *arg){
if (atomic_load(&pool->quit) == 1){
return -1;
}
task_t *task = (task_t *)malloc(sizeof(task_t));
if (!task) return -1;
task->func = func;
task->arg = arg;
__add_task(pool->task_queue, task);
return 0;
}
//等待线程池中的所有线程完成任务,并清理线程池的资源
void thrdpool_waitdone(thrdpool_t *pool){
int i;
for (i=0; i<pool->thrd_count; i++){
pthread_join(pool->thread[i], NULL);
}
__taskqueue_destory(pool->task_queue);
free(pool->thread);
free(pool);
}
5、main.c
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include "thrd_pool.h"
/**
* author: mark
* QQ: 2548898954
* shell: g++ taskqueue_test.cc -o taskqueue_test -lgtest -lgtest_main -lpthread
*/
int done = 0;
pthread_mutex_t lock;
void do_task(void *arg) {
thrdpool_t *pool = (thrdpool_t*)arg;
pthread_mutex_lock(&lock);
done++;
printf("doing %d task\n", done);
pthread_mutex_unlock(&lock);
if (done >= 1000) {
thrdpool_terminate(pool);
}
}
void test_thrdpool_basic() {
int threads = 8;
pthread_mutex_init(&lock, NULL);
thrdpool_t *pool = thrdpool_create(threads);
if (pool == NULL) {
perror("thread pool create error!\n");
exit(-1);
}
while (thrdpool_post(pool, &do_task, pool) == 0) {
}
thrdpool_waitdone(pool);
pthread_mutex_destroy(&lock);
}
int main(int argc, char **argv) {
test_thrdpool_basic();
return 0;
}
6、thrdpool_test.cc
#include "thrd_pool.h"
#include <bits/types/time_t.h>
#include <chrono>
#include <cstddef>
#include <cstdint>
#include <atomic>
#include <thread>
#include <iostream>
#include <unistd.h>
/**
* shell: g++ -Wl,-rpath=./ thrdpool_test.cc -o thrdpool_test -I./ -L./ -lthrd_pool -lpthread
*/
time_t GetTick() {
return std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::steady_clock::now().time_since_epoch()
).count();
}
std::atomic<int64_t> g_count{0};
void JustTask(void *ctx) {
++g_count;
}
constexpr int64_t n = 1000000;
void producer(thrdpool_t *pool) {
for(int64_t i=0; i < n; ++i) {
thrdpool_post(pool, JustTask, NULL);
}
}
void test_thrdpool(int nproducer, int nconsumer) {
auto pool = thrdpool_create(nconsumer);
for (int i=0; i<nproducer; ++i) {
std::thread(&producer, pool).detach();
}
time_t t1 = GetTick();
// wait for all producer done
while (g_count.load() != n*nproducer) {
usleep(100000);
}
time_t t2 = GetTick();
std::cout << t2 << " " << t1 << " " << "used:" << t2-t1 << " exec per sec:"
<< (double)g_count.load()*1000 / (t2-t1) << std::endl;
thrdpool_terminate(pool);
thrdpool_waitdone(pool);
}
int main() {
// test_thrdpool(1, 8);
test_thrdpool(4, 4);
return 0;
}