线程池 在网络编程中是一个关键的组成部分,尤其是处理高并发请求时,线程池可以显著提高系统的性能和资源利用效率。它的关键组成部分包括以下几个要素:任务队列 :一个(或多个)用于存放待执行任务的队列。任务通常以函数指针或任务对象的形式存储在队列中。生产者 : 负责生成任务并将任务提交到线程池的任务队列中。消费者 : 从任务队列中获取任务并进行处理。
线程池结构
线程池由各种任务队列、用到的锁和一些属性组成。 https://github1s.com/pupnp/pupnp/blob/branch-1.14.x/upnp/src/threadutil/ThreadPool.h
typedef struct THREADPOOL
{
FreeList jobFreeList; /*! 任务的空闲列表。 */
LinkedList lowJobQ; /*! 低优先级任务队列。 */
LinkedList medJobQ; /*! 中等优先级任务队列。 */
LinkedList highJobQ; /*! 高优先级任务队列。 */
ThreadPoolJob *persistentJob; /*! 持久任务。 */
ithread_mutex_t mutex; /*! 保护任务队列的互斥锁。*/
ithread_cond_t condition; /*! 用于任务队列信号的条件变量。*/
ithread_cond_t start_and_shutdown; /*! 用于启动和关闭的条件变量。 */
// 该变量只在 创建消费者线程CreateWorker 时使用,为1时阻塞其他线程,创建完成后通过start_and_shutdown通知其他继续运行
int pendingWorkerThreadStart; /*! 是否在等待新工作线程启动。 */
int lastJobId; /*! 任务ID计数器。 */
int shutdown; /*! 标识线程池是否正在关闭。 */
int totalThreads; /*! 线程总数。 */
int busyThreads; /*! 当前正在执行任务的线程数量。 */
int persistentThreads; /*! 持久线程的数量。 */
ThreadPoolAttr attr; /*! 线程池的属性。 */
ThreadPoolStats stats; /*! 统计数据。 */
} ThreadPool;
线程池的任务队列结构(以LINKEDLIST 为例)
LINKEDLIST :生产者和消费者共同操作的任务队列,线程池中一般存在多个生产者和多个消费者(共同push、pop),所以操作时要加锁。 在LinkedList.h中也定义了链表的创建(ListInit)、插入(ListAddTail)和删除 (ListDelNode)函数。 另外需要注意的是,当void *ListDelNode(LinkedList *list, ListNode *dnode, int freeItem)的freeItem == 0,则节点中的项会从链表中移除但不会被释放。其地址作为函数的返回值。
typedef struct LINKEDLIST {
ListNode head;
ListNode tail;
long size;
FreeList freeNodeList;
free_function free_func;
cmp_routine cmp_func;
} LinkedList;
ThreadPoolAdd
函数调用ListAddTail
将通过TPJobInit
等函数创建并配置的ThreadPoolJob *
对象加入到目标队列中。生产者的调用过程可见(【p2p、分布式,区块链笔记 UPNP】: Libupnp sample 02 tv_device)。其中的TimerThreadSchedule(&gTimerThread, timeTillRead, REL_SEC, &job, SHORT_TERM, id);
通过ListAddBefore
函数将job添加到gTimerThread,最终通过TimerThreadWorker调用ThreadPoolAdd添加任务队列。 消费者从队列中获取任务head = ListHead(&tp->lowJobQ);job = (ThreadPoolJob *)head->item;
并删除节点ListDelNode(&tp->lowJobQ, head, 0);
。然后通过job->func(job->arg);
执行回调函数。消费者的调用过程见 【p2p、分布式,区块链笔记 UPNP】: Libupnp test_init.c 03 初始化SDK — 线程池初始化(UpnpInitThreadPools)的WorkerThread 。
ListNode
typedef struct LISTNODE {
struct LISTNODE *prev; // 指向前一个节点的指针
struct LISTNODE *next; // 指向下一个节点的指针
void *item; // 指向节点中存储的通用数据(任意类型的项,当前示例为指向ThreadPoolJob的指针)
} ListNode;
下面的例子中highJobQ的head为ListNode类型,其中的item
为ThreadPoolJob类型
// https://github1s.com/pupnp/pupnp/blob/branch-1.14.x/upnp/src/threadutil/ThreadPool.c#L547-L556
// 依次尝试从高、中、低优先级队列中取作业
if (tp->highJobQ.size > 0) {
head = ListHead(&tp->highJobQ);
if (head == NULL) {
tp->stats.workerThreads--;
goto exit_function;
}
job = (ThreadPoolJob *)head->item;
CalcWaitTime(tp, HIGH_PRIORITY, job);
ListDelNode(&tp->highJobQ, head, 0);
ThreadPoolJob
/*! Internal ThreadPool Job. */
typedef struct THREADPOOLJOB
{
start_routine func;
void *arg;
free_routine free_func;
struct timeval requestTime;
ThreadPriority priority;
int jobId;
} ThreadPoolJob;