线程池实现
结构设计
先上图:
参数
- 线程池:
- 包含一个执行队列、
- 一个任务队列
- mutex用来在多个线程取任务时锁任务队列,cond用来在任务队列为空时锁任务队列
- 如线程A锁了任务队列,去取任务时,又发现任务队列为空,线程A会先释放锁,然后重新加锁,等待有新的任务加入任务队列,唤醒自己
- 执行队列:
- 一般一个线程对应一个执行者,创建线程池时,没创建一个线程,就绑定一个执行者,因此其节点需要
线程id
- 取任务时,需要通过线程池结构体取任务队列,因此需要
线程池对象
- 在线程池回收时,所有线程和锁都需要释放,在释放的时候执行者不能再去获取新锁,因此需要
停止
- 任务队列:
取一个任务,也就是获取一个任务节点后,需要知道执行的函数地址
和参数
API
创建/关闭线程池、添加任务、线程入口函数(获取并执行任务)
- 创建线程池:就是创建多个线程,以及初始化其结构体的参数
- mutex和cond的静态初始化
- 线程和执行队列的创建
- 添加任务:添加任务并唤醒等待的线程
- 线程入口函数:锁队列,如果队列为空就锁等待,取到任务后执行函数
- 关闭线程池:所有执行者都停止,唤醒所有条件变量以释放锁,线程池、执行队列、任务队列对象置为NULL
代码实现
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <pthread.h>
// Tip:宏定义只对一行内有效,所以用 \ 吧回车转义了
// 而且宏以内联的方式编译进可执行文件速度快
#define LL_ADD(item, list) do { \
item->prev = NULL; \
item->next = list; \
if (list!=NULL) list->prev=item; \
list = item; \
} while(0)
#define LL_REMOVE(item, list) do { \
if (item->prev != NULL) item->prev->next = item->next; \
if (item->next != NULL) item->next->prev = item->prev; \
if (list == item) list = item->next; \
item->prev = item->next = NULL; \
} while(0)
// 任务执行队列
typedef struct NWORKER {
pthread_t thread; // 一个执行者一个线程,线程号
int terminate; // 相当于让这个工作者休息,抢不到任务队列的锁
struct NWORKQUEUE *workqueue; // 线程池,为了方便取任务
struct NWORKER *prev; // 任务链表
struct NWORKER *next;
} nWorker;
// 任务队列
typedef struct NJOB {
// Tip:函数指针,换成void (*job_function)(void *arg)更灵活
void (*job_function)(struct NJOB *job);
void *user_data; // 参数
struct NJOB *prev;
struct NJOB *next;
} nJob;
// 线程池
typedef struct NWORKQUEUE {
struct NWORKER *workers;
struct NJOB *waiting_jobs;
pthread_mutex_t jobs_mtx; // 互斥锁,抢到锁的线程才去任务队列取数据
pthread_cond_t jobs_cond; // 条件等待
} nWorkQueue;
typedef nWorkQueue nThreadPool;
// 线程入口函数:对任务队列进行加锁,如果任务队列为空则进入条件等待
static void *thread_callback(void *ptr) {
nWorker *worker = (nWorker*)ptr;
while (1) {
// pthread_cond_wait必须配合pthread_mutex_lock使用,否则,可能有多个线程一起等待条件被唤醒
// 线程A在这里上锁
pthread_mutex_lock(&worker->workqueue->jobs_mtx);
// 通过线程池取任务,
// 任务队列为空就阻塞,等待队列不为空被唤醒
while (worker->workqueue->waiting_jobs == NULL) {
if (worker->terminate) break;
// 条件等待,线程A先解锁又重新加锁,所以线程A最终阻塞在这里,其他线程还阻塞在上面的mutex
pthread_cond_wait(&worker->workqueue->jobs_cond, &worker->workqueue->jobs_mtx);
}
if (worker->terminate) {
pthread_mutex_unlock(&worker->workqueue->jobs_mtx);
break;
}
nJob *job = worker->workqueue->waiting_jobs;
if (job != NULL) {
LL_REMOVE(job, worker->workqueue->waiting_jobs);
}
pthread_mutex_unlock(&worker->workqueue->jobs_mtx);
if (job == NULL) continue;
job->job_function(job);
}
free(worker);
pthread_exit(NULL);
}
// 创建线程池:初始化线程池参数、为每个线程构建一个执行者
int ntyThreadPoolCreate(nThreadPool *workqueue, int num_thread) {
if (num_thread < 1) num_thread = 1; // 线程数量初始化
memset(workqueue, 0, sizeof(nThreadPool)); // 线程池初始化为0,防止有脏数据
// 条件变量和互斥量静态初始化; 动态方式调用pthread_cond_init()函数
pthread_cond_t blank_cond = PTHREAD_COND_INITIALIZER;
memcpy(&workqueue->jobs_cond, &blank_cond, sizeof(workqueue->jobs_cond));
pthread_mutex_t blank_mutex = PTHREAD_MUTEX_INITIALIZER;
memcpy(&workqueue->jobs_mtx, &blank_mutex, sizeof(workqueue->jobs_mtx));
// 一个线程对应一个执行者
int i = 0;
for (i = 0;i < num_thread;i ++) {
nWorker *worker = (nWorker*)malloc(sizeof(nWorker));
if (worker == NULL) {
perror("malloc");
return i; // 返回线程创建成功的数量
}
memset(worker, 0, sizeof(nWorker));
worker->workqueue = workqueue;
// 线程号、线程属性、入口函数、函数参数
int ret = pthread_create(&worker->thread, NULL, thread_callback, (void *)worker);
if (ret) {
perror("pthread_create");
free(worker);
return i;
}
// 创建线程的同时,将worker加入执行队列
LL_ADD(worker, worker->workqueue->workers);
}
return 0;
}
// 关闭线程池
void ntyThreadPoolShutdown(nThreadPool *workqueue) {
nWorker *worker = NULL;
for (worker = workqueue->workers;worker != NULL;worker = worker->next) {
worker->terminate = 1; // 所有执行者休息,不要再去锁任务队列
}
pthread_mutex_lock(&workqueue->jobs_mtx);
workqueue->workers = NULL;
workqueue->waiting_jobs = NULL;
pthread_cond_broadcast(&workqueue->jobs_cond); // 激活所有条件变量
pthread_mutex_unlock(&workqueue->jobs_mtx);
}
// 添加任务
void ntyThreadPoolQueue(nThreadPool *workqueue, nJob *job) {
pthread_mutex_lock(&workqueue->jobs_mtx);
LL_ADD(job, workqueue->waiting_jobs);
pthread_cond_signal(&workqueue->jobs_cond); // 唤醒条件等待线程
pthread_mutex_unlock(&workqueue->jobs_mtx);
}
/************************** debug thread pool **************************/
#if 1
#define ZENGER_MAX_THREAD 80
#define ZENGER_COUNTER_SIZE 1000
void ZENGER_counter(nJob *job) {
int index = *(int*)job->user_data;
printf("index : %d, selfid : %lu\n", index, pthread_self());
free(job->user_data);
free(job);
}
int main(int argc, char *argv[]) {
nThreadPool pool;
ntyThreadPoolCreate(&pool, ZENGER_MAX_THREAD); // 创建线程池
int i = 0; // 创建任务
for (i = 0;i < ZENGER_COUNTER_SIZE;i ++) {
nJob *job = (nJob*)malloc(sizeof(nJob));
if (job == NULL) {
perror("malloc");
exit(1);
}
job->job_function = ZENGER_counter;
// Tip:这里不能用int i; 因为for循环一结束,i就被回收了,所以要在堆上申请
job->user_data = malloc(sizeof(int));
*(int*)job->user_data = i;
ntyThreadPoolQueue(&pool, job);
}
getchar();
printf("\n");
}
#endif