提示:文章写完后,目录可以自动生成,如何生成可参考右边的帮助文档
文章目录
- 前言
- 一、线程池是什么?
- 二、代码示例
- 总结
前言
线程池c代码简单实现:
大致思路如下:
一个管理线程轮询工作线程是否空闲,空闲的话从工作队列中取出work函数给工作线程处理
提示:以下是本篇文章正文内容,下面案例可供参考
一、线程池是什么?
线程池是一种用于管理一组预先创建的线程的技术,它能够高效地处理大量并发任务。线程池的核心思想是复用线程,而不是为每个任务都创建和销毁线程,这样可以显著减少线程创建和销毁所带来的开销,并提高系统的整体性能。
线程池的优点
1,减少线程创建和销毁的开销:线程的创建和销毁是一个相对耗时的过程,线程池通过复用线程来避免频繁的创建和销毁,提高了效率。
2,控制并发度:线程池可以限制同时运行的线程数量,从而控制系统的并发度,防止过度消耗系统资源。
3,提高响应速度:由于线程已经在池中预先创建,当新任务到来时,可以直接从池中获取线程执行,无需等待线程创建过程。
4,资源管理:线程池可以更有效地管理资源,比如限制最大线程数量,避免系统资源耗尽。
线程池的基本组成部分
1,线程池:包含一组预先创建的线程。
2,任务队列:用于保存待处理的任务。
3,任务调度器:负责将任务分发给空闲的线程执行。
4,线程工厂:用于创建新线程(虽然线程池通常会复用线程,但在某些情况下可能需要创建新线程)。
线程池的工作流程
1,任务提交:当一个任务需要被执行时,它会被提交给线程池。
2,任务分配:如果线程池中有空闲线程,那么任务会被分配给其中一个线程执行。
3,任务执行:线程开始执行任务。
4,任务完成:任务完成后,线程回到线程池中等待分配新的任务。
二、代码示例
thread_pool.c
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <unistd.h>
#include "double_list.h"
#define CONTAINER_OF(ptr, type, member) \
((type *)((char *)(ptr) - offsetof(type, member)))
/*
// 任务数据结构
typedef struct {
int id; //最好能传递 提交任务的线程pid
void* (*function)(void *); // 指向任务函数的指针
void *arg; // 传递给任务函数的参数
} task_t;
*/
typedef struct {
DoublyLinkedList list;
int stack_size;
pthread_t *thread_id; //工作线程的 thread id
task_t *thread_arg; //工作线程的 输入arg
int thread_num;
pthread_t thread_mg; //管理线程
int thread_pool_pause;
int thread_pool_stop;
void* ext_arg;
} thread_pool_t;
typedef struct {
thread_pool_t * tp;
int num;
} work_th_arg;
// 线程池工作线程的主循环
void *thread_pool_thread(void *arg) {
thread_pool_t *tp = ((work_th_arg *)arg)->tp;
int thread_num = ((work_th_arg *)arg)->num;
int ret = 0;
printf("tp = %x, thread_num = %d\n", tp, thread_num);
tp->thread_arg[thread_num].id = -1;
struct timespec ts, rem;
// 设置要睡眠的时间为5毫秒
ts.tv_sec = 0; // 秒
ts.tv_nsec = 5 * 1000 * 1000; // 5毫秒转换为纳秒
free(arg);
while (1) {
// 如果线程池关闭并且队列为空,则退出线程
if(tp->thread_pool_stop == 1 || tp->thread_arg == NULL) {
pthread_exit(NULL);
}
// 如果队列为空则等待
if(tp->thread_pool_pause == 1 || tp->thread_arg[thread_num].id == -1) {
nanosleep(&ts, &rem); // 睡眠5ms
continue;
}
// 获取任务
// 执行任务
ret = tp->thread_arg[thread_num].function(tp->thread_arg[thread_num].arg);
tp->thread_arg[thread_num].id = -1;
printf("thread_num = %d, do task done ret = %d\n", thread_num, ret);
}
}
// 线程池 管理线程的主循环
void *thread_pool_manage(void *arg) {
thread_pool_t *tp = (thread_pool_t *)arg;
task_t tmp_ta = {0};
int ret = 0;
struct timespec ts, rem;
// 设置要睡眠的时间为2毫秒
ts.tv_sec = 0; // 秒
ts.tv_nsec = 2 * 1000 * 1000; // 5毫秒转换为纳秒
printf("thread_pool_manage enter\n");
while (1) {
// 如果线程池关闭并且队列为空,则退出线程
if(tp->thread_pool_stop == 1 || tp->thread_arg == NULL) {
pthread_exit(NULL);
}
// 如果队列为空则等待
if(tp->thread_pool_pause == 1 || is_list_empty(&(tp->list))) {
//printf("nanosleep enter\n");
nanosleep(&ts, &rem); // 睡眠5ms
continue;
}
for(int i = 0; i < tp->thread_num; i++)
{
if(tp->thread_arg[i].id == -1)
{
printf("queue_dequeue enter\n");
ret = queue_dequeue(&(tp->list),&tmp_ta);
if(ret != 0)
{
break;
}
tp->thread_arg[i].function = tmp_ta.function;
tp->thread_arg[i].arg = tmp_ta.arg;
tp->thread_arg[i].id = tmp_ta.id;
}
}
}
}
int thread_pool_init(thread_pool_t *tp,int thread_num, int stack_size)
{
int ret = 0;
if(tp == NULL)
{
return -1;
}
if(thread_num == 0)
{
tp->thread_num = 100;
}else
{
tp->thread_num = thread_num;
}
if(stack_size == 0)
{
tp->stack_size = 1000;
}else
{
tp->stack_size = stack_size;
}
initDoublyList(&(tp->list));
list_set_maxlen(&(tp->list), tp->stack_size);
tp->thread_arg = malloc(tp->thread_num * sizeof(task_t));
if(tp->thread_arg == NULL)
{
return -1;
}
tp->thread_id = malloc(tp->thread_num * sizeof(pthread_t));
if(tp->thread_arg == NULL)
{
ret = -1;
goto error;
}
tp->thread_pool_pause = 1;
tp->thread_pool_stop = 0;
for(int i = 0; i < tp->thread_num; i++)
{
work_th_arg *th_arg = malloc(sizeof(work_th_arg));
if(th_arg == NULL)
{
return -1;
}
tp->thread_arg[i].id = -1;
th_arg->tp = tp;
th_arg->num = i;
if (pthread_create(&(tp->thread_id[i]), NULL, thread_pool_thread, (void*)th_arg) != 0) {
perror("Failed to create thread 1");
ret = -1;
free(th_arg);
goto error;
}
//printf("tp->thread_id[%d] = %d\n", i , tp->thread_id[i]);
}
if (pthread_create(&(tp->thread_mg), NULL, thread_pool_manage, (void*)tp) != 0) {
perror("Failed to create thread 1");
ret = -1;
goto error;
}
return 0;
error:
if(tp->thread_arg)
free(tp->thread_arg);
if(tp->thread_id)
free(tp->thread_id);
return ret;
}
int thread_pool_stop(thread_pool_t *tp)
{
if(NULL == tp)
{
return -1;
}
tp->thread_pool_stop = 1;
for(int i = 0; i < tp->thread_num; i++)
{
//printf("thread_pool_stop tp->thread_id[%d] = %d\n", i , tp->thread_id[i]);
pthread_join(tp->thread_id[i], NULL);
}
pthread_join(tp->thread_mg, NULL);
return 0;
}
int thread_pool_run(thread_pool_t *tp, int pause)
{
if(NULL == tp)
{
return -1;
}
tp->thread_pool_pause = pause;
return 0;
}
int thread_pool_uninit(thread_pool_t *tp)
{
int ret = 0;
if(tp == NULL)
{
return -1;
}
thread_pool_stop(tp);
clearDoublyList(&(tp->list));
if(tp->thread_arg)
free(tp->thread_arg);
if(tp->thread_id)
free(tp->thread_id);
tp->thread_pool_pause = 0;
tp->thread_pool_stop = 0;
tp->thread_mg = 0;
error:
return ret;
}
int task_fun(int arg)
{
printf("task_fun enter\n");
for(int i = 0; i < arg; i++)
{
for(int j = 0; j < 1000; j++)
{
asm("nop");
}
}
sleep(arg);
return arg;
}
int main()
{
task_t task_test = {0};
thread_pool_t *tp = malloc(sizeof(thread_pool_t));
thread_pool_init(tp, 10, 100);
//run
thread_pool_run(tp, 0);
for(int i = 0; i < 20; i++)
{
task_test.id = i;
task_test.function = task_fun;
task_test.arg = i;
queue_enqueue(&(tp->list), task_test);
}
usleep(30* 1000 * 1000);
thread_pool_uninit(tp);
free(tp);
}
double_list.h
#ifndef _DOUBLE_LIST_H_
#define _DOUBLE_LIST_H_
#define USE_MUTEX
#define MYTYPE task_t
//special struct define
// 任务数据结构
typedef struct {
int id; //最好能传递 提交任务的线程pid
void* (*function)(void *); // 指向任务函数的指针
void *arg; // 传递给任务函数的参数
} task_t;
typedef struct DoublyListNode {
MYTYPE data;
struct DoublyListNode* prev;
struct DoublyListNode* next;
} DoublyListNode;
typedef struct DoublyLinkedList {
DoublyListNode* head;
DoublyListNode* tail;
#ifdef USE_MUTEX
pthread_mutex_t lock; // 添加互斥锁
#endif
int list_len_max;
int list_len_cur;
} DoublyLinkedList;
void initDoublyList(DoublyLinkedList* list);
void clearDoublyList(DoublyLinkedList* list);
void printDoublyList(DoublyLinkedList* list);
void list_set_maxlen(DoublyLinkedList* list,int len);
int insertAtHead(DoublyLinkedList *list, MYTYPE value);
int insertAtTail(DoublyLinkedList *list, MYTYPE value);
int deleteAtHead(DoublyLinkedList *list, MYTYPE *value);
int deleteAtTail(DoublyLinkedList *list, MYTYPE *value);
//full/enpty
int is_list_empty(DoublyLinkedList *list);
int is_list_full(DoublyLinkedList *list);
//queue
int queue_enqueue(DoublyLinkedList *list, MYTYPE value);
int queue_dequeue(DoublyLinkedList *list, MYTYPE *value);
//stack
int stack_push(DoublyLinkedList *list, MYTYPE value);
int stack_pop(DoublyLinkedList *list, MYTYPE *value);
#endif
double_list.c
#include <stdio.h>
#include <pthread.h>
#include <stdlib.h>
#include "double_list.h"
#include <string.h>
void list_set_maxlen(DoublyLinkedList* list, int len)
{
if(NULL == list)
{
printf("invalid parameter\n");
return;
}
list->list_len_max = len;
}
void initDoublyList(DoublyLinkedList* list) {
list->head = NULL;
list->tail = NULL;
#ifdef USE_MUTEX
pthread_mutex_init(&list->lock, NULL); // 初始化互斥锁
#endif
list->list_len_max = -1;
list->list_len_cur = 0;
}
//清空链表
void clearDoublyList(DoublyLinkedList* list) {
DoublyListNode* current = list->head;
while (current != NULL) {
DoublyListNode* next = current->next;
//printf("clearDoublyList free\n");
free(current);
current = next;
}
list->head = NULL;
list->tail = NULL;
list->list_len_cur = 0;
list->list_len_max = -1;
#ifdef USE_MUTEX
pthread_mutex_destroy(&list->lock);
#endif
}
//遍历并打印链表
void printDoublyList(DoublyLinkedList* list) {
#ifdef USE_MUTEX
pthread_mutex_lock(&list->lock);
#endif
DoublyListNode* current = list->head;
while (current != NULL) {
printf("%d <-> ", current->data);
current = current->next;
}
printf("NULL\n");
#ifdef USE_MUTEX
pthread_mutex_unlock(&list->lock);
#endif
}
//向链表头部插入元素
int insertAtHead(DoublyLinkedList *list, MYTYPE value) {
int ret = 0;
#ifdef USE_MUTEX
pthread_mutex_lock(&list->lock);
#endif
if(list == NULL || (list->list_len_max != -1 && list->list_len_cur >= list->list_len_max))
{
printf("invalid parameter or list full\n");
ret = -1;
goto error;
}
DoublyListNode *newNode = (DoublyListNode *)malloc(sizeof(DoublyListNode));
if (newNode == NULL) {
printf("Memory allocation failed\n");
ret = -1;
goto error;
}
memcpy(&(newNode->data), &value, sizeof(MYTYPE));
newNode->prev = NULL;
newNode->next = list->head;
if (list->head != NULL)
list->head->prev = newNode;
else
list->tail = newNode;
list->head = newNode;
list->list_len_cur++;
error:
#ifdef USE_MUTEX
pthread_mutex_unlock(&list->lock);
#endif
return 0;
}
//向链表尾部插入元素
int insertAtTail(DoublyLinkedList *list, MYTYPE value) {
int ret = 0;
#ifdef USE_MUTEX
pthread_mutex_lock(&list->lock);
#endif
if(list == NULL || (list->list_len_max != -1 && list->list_len_cur >= list->list_len_max))
{
printf("invalid parameter or list full\n");
ret = -1;
goto error;
}
DoublyListNode *newNode = (DoublyListNode *)malloc(sizeof(DoublyListNode));
if (newNode == NULL) {
printf("Memory allocation failed\n");
ret = -1;
goto error;
}
memcpy(&(newNode->data), &value, sizeof(MYTYPE));
newNode->next = NULL;
newNode->prev = list->tail;
if (list->tail != NULL)
list->tail->next = newNode;
else
list->head = newNode;
list->tail = newNode;
list->list_len_cur++;
error:
#ifdef USE_MUTEX
pthread_mutex_unlock(&list->lock);
#endif
return 0;
}
//从链表头部删除元素
int deleteAtHead(DoublyLinkedList *list, MYTYPE * value) {
int ret = 0;
if(list == NULL || value == NULL)
{
printf("invalid parameter\n");
return -1;
}
#ifdef USE_MUTEX
pthread_mutex_lock(&list->lock);
#endif
if (list->head == NULL) {
printf("List is empty\n");
list->tail = NULL;
ret = -1;
goto error;
}
// 头和尾指针指向一个node,只有一个元素
if(list->head == list->tail)
{
list->tail = NULL;
list->head->next = NULL;
}
DoublyListNode *nodeToDelete = list->head;
memcpy(value, &(nodeToDelete->data), sizeof(MYTYPE));
list->head = nodeToDelete->next;
if (list->head != NULL)
list->head->prev = NULL;
// else
// list->tail = NULL;
//printf("deleteAtHead free\n");
free(nodeToDelete);
list->list_len_cur--;
error:
#ifdef USE_MUTEX
pthread_mutex_unlock(&list->lock);
#endif
return 0;
}
//从链表末尾删除元素
int deleteAtTail(DoublyLinkedList *list, MYTYPE * value) {
int ret = 0;
if(list == NULL || value == NULL)
{
printf("invalid parameter\n");
return -1;
}
#ifdef USE_MUTEX
pthread_mutex_lock(&list->lock);
#endif
if (list->tail == NULL) {
printf("List is empty\n");
list->head = NULL;
ret = -1;
goto error;
}
// 头和尾指针指向一个node,只有一个元素
if(list->head == list->tail)
{
list->head = NULL;
list->tail->prev = NULL;
}
DoublyListNode *nodeToDelete = list->tail;
memcpy(value, &(nodeToDelete->data), sizeof(MYTYPE));
list->tail = nodeToDelete->prev;
if (list->tail != NULL)
list->tail->next = NULL;
// else
// list->head = NULL;
//printf("deleteAtTail free\n");
free(nodeToDelete);
list->list_len_cur--;
error:
#ifdef USE_MUTEX
pthread_mutex_unlock(&list->lock);
#endif
return ret;
}
int is_list_full(DoublyLinkedList *list)
{
int ret = 0;
#ifdef USE_MUTEX
pthread_mutex_lock(&list->lock);
#endif
if(list->list_len_max != -1 && list->list_len_cur >= list->list_len_max)
{
ret = 1;
}
#ifdef USE_MUTEX
pthread_mutex_unlock(&list->lock);
#endif
return ret;
}
int is_list_empty(DoublyLinkedList *list)
{
int ret = 0;
#ifdef USE_MUTEX
pthread_mutex_lock(&list->lock);
#endif
if(list->tail == NULL && list->head == NULL && list->list_len_cur == 0)
{
ret = 1;
}
#ifdef USE_MUTEX
pthread_mutex_unlock(&list->lock);
#endif
return ret;
}
// queue
int queue_enqueue(DoublyLinkedList *list, MYTYPE value)
{
return insertAtTail(list, value);
}
int queue_dequeue(DoublyLinkedList *list, MYTYPE *value)
{
return deleteAtHead(list, value);
}
//stack
int stack_push(DoublyLinkedList *list, MYTYPE value)
{
return insertAtHead(list, value);
}
int stack_pop(DoublyLinkedList *list, MYTYPE *value)
{
return deleteAtHead(list, value);
}
总结
线程池c代码实现,支持设置线程数和任务队列大小,可以运行仅供参考