深入分析 workflow 线程池

news2024/11/20 13:23:36

深入分析 workflow 线程池

线程池是日常开发中很常用的一种管理线程的工具。它是池化技术中的一种。

池化技术的初衷就是将一些资源进行重复利用,来避免重复的构建来提高执行效率。类似的还有数据库连接池,字符串常量池,httpClient 连接池。

本文将分享一个好用的线程池,其来源于搜狗开源高性能网络框架workflow。

workflow 是搜狗公司近期开源发布的一款 C++ 服务器引擎,支撑搜狗几乎所有后端 C++ 在线服务,包括所有搜索服务,云输入法,在线广告等,每日处理超百亿请求。

下面就通过阅读源码的方式来深入了解workflow线程池的实现原理。

workflow线程池

workflow的线程池主要分布在四个文件中,msgqueue.h/msgqueue.c/thrdpool.h/thrdpool.c。

首先我们分析msgqueue.h/msgqueue.c

msgqueue

从名字中得知,这个是一个消息队列。这个消息队列中盛放的就是需要在线程池中执行的任务。所谓任务就是一个执行函数和一个对应的入参。

在workflow中,任务的定义是thrdpool_task,有两个参数,第一个是一个函数指针routine,第二个参数是context上下文。

struct thrdpool_task
{
	void (*routine)(void *);
	void *context;
};

接下来看一下msgqueue的定义。 其中包含了两个队列。一个队列用于放置新任务,一个队列用于拿取新任务。当get的任务队列为空时,就将get队列和put队列进行切换。如果get和put共用一个队列,那么放置任务和取任务都需要加锁,而使用两个队列的好处是只有当get的任务队列为空时进行切换时,才需要进行加锁。

__msgqueue结构体的定义中,get_head就是读队列的队头,put_head是放置队列的队头,put_tail是放置队列的队尾。

msg_max代表最大能够放置的任务数量。

nonblock表示消息队列的阻塞模式。如果是阻塞的,当队列中消息放满了,则不再继续放,直到队列中消息被消费。如果是非阻塞的,则可以一直放消息。

linkoff代表link指针的偏移量。例如下面的msg,其linkoff值就是4。

typedef struct msg_t {
	int data; 
	struct msg_t* link; 
} msg_t;
struct __msgqueue
{
	size_t msg_max;
	size_t msg_cnt;
	int linkoff;
	int nonblock;
	void *head1;
	void *head2;
	void **get_head;
	void **put_head;
	void **put_tail;
	pthread_mutex_t get_mutex;
	pthread_mutex_t put_mutex;
	pthread_cond_t get_cond;
	pthread_cond_t put_cond;
};

有了上述的初步概念之后,我们看看msgqueue中会提供哪些方法。如下所示是msgqueue.h的源代码。

msgqueue.h

#ifndef _MSGQUEUE_H_
#define _MSGQUEUE_H_

#include <stddef.h>

typedef struct __msgqueue msgqueue_t;

#ifdef __cplusplus
extern "C"
{
#endif

/* A simple implementation of message queue. The max pending messages may
 * reach two times 'maxlen' when the queue is in blocking mode, and infinite
 * in nonblocking mode. 'linkoff' is the offset from the head of each message,
 * where spaces of one pointer size should be available for internal usage.
 * 'linkoff' can be positive or negative or zero. */

msgqueue_t *msgqueue_create(size_t maxlen, int linkoff);
void msgqueue_put(void *msg, msgqueue_t *queue);
void *msgqueue_get(msgqueue_t *queue);
void msgqueue_set_nonblock(msgqueue_t *queue);
void msgqueue_set_block(msgqueue_t *queue);
void msgqueue_destroy(msgqueue_t *queue);

#ifdef __cplusplus
}
#endif

#endif

因为其实c语言编写的代码,为了使其可以被c和c++程序都调用,因此在代码中使用了extern "C"

msgqueue的头文件中提供了6个方法,其作用总结如下:

  • msgqueue_create:创建msgqueue。
  • msgqueue_put:放置任务到msgqueue中。
  • msgqueue_get:从msgqueue中取出任务。
  • msgqueue_set_nonblock:将msgqueue设置为nonblock。
  • msgqueue_set_block:将msgqueue设置为block。
  • msgqueue_destroy:销毁msgqueu额。

上述接口做到了见文知意,值得学习。

下面看看这些接口是如何实现的。

msgqueue_create

第一眼看到这个代码,居然是这种梯形的代码,难以说是优美的,不知道是不是我没有领悟到其中的精髓?这个梯形的代码应该可以使用if-return进行优化。

	ret = pthread_mutex_init(&queue->get_mutex, NULL);
	if (ret != 0)
	{
		//...
	}
	ret = pthread_mutex_init(&queue->put_mutex, NULL);
	if (ret != 0) 
	{
		//...
	}

下面继续看msgqueue_create的实现。其实该代码并不难,主要就是对一些变量进行初始化。初始化了get_mutex/put_mutex/get_cond/put_cond。在这些互斥锁和条件变量创建成功后,初始化maxlen/linkoff/msg_cnt/nonblock,并将msgqueue中的队列指针设置为空。

msgqueue_t *msgqueue_create(size_t maxlen, int linkoff)
{
	msgqueue_t *queue = (msgqueue_t *)malloc(sizeof (msgqueue_t));
	int ret;

	if (!queue)
		return NULL;

	ret = pthread_mutex_init(&queue->get_mutex, NULL);
	if (ret == 0)
	{
		ret = pthread_mutex_init(&queue->put_mutex, NULL);
		if (ret == 0)
		{
			ret = pthread_cond_init(&queue->get_cond, NULL);
			if (ret == 0)
			{
				ret = pthread_cond_init(&queue->put_cond, NULL);
				if (ret == 0)
				{
					queue->msg_max = maxlen;
					queue->linkoff = linkoff;
					queue->head1 = NULL;
					queue->head2 = NULL;
					queue->get_head = &queue->head1;
					queue->put_head = &queue->head2;
					queue->put_tail = &queue->head2;
					queue->msg_cnt = 0;
					queue->nonblock = 0;
					return queue;
				}

				pthread_cond_destroy(&queue->get_cond);
			}

			pthread_mutex_destroy(&queue->put_mutex);
		}

		pthread_mutex_destroy(&queue->get_mutex);
	}

	errno = ret;
	free(queue);
	return NULL;
}

msgqueue_put

msgqueue_put的源码如下:

void msgqueue_put(void *msg, msgqueue_t *queue)
{
	void **link = (void **)((char *)msg + queue->linkoff);//(1)

	*link = NULL; //(2)
	pthread_mutex_lock(&queue->put_mutex);//(3)
	while (queue->msg_cnt > queue->msg_max - 1 && !queue->nonblock)//(4)
		pthread_cond_wait(&queue->put_cond, &queue->put_mutex);//(5)

	*queue->put_tail = link;//(6)
	queue->put_tail = link;//(7)
	queue->msg_cnt++;//(8)
	pthread_mutex_unlock(&queue->put_mutex);//(9)
	pthread_cond_signal(&queue->get_cond);//(10)
}

msgqueue_put是一个相对较难理解的方法。尤其是下面这两个更是劝退了很多人。

	*queue->put_tail = link;
	queue->put_tail = link;

msgqueue_put的入参有两个,第一个是msg,第二个是queue,实际上作用就是将msg放入queue中。

但是这里的msg的类型是一个void*类型,为了通用而考虑。下面以一个实际的msg类型取看下面的代码。

对于线程池而言, msg的类型是__thrdpool_task_entry,其拥有两个参数,第一个参数是link,其类似于一个next指针用于指向下一个task。第二个参数则是task的实际内容。

struct __thrdpool_task_entry
{
	void *link;
	struct thrdpool_task task;
};

下面开始一行一行的看代码。

下面代码的目的其实就是取出了msg中link指针所在的位置。因为msg的类型是void*,而void*指针无法进行加减,因此首先将msg转换成char*。给其加上了queue->linkoff,linkoff其实就是msg的类型中link指针的偏移量,对于__thrdpool_task_entry而言,linkoff值为0。 所以第1行代码将msg中的linkoff指针的地址传给link。

第二行代码就比较好理解,就是将linkoff指针指向了NULL。如下图所示:

msgqueue_put

	void **link = (void **)((char *)msg + queue->linkoff);//(1)

	*link = NULL;//(2)

接着往下看,下面这块代码的作用是如果队列中的消息已经放满,并且是block模式,则将停止放消息,直到队列中的消息被消费。如果是non-block模式,则可以一直放消息,没有数量限制。

第3行代码使用put_mutex给临界区上锁,因为可能有多个线程同时put。

第4行和第5行是条件变量的常规写法,即循环判等,直到条件满足。

	pthread_mutex_lock(&queue->put_mutex); //(3)
	while (queue->msg_cnt > queue->msg_max - 1 && !queue->nonblock) //(4)
		pthread_cond_wait(&queue->put_cond, &queue->put_mutex); //(5)

下面两行是劝退代码。。。

	*queue->put_tail = link;// (6)
	queue->put_tail = link; //(7)

首先回顾一下在msgqueue_create方法中,queue->put_tail指向了head2。如下图所示:

msgqueue_put

因此第6行的作用就是让head2指向了link。其效果如下图所示。

msgqueue_put

第7行的作用是将put_tail移动到消息队列的末尾,即link上。

其效果如下所示:

msgqueue_put

如果再有其他消息进来,过程也是类似的,例如此时再加入一条消息,队列的情况如下所示:

msgqueue_put

对照图形,这两行代码也就清晰可见了。

最后三行很简单,不在详细解析,参考注释。

	queue->msg_cnt++;//队列中的消息数量加1
	pthread_mutex_unlock(&queue->put_mutex);//将put_mutex解锁
	pthread_cond_signal(&queue->get_cond);//发送信号给消费线程

msgqueue_get

msgqueue_get的代码如下所示:

void *msgqueue_get(msgqueue_t *queue)
{
	void *msg;//(1)

	pthread_mutex_lock(&queue->get_mutex);//(2)
	if (*queue->get_head || __msgqueue_swap(queue) > 0) //(3)
	{
		msg = (char *)*queue->get_head - queue->linkoff;//(4)
		*queue->get_head = *(void **)*queue->get_head;//(5)
	}
	else
	{
		msg = NULL; //(6)
		errno = ENOENT; //(7)
	}

	pthread_mutex_unlock(&queue->get_mutex);//(8)
	return msg;//(9)
}

下面还是一行一行的进行解析。

第1行定义了msg指针,第2行使用get_mutex进行加锁,因为get的过程可能是并发的。

第3行,如果get_head为不为空,则意味着可以从中取出msg。第4行的目的是重新计算出msg消息的起始地址。第5行是让get_head指向了get队列的下一个msg。

如下图所示,是将msg指针指向了msg消息的地址,

msgqueue_get

下面则是让get_head指向了队列中的下一个元素。

msgqueue_get

重新回到第3行,如果get_head为空,则意味着可能需要将put队列和get队列进行交换。这里参考__msgqueue_swap的解析。

第6行和第7行是处理如果put队列元素也为空的情况。元素为空的场景会出现在non-block的场景上。block的场景下,如果put队列为空,则会阻塞等待。

__msgqueue_swap

__msgqueue_swap的源码如下所示:

static size_t __msgqueue_swap(msgqueue_t *queue)
{
	void **get_head = queue->get_head; //(1)
	size_t cnt; //(2)

	queue->get_head = queue->put_head;//(3)
	pthread_mutex_lock(&queue->put_mutex);//(4)
	while (queue->msg_cnt == 0 && !queue->nonblock)//(5)
		pthread_cond_wait(&queue->get_cond, &queue->put_mutex);(6)

	cnt = queue->msg_cnt;//(7)
	if (cnt > queue->msg_max - 1)//(8)
		pthread_cond_broadcast(&queue->put_cond);//(9)

	queue->put_head = get_head;//(10)
	queue->put_tail = get_head;//(11)
	queue->msg_cnt = 0;//(12)
	pthread_mutex_unlock(&queue->put_mutex);//(13)
	return cnt;
}

下面逐一进行分析。

第1-2行比较好理解,就是取出了队列的头部元素的地址。cnt代表put队列中的消息的数量。

第3行将get_head指向了put队列的头部。注意此时已经加上了get_mutex,所以不会有并发的问题。

第4行加上了put_mutex。

第5-6行当消息队列是block模式时,将判断put队列是否有元素,如果没有元素,则等待。如果是non-block模式,则跳过。

程序走到第7-9行是为了通知正在等待的生产者可以继续生产。这里虽然使用了pthread_cond_broadcast,但是生产者并不会立即被唤醒,因为此时queue->put_mutex被当前线程持有,生产者唤醒后自旋强锁一段时间失败后,将再次sleep,直到queue->put_mutex被释放。 所以个人觉得7-9行放置到12行后面可能更好一些。

10-12行代表则是将两个队列的指针进行切换。

第13行释放queue->put_mutex,生产者可以继续生产。

msgqueue_set_nonblock

msgqueue_set_nonblock代码较为简单,即设置队列的为non-block模式。

void msgqueue_set_nonblock(msgqueue_t *queue)
{
	queue->nonblock = 1;
	pthread_mutex_lock(&queue->put_mutex);
	pthread_cond_signal(&queue->get_cond);
	pthread_cond_broadcast(&queue->put_cond);
	pthread_mutex_unlock(&queue->put_mutex);
}

msgqueue_set_block

msgqueue_set_block代码较为简单,即设置队列的为block模式。

void msgqueue_set_block(msgqueue_t *queue)
{
	queue->nonblock = 0;
}

msgqueue_destroy

msgqueue_destroy方法主要就是为了销毁其中用到的互斥锁和条件变量。

	pthread_cond_destroy(&queue->put_cond);
	pthread_cond_destroy(&queue->get_cond);
	pthread_mutex_destroy(&queue->put_mutex);
	pthread_mutex_destroy(&queue->get_mutex);
	free(queue);

thrdpool

thrpool是线程池的核心代码,首先看一下其头文件thrdpool.h,如下所示:

#ifndef _THRDPOOL_H_
#define _THRDPOOL_H_

#include <stddef.h>

typedef struct __thrdpool thrdpool_t;

struct thrdpool_task
{
	void (*routine)(void *);
	void *context;
};

#ifdef __cplusplus
extern "C"
{
#endif

thrdpool_t *thrdpool_create(size_t nthreads, size_t stacksize);
int thrdpool_schedule(const struct thrdpool_task *task, thrdpool_t *pool);
int thrdpool_increase(thrdpool_t *pool);
int thrdpool_in_pool(thrdpool_t *pool);
void thrdpool_destroy(void (*pending)(const struct thrdpool_task *),
					  thrdpool_t *pool);

#ifdef __cplusplus
}
#endif

#endif

可以看到其中的方法不是很多,仅有5个,下面便一个一个的进行分析。

thrdpool_create

thrdpool_create的代码如下所示。

这里又使用了嵌套if的形式,不太美观。

thrdpool_t *thrdpool_create(size_t nthreads, size_t stacksize)
{
	thrdpool_t *pool;// (1)
	int ret;// (2)

	pool = (thrdpool_t *)malloc(sizeof (thrdpool_t));// (3)
	if (!pool)// (4)
		return NULL;// (5)

	pool->msgqueue = msgqueue_create((size_t)-1, 0);// (6)
	if (pool->msgqueue)// (7)
	{
		ret = pthread_mutex_init(&pool->mutex, NULL);// (8)
		if (ret == 0)// (9)
		{
			ret = pthread_key_create(&pool->key, NULL);// (9)
			if (ret == 0)// (10)
			{
				pool->stacksize = stacksize;// (11)
				pool->nthreads = 0;// (12)
				memset(&pool->tid, 0, sizeof (pthread_t));// (13)
				pool->terminate = NULL;// (14)
				if (__thrdpool_create_threads(nthreads, pool) >= 0)// (15)
					return pool;// (16)

				pthread_key_delete(pool->key);// (17)
			}

			pthread_mutex_destroy(&pool->mutex);// (18)
		}

		errno = ret;// (19)
		msgqueue_destroy(pool->msgqueue);// (20)
	}

	free(pool);
	return NULL;
}

thrdpool_create的入参是nthreads和stacksize,分别代表线程数量和线程栈的大小。

第1-2行声明了pool和ret变量。

第3-5行创建了pool对象。

第6行给pool创建了一个消息队列msgqueue。

第7-14行,当消息队列创建成功后,初始化pool中的mutex和线程key。并设置了线程池的线程数量和stack。

第15行调用了__thrdpool_create_threads进行实际的线程创建。后面将针对该方法讲解。如果创建成功,16行将pool变量进行返回。

第17-20行则是进行一些创建失败时的回滚操作。

__thrdpool_create_threads

__thrdpool_create_threads是实际创建线程的方法。

static int __thrdpool_create_threads(size_t nthreads, thrdpool_t *pool)
{
	pthread_attr_t attr;//(1)
	pthread_t tid;//(2)
	int ret;//(3)

	ret = pthread_attr_init(&attr);//(4)
	if (ret == 0)//(5)
	{
		if (pool->stacksize)//(6)
			pthread_attr_setstacksize(&attr, pool->stacksize);//(7)

		while (pool->nthreads < nthreads)//(8)
		{
			ret = pthread_create(&tid, &attr, __thrdpool_routine, pool);//(9)
			if (ret == 0)//(10)
				pool->nthreads++;//(11)
			else//(12)
				break;//(13)
		}

		pthread_attr_destroy(&attr);//(14)
		if (pool->nthreads == nthreads)//(15)
			return 0;//(16)

		__thrdpool_terminate(0, pool);//(17)
	}

	errno = ret;//(18)
	return -1;//(19)
}

第1-3行声明了三个参数attr/tid/ret。

第4行创建了attr。如果创建成功,则进行继续,如果创建失败,则返回错误。

第6-7行设置了线程栈的大小。

第8-13行循环地进行线程的创建。若创建成功,则将线程池中的线程数进行递增。线程的入口方法是__thrdpool_routine,将在下面进行讲解。

第14-16行当线程创建完毕后要对attr进行销毁。如果创建出的线程数量等于期望的数量,则返沪,否则创建失败,销毁线程池。

__thrdpool_routine

static void *__thrdpool_routine(void *arg)
{
	thrdpool_t *pool = (thrdpool_t *)arg;//(1)
	struct __thrdpool_task_entry *entry;//(2)
	void (*task_routine)(void *);//(3)
	void *task_context;//(4)
	pthread_t tid;//(5)

	pthread_setspecific(pool->key, pool);//(6)
	while (!pool->terminate)//(7)
	{
		entry = (struct __thrdpool_task_entry *)msgqueue_get(pool->msgqueue);//(8)
		if (!entry)//(9)
			break;//(10)

		task_routine = entry->task.routine;//(11)
		task_context = entry->task.context;//(12)
		free(entry);//(13)
		task_routine(task_context);//(14)

		if (pool->nthreads == 0)//(15)
		{
			free(pool);//(16)
			return NULL;//(17)
		}
	}

	/* One thread joins another. Don't need to keep all thread IDs. */
	pthread_mutex_lock(&pool->mutex);//(18)
	tid = pool->tid;//(19)
	pool->tid = pthread_self();//(20)
	if (--pool->nthreads == 0)//(21)
		pthread_cond_signal(pool->terminate);//(22)

	pthread_mutex_unlock(&pool->mutex);//(23)
	if (memcmp(&tid, &__zero_tid, sizeof (pthread_t)) != 0)//(24)
		pthread_join(tid, NULL);//(25)

	return NULL;//(23)
}

第1行,从arg中取出了参数,强转为thrdpool_t类型。第2-5行声明了一条参数。

第6行,向pool->key中设置了一个pool的地址值。这将用于后面判断一个线程是否属于某个线程池。

第7到17行,循环从消息队列中取出任务执行,如果取到的消息为空,则退出。因为线程池退出的时候会设置消息队列为non-block,因此取到的消息可能为空。第7行中,pool->terminate在线程池没有退出时其值为NULL,当线程池destroy时,会对其赋值。

第18-23行,则代表线程已经退出了,这里会首先挂上pool->mutex。因为线程可能存在同时退出的场景。这里的设计思路是让线程逐一退出,让后一个线程去join前一个线程。后面在讲解destroy时,还会再提到这里。

thrdpool_schedule

thrdpool_schedule的作用实际就是向线程池中推送一个任务。内部将调用msgqueue_put向消息队列中塞上一个任务。该方法的实现比较简单,不做过多解析。

int thrdpool_schedule(const struct thrdpool_task *task, thrdpool_t *pool)
{
	void *buf = malloc(sizeof (struct __thrdpool_task_entry));

	if (buf)
	{
		__thrdpool_schedule(task, buf, pool);
		return 0;
	}

	return -1;
}

void __thrdpool_schedule(const struct thrdpool_task *task, void *buf,
						 thrdpool_t *pool)
{
	((struct __thrdpool_task_entry *)buf)->task = *task;
	msgqueue_put(buf, pool->msgqueue);
}

thrdpool_increase

thrdpool_increase的作用是增加一个线程。该函数的作用还是比较清晰的,这里不做太多解析。

int thrdpool_increase(thrdpool_t *pool)
{
	pthread_attr_t attr;
	pthread_t tid;
	int ret;

	ret = pthread_attr_init(&attr);
	if (ret == 0)
	{
		if (pool->stacksize)
			pthread_attr_setstacksize(&attr, pool->stacksize);

		pthread_mutex_lock(&pool->mutex);
		ret = pthread_create(&tid, &attr, __thrdpool_routine, pool);
		if (ret == 0)
			pool->nthreads++;

		pthread_mutex_unlock(&pool->mutex);
		pthread_attr_destroy(&attr);
		if (ret == 0)
			return 0;
	}

	errno = ret;
	return -1;
}


thrdpool_in_pool

该函数的作用是判断一个线程是否属于线程池。属于线程池的线程在启动时会给线程私有变量key塞上pool的地址,因此可以使用pthread_getspecific(pool->key) == pool进行判断。

int thrdpool_in_pool(thrdpool_t *pool)
{
	return pthread_getspecific(pool->key) == pool;
}

thrdpool_destroy

thrdpool_destroy的函数入参包含量部分,一个部分是一个pending函数,这个函数用于处理一些已经提交但是还没有被执行的任务。


void thrdpool_destroy(void (*pending)(const struct thrdpool_task *),
					  thrdpool_t *pool)
{
	int in_pool = thrdpool_in_pool(pool);//(1)
	struct __thrdpool_task_entry *entry;//(2)

	__thrdpool_terminate(in_pool, pool);//(3)
	while (1)//(4)
	{
		entry = (struct __thrdpool_task_entry *)msgqueue_get(pool->msgqueue);//(4)
		if (!entry)//(5)
			break;//(6)

		if (pending)//(7)
			pending(&entry->task);//(8)

		free(entry);//(9)
	}

	pthread_key_delete(pool->key);//(10)
	pthread_mutex_destroy(&pool->mutex);//(11)
	msgqueue_destroy(pool->msgqueue);//(12)
	if (!in_pool)//(13)
		free(pool);//(14)
}

第1行in_pool用于判断调用thrdpool_destroy的线程是否属于线程池中的线程。destroy线程可以是线程池中的线程,也可以是外部线程。二者的实现会有区别。

第3行将调用__thrdpool_terminate进行线程池的销毁,后续将对其讲解。

第4-13行则是处理消息队列中还没有处理的一些消息。

__thrdpool_terminate

static void __thrdpool_terminate(int in_pool, thrdpool_t *pool)
{
	pthread_cond_t term = PTHREAD_COND_INITIALIZER;//(1)

	pthread_mutex_lock(&pool->mutex);//(2)
	msgqueue_set_nonblock(pool->msgqueue);//(3)
	pool->terminate = &term;//(4)

	if (in_pool)//(5)
	{
		pthread_detach(pthread_self());//(6)
		pool->nthreads--;//(7)
	}

	while (pool->nthreads > 0)//(8)
		pthread_cond_wait(&term, &pool->mutex);//(9)

	pthread_mutex_unlock(&pool->mutex);//(10)
	if (memcmp(&pool->tid, &__zero_tid, sizeof (pthread_t)) != 0)//(11)
		pthread_join(pool->tid, NULL);//(12)
}

第1行和第4行对条件变量pool->termincate设置了初值。将消息队列设置为non-block,使得空等消息的线程迅速走入退出流程。如果是线程池中的线程发出了destory请求,则将自身设置为detach,同时将线程池的线程数量减去1。

第8-9行则是等待所有的线程都退出。上面提到__thrdpool_routine会join前一个线程,并向pool->termincate发送信号。

最后如果是外部线程发起destroy,则还需要帮忙将线程池中的最后一个线程destroy。

如果是线程池中的线程发起destroy,因为上面已经设置为了detach,因此其自身无需join。

两种发起模式的区别可参考下面两张图:

发起者是外部线程:

thrdpool_terminate

发起者是线程池内部线程:

thrdpool_terminate

demo

文件结构如下所示:

[root@localhost workflow-thread]# tree .
.
├── a.out
├── main.cpp
├── msgqueue.cpp
├── msgqueue.h
├── thrdpool.cpp
└── thrdpool.h

main.cpp

//g++ main.cpp msgqueue.cpp thrdpool.cpp
#include <iostream>
#include <unistd.h>
#include "thrdpool.h"
#include "msgqueue.h"


struct Context{
	int val;
};

void my_func(void *context) // 我们要执行的函数  
{ 
	printf("task-%d start.\n", ((Context*)context)->val);
	sleep(1);
} 

void my_pending(const struct thrdpool_task *task) // 线程池销毁后,没执行的任务会到这里
{
	printf("pending task-%d.\n",  ((Context*)task->context)->val);  
} 

int main() 
{
	thrdpool_t *thrd_pool = thrdpool_create(3, 1024); // 创建  
	struct thrdpool_task task;
	int i;
	
	Context *p_context[5];
	for (i = 0; i < 5; i++)
	{
		p_context[i] = new Context();
		p_context[i]->val = i;
		task.routine = &my_func; 
		task.context = (void *)(p_context[i]); 
		thrdpool_schedule(&task, thrd_pool); // 调用
	}
	getchar(); 

	std::cout << "start_destroy" << std::endl;
	thrdpool_destroy(&my_pending, thrd_pool); // 结束
	for (i = 0; i < 5; i++)
	{
		delete p_context[i];
	}
	
	return 0; 
} 

demo

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/755690.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

git下载源码及环境搭建下载源码之后端(一)

学习目标&#xff1a; git下载源码 步骤&#xff1a; 下载源码 使用 windows R 使用cmd调用命令框下载gitee云上面的 源码文件 输入命令&#xff1a; Git clone &#xff08;此处拼接gitee源代码 地址&#xff09; 若使用 git 命令 clone 项目时 我们需要在系统变量中进行…

Ceph 存储(最详细!)

目录 一&#xff1a;存储基础 1、单机存储设备 &#xff08;1&#xff09;DAS&#xff08;直接附加存储&#xff0c;是直接接到计算机的主板总线上去的存储&#xff09; &#xff08;2&#xff09;NAS&#xff08;网络附加存储&#xff0c;是通过网络附加到当前主机文件系统…

CSS 伪元素: ::marker 自定义列表序号

::marker 伪元素 ::marker&#xff0c;可作用在任何设置了 display: list-item 的元素或伪元素上&#xff0c;例如<li>和<summary>。 /** <ul><li>Peaches</li><li>Apples</li><li>Plums</li> </ul> */ ul li::…

Python教程(3)——python开发工具vscode的下载与安装

Python的开发工具有很多款&#xff0c;很多都是非常好用的&#xff0c;其中vscode作为其中一款Python的开发工具&#xff0c;是非常轻量级的&#xff0c;今天我们来介绍一下vs code的下载与安装。 vscode的下载与安装 首先需要到vscode的官网&#xff0c;这个谷歌或者百度一下…

【C++11】包装器 和 bind函数 的定义与 使用

包装器 在C11标准中&#xff0c;没有提供内置的包装器功能&#xff0c;但我们可以使用一些技术手段来实现包装器的效果。下面介绍两种常用的方法&#xff1a; 函数对象包装器 函数对象包装器&#xff08;Function Object Wrapper&#xff09;&#xff1a; C11引入了 std::fu…

CCLINK IE 转MODBUS-RTU网关modbusrtu地址对照表

远创智控YC-CCLKIE-RTU。这款产品的主要功能是将各种MODBUS-RTU、RS485、RS232设备接入到CCLINK IE FIELD BASIC网络中。 那么&#xff0c;这款通讯网关又有哪些特点呢&#xff1f;首先&#xff0c;它能够连接到CCLINK IE FIELD BASIC总线中作为从站使用&#xff0c;同时也能连…

晶体三极管总结

目录 1.3.1晶体管的结构与类型 1.3.2晶体管的电流放大作用 1.3.3晶体管的共射特性曲线 晶体三极管中有两种带有不同的极性电荷的载流子参与导电&#xff0c;故称之为双极型晶体管&#xff08;BJT&#xff09;Bipolar Junction Transistor 1.3.1晶体管的结构与类型 晶体三极…

PowerShell设置最美界面

在 Windows 上安装 scoop&#xff1a; scoop是什么 windows下的安装源搜索工具&#xff0c;有点类似centos下的yum和Ubuntu下的apt。用这个拉下来安装的软件没有广告。 scoop官方网址&#xff1a; https://scoop.sh/A command-line installer for Windowshttps://scoop.sh/ …

scratch二级、三级常考大题

目录 1. 绘制多彩五角星 2. 躲避陨石 3. 数星星 4. 古堡历险记 5. 五彩糖葫芦 6. 疫情隔离和核酸检测模拟 7. 画正方形 8.大鱼吃小鱼 9. 接水果 10. 绘制正方形 1. 绘制多彩五角星 1.准备工作 &#xff08;1&#xff09;选择背景stars、角色Pencil&#xff1b; &am…

工作日志3 对类型的判断 slice的截取对于jq的使用 el-table的表头和内容的位置

在 JavaScript 中&#xff0c;你可以使用多种方式来判断某个值是否为空。以下是几种常见的方法&#xff1a; 使用严格相等运算符 () 检查值是否为 null 或 undefined&#xff1a; var value null; // 或者 undefinedif (value null || value undefined) {// 值为空 }使用逻…

Linux常用命令——elm命令

在线Linux命令查询工具 elm 纯文本邮件客户端程序 补充说明 elm命令是一个E-mail客户端管理程序&#xff0c;它提供了纯文本交互式全屏幕界面。 语法 elm(选项)选项 -s<邮件主题>&#xff1a;指定新邮件的邮件主题&#xff1b; -f<目录>&#xff1a;开启程序…

Flutter 使用JSONToDart 生成bean文件

1. 首先安装插件&#xff0c;进入flile-setting-plugins 2.然后搜索安装jsontodart&#xff0c;之后重启ide使其生效 3.在你需要使用的地方直接鼠标右键 或者使用快捷键AltShiftD 4.然后会出现这样一个弹窗&#xff0c;输入你要用到的json数据&#xff0c;和文件名称点击生成就…

2023年房地产经纪中介行业研究报告

第一章 行业概况 1.1 概述 房地产经纪中介行业是一个专业的服务行业&#xff0c;主要涉及在买家和卖家之间进行房地产交易的媒介服务。这些服务包括评估和定价房产&#xff0c;对房产进行营销和推广&#xff0c;协助谈判和结算交易等。在这个行业中&#xff0c;中介公司通常会…

C国演义 [第九章]

第九章 买卖股票的最佳时机III题目理解步骤dp数组递推公式初始化遍历方向 代码 买卖股票的最佳时机IV题目理解步骤dp数组递推公式初始化遍历方向 代码 买卖股票的最佳时机III 力扣链接 给定一个数组&#xff0c;它的第 i 个元素是一支给定的股票在第 i 天的价格 设计一个算法…

每日科技分享-POE新增文件和链接发送功能

POE推出新功能 注意POE需要魔法上午才能进去。 实测 实测可以发送论文给chatgpt&#xff0c;然后和AI进行共享的对话。 POE网站链接&#xff1a; 也可以发送链接&#xff0c;实测了一下&#xff0c;似乎有时候并不准确&#xff0c;我发送了关于分层强化的文章&#xff0c;但是…

按首字母排序分组(类通讯录)

移动端开发过程中&#xff0c;有遇到按首字母分组排序的&#xff0c;仿通讯录效果 那实现过程中&#xff0c;我们需要安装插件 npm i --save js-pinyin 安装后使用&#xff1a; 在页面中引用 import Pinyin from js-pinyin 调用 const sortByFirstLetter (origin) &g…

笔记本电脑的电池健康:确保长时间使用和优异性能的关键

笔记本电脑已经成为我们日常生活中不可或缺的工具&#xff0c;无论是办公、学习还是娱乐&#xff0c;我们都依赖着它的便携性和高效性能。而在所有的硬件组件中&#xff0c;电池健康被认为是确保长时间使用和良好性能的关键因素之一。一块健康的电池不仅能提供持久的续航时间&a…

从零开始的抢购脚本开发-油猴开发教程(多快好省)

文章目录 前言为何学习 JavaScript&#xff1f; JS简介JavaScript 能够改变 HTML 内容 JavaScript 能够改变 HTML 属性JavaScript 能够改变 HTML 样式 (CSS)JavaScript 能够隐藏 HTML 元素JavaScript 能够显示 HTML 元素JS的使用外部脚本外部 JavaScript 的优势外部引用JavaScr…

【JavaEE】了解JVM

JVM的基本认识 文章目录 【JavaEE】了解JVM1. JVM中的内存区域划分1.1 JVM的核心区域1.2 JVM内存城防图 2. JVM的类加载机制2.1 loading2.2 verification2.3 preparation2.4 resolution2.5 initialization2.6 类加载触发的时机2.7 双亲委派模型 3. JVM中的垃圾回收策略3.1 JVM释…

cmake 提前结束处理命令: return

有时候,我们有这样的需求,当处理到某个地方的时候,后面的我们都不想处理或者不需要处理的时候,就可以提前结束当前的处理逻辑,回到父级去处理.在C/C中,我们有break关键字跳出当前循环,continue关键字进入下一次循环,return关键字返回当前处理的函数. cmake也提供了break(),con…