😏★,°:.☆( ̄▽ ̄)/$:.°★ 😏
这篇文章主要介绍ThreadPoll线程池实现。
学其所用,用其所学。——梁启超
欢迎来到我的博客,一起学习,共同进步。
喜欢的朋友可以关注一下,下次更新不迷路🥞
文章目录
- :smirk:1. 线程池介绍
- :blush:2. 线程池实现方法
- :satisfied:3. 线程池实现示例
😏1. 线程池介绍
线程池是一种线程管理的抽象概念,它主要用于优化多线程应用程序的性能和资源利用。在多线程编程中,创建和销毁线程是一个开销较大的操作。线程池通过预先创建一组线程,并将任务提交给这些线程来执行,从而避免了重复创建和销毁线程的开销。
线程池通常由以下几个组件组成:
1.任务队列(Task Queue):用于存储待执行的任务。当任务提交到线程池时,它们被放置在任务队列中等待执行。
2.线程池管理器(Thread Pool Manager):负责创建、管理和调度线程池中的线程。它控制着线程的数量,可以动态地增加或减少线程的数量,以适应不同的工作负载。
3.工作线程(Worker Threads):线程池中的实际执行单元。它们不断地从任务队列中获取任务并执行。
4.任务接口(Task Interface):定义了要执行的任务的接口。通常,任务是以函数或可运行对象的形式表示。
使用线程池的好处包括:
提高性能:线程池可以减少线程的创建和销毁次数,避免了频繁的上下文切换,提高了多线程程序的性能和响应速度。
资源管理:线程池可以限制并发线程的数量,避免资源过度占用,从而更好地管理系统资源。
提高可扩展性:通过调整线程池的大小,可以适应不同的并发需求,提高系统的可扩展性。
总之,线程池是一种重要的多线程编程技术,它能够有效地管理和利用线程,提高程序的性能和资源利用率。
😊2. 线程池实现方法
如上所述,实现线程池主要有4个部分:
- 线程管理器:用于创建并管理线程池。
- 工作线程:线程池中实际执行任务的线程。在初始化线程时会预先创建好固定数目的线程在池中,这些初始化的线程一般处于空闲状态。
- 任务接口:每个任务必须实现的接口。当线程池的任务队列中有可执行任务时,被空间的工作线程调去执行(线程的闲与忙的状态是通过互斥量实现的),把任务抽象出来形成一个接口,可以做到线程池与具体的任务无关。
- 任务队列:用来存放没有处理的任务。提供一种缓冲机制。实现这种结构有很多方法,常用的有队列和链表结构。
😆3. 线程池实现示例
参考Github:https://github.com/volute24/ThreadPoll
// main.cpp
#include "threadpool.h"
#include <unistd.h>
#include <stdlib.h>
#include <stdio.h>
#define THREADPOOL_MAX_NUM 5
void* mytask(void *arg)
{
printf("thread %d is working on task %d\n", (int)pthread_self(), *(int*)arg);
sleep(1);
free(arg);
return NULL;
}
int main(int argc, char* argv[])
{
threadpool_t pool;
// 初始化线程池,最多5个线程
threadpool_init(&pool, THREADPOOL_MAX_NUM);
// 创建10个任务
for(int i=0; i < 10; i++)
{
int *arg =(int *)malloc(sizeof(int));
*arg = i;
threadpool_add_task(&pool, mytask, arg);
//printf("arg address:%p,arg:[%d],i:[%d]\n",arg,*arg,i);
}
threadpool_destroy(&pool);
return 0;
}
// threadpool.h
#ifndef _THREAD_POLL_H_
#define _THREAD_POLL_H_
#include "condition.h"
// 封装线程池中的对象需要执行的任务对象
typedef struct task
{
void *(*run)(void *args);//函数指针,需要执行的任务
void *arg; //参数
struct task *next; //任务队列中下一个任务
}task_t;
//定义线程池结构体
typedef struct threadpool
{
condition_t ready; //状态量
task_t *first; //任务队列中第一个任务
task_t *last; //任务队列中最后一个任务
int counter; //线程池中已有线程数
int idle; //线程池中空闲线程数
int max_threads; //线程池最大线程数
int quit; //是否退出标志
}threadpool_t;
//线程池初始化
void threadpool_init(threadpool_t *pool,int threads);
//线程池中加入任务
void threadpool_add_task(threadpool_t *pool,void *(*run)(void *args),void *arg);
//销毁线程池
void threadpool_destroy(threadpool_t *pool);
#endif
// threadpoll.cpp
#include "threadpool.h"
#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <errno.h>
#include <time.h>
#include <pthread.h>
using namespace std;
//线程执行
void *thread_routine(void *arg)
{
struct timespec abstime;
int timeout;
printf("thread %d is starting\n",(int)pthread_self());
threadpool_t *pool = (threadpool_t *)arg;
while(1)
{
timeout = 0;
//访问池前加锁
condition_lock(&pool->ready);
//空闲
pool->idle++;
//等待队列任务|| 收到线程池销毁通知
while(pool->first == NULL && !pool->quit)
{
//否则线程阻塞等待
printf("thread %d is waiting\n",(int)pthread_self());
//获取从当前时间加上等待时间,设置超时睡眠时间
//clock_gettime 在编译链接时需加上 -lrt ,librt中实现了clock_gettime函数
clock_gettime(CLOCK_REALTIME,&abstime); //CLOCK_REALTIME 系统实时时间
abstime.tv_sec += 2;
int status;
status = condition_timedwait(&pool->ready,&abstime);
if(status == ETIMEDOUT)
{
printf("thread %d wait timed out\n",(int)pthread_self());
timeout = -1;
break;
}
}
pool->idle--;
if(pool->first != NULL)
{
//取出等待队列最前任务,移除任务,并执行任务
task_t *t = pool->first;
pool->first = t->next;
//由于任务执行需要消耗时间,先解锁让其他线程访问线程池
condition_unlock(&pool->ready);
//执行任务
t->run(t->arg);
//执行完任务释放内存
free(t);
//重新加锁
condition_lock(&pool->ready);
}
//退出线程池
if(pool->quit && pool->first == NULL)
{
pool->counter--;
//若线程池中没有线程,通知等待线程(主线程)全部任务已经完成
if(pool->counter == 0)
{
condition_signal(&pool->ready);
}
condition_unlock(&pool->ready);
break;
}
//超时,跳出销毁线程
if(timeout == 1)
{
pool->counter--;
condition_unlock(&pool->ready);
break;
}
condition_unlock(&pool->ready);
}
printf("thread %d is exiting\n", (int)pthread_self());
return NULL;
}
//线程池初始化
void threadpool_init(threadpool_t *pool, int threads)
{
int nstatu = condition_init(&pool-ready>);
printf("Init return values:%d\n",nstatu);
pool->first = NULL;
pool->last =NULL;
pool->counter =0;
pool->idle =0;
pool->max_threads = threads;
pool->quit =0;
}
//增加一个任务到线程池
void threadpool_add_task(threadpool_t *pool, void *(*run)(void *arg), void *arg)
{
//产生一个新的任务
task_t *newtask = (task_t *)malloc(sizeof(task_t));
newtask->run = run;
newtask->arg = arg;
newtask->next=NULL;//新加的任务放在队列尾端
//线程池的状态被多个线程共享,操作前需要加锁
condition_lock(&pool->ready);
if(pool->first == NULL)
{
pool->first = newtask;
}
else
{
pool->last->next = newtask;
}
pool->last = newtask; //队列尾指向新加入的线程
//线程池中有线程空闲,唤醒
if(pool->idle > 0)
{
condition_signal(&pool->ready);
}
//当前线程池中线程个数没有达到设定的最大值,创建一个新的线程
else if(pool->counter < pool->max_threads)
{
pthread_t tid;
pthread_create(&tid, NULL, thread_routine,pool);
pool->counter++;
}
//结束,访问解锁
condition_unlock(&pool->ready);
}
//线程池销毁
void threadpool_destroy(threadpool_t *pool)
{
if(pool->quit)
{
return;
}
condition_lock(&pool->ready);
pool->quit = 1;
//线程池中线程个数大于0
if(pool->counter > 0)
{
//对于等待的线程,发送信号唤醒
if(pool->idle > 0)
{
condition_broadcast(&pool->ready);
}
//正在执行任务的线程,等待他们结束任务
while(pool->counter)
{
condition_wait(&pool->ready);
}
}
condition_unlock(&pool->ready);
condition_destroy(&pool->ready);
}
//condition.h
#ifndef _CONDITION_H_
#define _CONDITION_H_
#include <pthread.h>
//封装互斥量和条件变量作为初始状态
typedef struct condition
{
pthread_mutex_t pmutex;
pthread_cond_t pcond;
}condition_t;
//对状态操作函数
int condition_init(condition_t *cond);
int condition_lock(condition_t *cond);
int condition_unlock(condition_t *cond);
//条件等待
int condition_wait(condition_t *cond);
//计时等待
int condition_timedwait(condition_t *cond,const struct timespec *abstime);
//激活一个等待该条件的线程
int condition_signal(condition_t *cond);
//激活所有等待线程
int condition_broadcast(condition_t *cond);
int condition_destroy(condition_t *cond);
#endif
// condition.cpp
#include "condition.h"
#include "stdio.h"
//初始化
int condition_init(condition_t *cond)
{
int status;
if((status = pthread_mutex_init(&cond->pmutex,NULL)))
{
printf("pthread_mutex_init return value:%d\n",status);
return status;
}
if((status = pthread_cond_init(&cond->pcond,NULL)))
{
printf("pthread_cond_init return value:%d\n",status);
return status;
}
return 0;
}
//加锁
int condition_lock(condition_t *cond)
{
return pthread_mutex_lock(&cond->pmutex);
}
//解锁
int condition_unlock(condition_t *cond)
{
return pthread_mutex_unlock(&cond->pmutex);
}
//等待
int condition_wait(condition_t * cond)
{
return pthread_cond_wait(&cond->pcond,&cond->pmutex);
}
//固定时间等待
int condition_timedwait(condition_t *cond, const struct timespec *abstime)
{
return pthread_cond_timedwait(&cond->pcond,&cond->pmutex,abstime);
}
//唤醒一个睡眠线程
int condition_signal(condition_t *cond)
{
return pthread_cond_signal(&cond->pcond);
}
//唤醒所有睡眠线程
int condition_broadcast(condition_t * cond)
{
return pthread_cond_broadcast(&cond->pcond);
}
//销毁锁
int condition_destroy(condition_t * cond)
{
int status;
if((status = pthread_mutex_destroy(&cond->pmutex)))
{
return status;
}
if((status = pthread_cond_destroy(&cond->pcond)))
{
return status;
}
return 0;
}
编译:g++ main.cpp condition.cpp threadpool.cpp -lpthread
运行如下:
Init return values:0
thread 1696954112 is starting
thread 1705346816 is starting
thread 1688561408 is starting
thread 1688561408 is working on task 0
thread 1671776000 is starting
thread 1671776000 is working on task 2
thread 1705346816 is working on task 1
thread 1696954112 is working on task 3
thread 1680168704 is starting
thread 1680168704 is working on task 4
thread 1688561408 is working on task 5
thread 1671776000 is working on task 6
thread 1705346816 is working on task 7
thread 1680168704 is working on task 8
thread 1696954112 is working on task 9
thread 1680168704 is exiting
thread 1705346816 is exiting
thread 1696954112 is exiting
以上。