【线程系列之五】线程池介绍C语言

news2024/9/23 7:28:51

一、基本概念

1.1 概念

线程池(Thread Pool)是一种基于池化技术管理线程的机制,旨在减少线程创建和销毁的开销,提高系统资源的利用率,以及更好地控制系统中同时运行的线程数量。线程池通过预先创建一定数量的线程,并将这些线程放入一个“池”中,当有新任务到来时,不是立即创建新线程来执行,而是从线程池中取出一个空闲的线程来执行该任务。如果所有线程都在忙碌,则新任务会等待直到有线程变得空闲。

在C语言中,由于标准库(如C89/C99/C11等)不支持线程或线程池,因此通常需要使用第三方库如POSIX线程(pthread)来实现线程池。

1.2 应用场景【C语言】

C语言中的线程池应用场景与在其他编程语言中类似,主要包括以下几类:

  • 高并发服务器: 在网络服务器中处理大量客户端请求。每个请求可以分配给一个线程池中的线程进行处理,以提高响应速度和吞吐量。

  • 数据处理和计算密集型任务: 当需要处理大量数据或执行复杂的计算时,可以将任务分配到线程池中并行执行,以缩短总体执行时间。

  • 资源密集型任务: 对于需要频繁访问共享资源(如数据库、文件系统等)的任务,使用线程池可以减少线程创建和销毁的开销,并可能通过更高效的资源管理来提高性能。

  • 异步操作: 在需要执行非阻塞操作(如异步I/O、异步网络请求等)时,线程池可以用来处理这些异步操作的结果或回调。

  • 定时任务和周期性任务: C语言本身不直接支持定时器和周期性任务,但你可以使用线程池中的线程来模拟这些功能,通过轮询或睡眠机制来检查时间并执行相应的任务。

1.3 实现线程池步骤
  • 定义线程池结构: 包括线程数量、任务队列、互斥锁、条件变量等;

  • 实现任务队列: 用于存储待执行的任务;

  • 编写线程工作函数: 该函数将被多个线程同时执行,并从任务队列中取出任务进行处理;

  • 初始化和管理线程池: 包括创建线程、启动线程、销毁线程等操作;

  • 添加任务到线程池: 提供接口将新任务添加到任务队列中,并通知等待的线程。

由于C语言相对底层,因此实现这些功能需要更多的手动编码和对系统资源的管理。此外,你还需要考虑线程安全和性能优化等问题,第三方库可在C++、python中查找,避免从头实现线程池。

下文将具体说明C语言实现线程池的代码。

二、实现

2.1 定义线程池结构

首先,定义一个包含线程池所需所有信息的结构体,如线程数组、任务队列、互斥锁、条件变量等。

#include <pthread.h>  
#include <stdbool.h>  
#include <stdlib.h> 

#define MAX_THREADS 4 
  
// 任务队列节点  
typedef struct task {  
    void (*function)(void*);  
    void* arg;  
    struct task* next;  
} task_t;  
  
// 线程池结构体  
typedef struct {  
    pthread_t threads[MAX_THREADS]; // 线程数组  
    pthread_mutex_t queue_mutex;    // 保护任务队列的互斥锁  
    pthread_cond_t queue_cond;      // 任务队列的条件变量  
    bool stop;                      // 线程池停止标志  
    task_t* head;                   // 任务队列头指针  
    task_t* tail;                   // 任务队列尾指针  
    int thread_count;               // 当前活跃线程数  
    int active_threads;             // 最大活跃线程数(可配置)  
} threadpool_t;
2.2 初始化线程池

实现一个函数来初始化线程池,包括创建线程、初始化同步等。

void* worker(void* arg) {  
    threadpool_t* pool = (threadpool_t*)arg;  
  
    while (true) {  
        pthread_mutex_lock(&pool->queue_mutex);  
  
        // 如果线程池已停止且没有任务,则退出循环  
        if (pool->stop && pool->head == NULL) {  
            pthread_mutex_unlock(&pool->queue_mutex);  
            break;  
        }  
  
        // 等待任务或线程池停止信号  
        while (!pool->stop && pool->head == NULL && pool->thread_count >= pool->active_threads) {  
            pthread_cond_wait(&pool->queue_cond, &pool->queue_mutex);  
        }  
  
        // 取出任务(如果线程池未停止且队列不为空)  
        task_t* task = NULL;  
        if (!pool->stop && pool->head != NULL) {  
            task = pool->head;  
            pool->head = task->next;  
            if (pool->head == NULL) {  
                pool->tail = NULL;  
            }  
            pool->thread_count--;  
        }  
  
        pthread_mutex_unlock(&pool->queue_mutex);  
  
        // 执行任务(如果任务不为空)  
        if (task != NULL) {  
            (*(task->function))(task->arg);  
            free(task); // 释放任务节点内存(如果任务节点是动态分配的)  
        }  
    }  
  
    return NULL;  
}  
  
void threadpool_init(threadpool_t* pool, int active_threads) {  
    pool->stop = false;  
    pool->head = pool->tail = NULL;  
    pool->thread_count = 0;  
    pool->active_threads = active_threads;  
  
    pthread_mutex_init(&pool->queue_mutex, NULL);  
    pthread_cond_init(&pool->queue_cond, NULL);  
  
    for (int i = 0; i < active_threads; i++) {  
        pthread_create(&pool->threads[i], NULL, worker, pool);  
    }  
}
2.3 添加任务到线程池

实现一个函数来向线程池的任务队列中添加任务,并唤醒一个等待的线程(如果有的话)。

void threadpool_add_task(threadpool_t* pool, void (*function)(void*), void* arg) {  
    task_t* new_task = (task_t*)malloc(sizeof(task_t));  
    new_task->function = function;  
    new_task->arg = arg;  
    new_task->next = NULL;  
  
    pthread_mutex_lock(&pool->queue_mutex);  
  
    if (pool->tail == NULL) {  
        pool->head = pool->tail = new_task;  
    } else {  
        pool->tail->next = new_task;  
        pool->tail = new_task;  
    }  
  
    pthread_cond_signal(&pool->queue_cond);  
    pthread_mutex_unlock(&pool->queue_mutex);  
}
2.4 销毁线程池

实现一个函数来停止所有线程并销毁线程池。

void threadpool_destroy(threadpool_t* pool) {  
    pool->stop = true;  
  
    pthread_cond_broadcast(&pool->queue_cond);  
  
    for (int i = 0; i < MAX_THREADS; i++) {  
        pthread_join(pool->threads[i], NULL);  
    }  
  
    pthread_mutex_destroy(&pool->queue_mutex);  
    pthread_cond_destroy(&pool->queue_cond);  
}

三、完整简单示例

#include <stdio.h>
#include <stdlib.h>
#include <stdbool.h>
#include <pthread.h>
#include <unistd.h>

#define MAX_THREADS 5

typedef struct task {
    void (*function)(void*);
    void* arg;
    struct task* next;
} task_t;

typedef struct {
    pthread_t threads[MAX_THREADS];
    pthread_mutex_t queue_mutex;
    pthread_cond_t queue_cond;
    bool stop;
    task_t* head;
    task_t* tail;
    int thread_count;
    int active_threads;
} threadpool_t;

void* worker(void* arg) {
    threadpool_t* pool = (threadpool_t*)arg;

    while (true) {
        pthread_mutex_lock(&pool->queue_mutex);

        while (pool->head == NULL && !pool->stop) {
            pthread_cond_wait(&pool->queue_cond, &pool->queue_mutex);
        }

        if (pool->stop && pool->head == NULL) {
            pthread_mutex_unlock(&pool->queue_mutex);
            break;
        }

        task_t* task = pool->head;
        if (task != NULL) {
            pool->head = task->next;
            if (pool->head == NULL) {
                pool->tail = NULL;
            }
        }

        pthread_mutex_unlock(&pool->queue_mutex);

        if (task != NULL) {
            (*(task->function))(task->arg);
            free(task); // 释放任务节点内存
        }
        sleep(1);
    }

    return NULL;
}

void threadpool_init(threadpool_t* pool, int active_threads) {
    pool->stop = false;
    pool->head = pool->tail = NULL;
    pool->thread_count = 0;
    pool->active_threads = active_threads;

    pthread_mutex_init(&pool->queue_mutex, NULL);
    pthread_cond_init(&pool->queue_cond, NULL);

    for (int i = 0; i < active_threads; ++i) {
        pthread_create(&pool->threads[i], NULL, worker, pool);
    }
}

void threadpool_add_task(threadpool_t* pool, void (*function)(void*), void* arg) {
    task_t* new_task = (task_t*)malloc(sizeof(task_t));
    new_task->function = function;
    new_task->arg = arg;
    new_task->next = NULL;

    pthread_mutex_lock(&pool->queue_mutex);

    if (pool->tail == NULL) {
        pool->head = pool->tail = new_task;
    } else {
        pool->tail->next = new_task;
        pool->tail = new_task;
    }

    pthread_cond_signal(&pool->queue_cond);
    pthread_mutex_unlock(&pool->queue_mutex);
}

void threadpool_destroy(threadpool_t* pool) {
    pool->stop = true;

    pthread_mutex_lock(&pool->queue_mutex);
    pthread_cond_broadcast(&pool->queue_cond);
    pthread_mutex_unlock(&pool->queue_mutex);

    for (int i = 0; i < pool->active_threads; ++i) {
        pthread_join(pool->threads[i], NULL);
        printf("%d\r\n", i);
    }

    pthread_mutex_destroy(&pool->queue_mutex);
    pthread_cond_destroy(&pool->queue_cond);
}

// 示例任务函数
void example_task(void* arg) {
    int task_id = *(int*)arg;
    printf("Task %d is executing\n", task_id);
    free(arg); // 释放参数内存
    sleep(1);  // 模拟任务执行时间
}

int main() {
    threadpool_t pool;
    threadpool_init(&pool, 2); // 初始化线程池,最大活跃线程数为2

    // 添加示例任务
    for (int i = 0; i < MAX_THREADS; ++i) {
        int* arg = (int*)malloc(sizeof(int));
        *arg = i;
        threadpool_add_task(&pool, example_task, arg);
    }

    // 等待一段时间,观察任务执行情况
    sleep(5);

    // 销毁线程池
    threadpool_destroy(&pool);

    return 0;
}

运行结果为:

在这里插入图片描述

这段代码演示了一个简单的线程池的使用方式。在主函数中,我们初始化了一个线程池(最大活跃线程数为2),然后添加了5个示例任务(每个任务执行时间模拟为1秒)。在任务执行完毕后,通过调用 threadpool_destroy 函数来销毁线程池,确保所有任务都被执行完毕。

注意:这里销毁的时候保证知道数组中有多少个有效的线程ID,进而销毁。

可在结构体中添加pool->num_threads ,即用来保存线程池中线程数量的成员变量,应该在初始化线程池时进行设置,并在后续操作中根据需要使用,则不会出现在销毁时越界或死锁。

这个示例展示了如何使用线程池来管理并发执行的任务,以及如何通过互斥锁和条件变量来实现线程安全的任务队列操作。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1932566.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

FFMPEG提取音频流数据

FFmpeg是一套开源的计算机程序&#xff0c;主要用于记录、转换数字音频、视频&#xff0c;并能将其转化为流。它提供了录制、转换以及流化音视频的完整解决方案&#xff0c;被誉为多媒体业界的“瑞士军刀”。 1.使用ffmpeg命令实现音频流数据提取 [wbyqwbyq ffmpeg]$ ffmpeg …

全局 loading

好久不见&#xff01; 做项目中一直想用一个统一的 loading 状态控制全部的接口加载&#xff0c;但是一直不知道怎么处理&#xff0c;最近脑子突然灵光了一下想到了一个办法。 首先设置一个全局的 loading 状态&#xff0c;优先想到的就是 Pinia 然后因为页面会有很多接口会…

减分猫-12123货车驾驶证(学法减分)专用题目及答案 #知识分享#媒体

想要顺利通过驾驶考试&#xff0c;掌握一些常考题目和答案是非常有必要的。今天&#xff0c;我就为大家带来了这样一份资料——20道驾驶考试题目和答案解析&#xff0c;让你考试更有底气&#xff01;这些题目和答案不仅包括了考试中的重点和难点内容&#xff0c;还有针对每道题…

leetcode力扣_二分查找

69.x的平方根 给你一个非负整数 x &#xff0c;计算并返回 x 的 算术平方根 。由于返回类型是整数&#xff0c;结果只保留 整数部分 &#xff0c;小数部分将被 舍去 。注意&#xff1a;不允许使用任何内置指数函数和算符&#xff0c;例如 pow(x, 0.5) 或者 x ** 0.5 。 示例 1&…

神经网络构造

目录 一、神经网络骨架&#xff1a;二、卷积操作&#xff1a;三、卷积层&#xff1a;四、池化层&#xff1a;五、激活函数&#xff08;以ReLU为例&#xff09;&#xff1a; 一、神经网络骨架&#xff1a; import torch from torch import nn#神经网络 class CLH(nn.Module):de…

微信小程序 vant-weapp的 SwipeCell 滑动单元格 van-swipe-cell 滑动单元格不显示 和 样式问题 滑动后删除样式不显示

在微信小程序开发过程中 遇到个坑 此处引用 swipeCell 组件 刚开始是组件不显示 然后又遇到样式不生效 首先排除问题 是否在.json文件中引入了组件 {"usingComponents": {"van-swipe-cell": "vant/weapp/swipe-cell/index","van-cell-gro…

视频共享融合赋能平台LntonCVS视频监控业务平台技术方案详细介绍

LntonCVS国标视频综合管理平台是一款智慧物联应用平台&#xff0c;核心技术基于视频流媒体&#xff0c;采用分布式和负载均衡技术开发&#xff0c;提供广泛兼容、安全可靠、开放共享的视频综合服务。该平台功能丰富&#xff0c;包括视频直播、录像、回放、检索、云存储、告警上…

【C++开源】GuiLite:超轻量UI框架-入门

开发环境说明 使用visual Studio 2022进行开发 下载源码 从如下的网址进行源码和示例代码的下载: GitHub源码网址为:idea4good/GuiLite示例代码路径为:idea4good/GuiLiteExample使用方法 GuiLite是一个仅有头文件的一个库,使用的时候直接include到自己的UIcode.cpp文件…

Golang面试题整理(持续更新...)

文章目录 Golang面试题总结一、基础知识1、defer相关2、rune 类型3、context包4、Go 竞态、内存逃逸分析5、Goroutine 和线程的区别6、Go 里面并发安全的数据类型7、Go 中常用的并发模型8、Go 中安全读写共享变量方式9、Go 面向对象是如何实现的10、make 和 new 的区别11、Go 关…

Elasticsearch 企业级实战 01:Painless 脚本如何调试?

在企业级应用中&#xff0c;Elasticsearch 常常被用来处理复杂的数据查询和操作。 Painless 是 Elasticsearch 的内置脚本语言&#xff0c;虽然强大&#xff0c;但调试起来并不容易。 本文将详细介绍如何在实战中有效调试 Painless 脚本&#xff0c;以提高开发和运维效率。 本文…

打印室预约小程序的设计

管理员账户功能包括&#xff1a;系统首页&#xff0c;个人中心&#xff0c;用户管理&#xff0c;附近打印店管理&#xff0c;文件打印管理&#xff0c;当前预约管理&#xff0c;预约历史管理&#xff0c;打印记录管理 开发系统&#xff1a;Windows 架构模式&#xff1a;SSM JD…

微分段Microsegmentation简介

目录 微分段Microsegmentation简介什么是微分段&#xff1f;微分段的防范措施微分段的防护层级 基于网络的微分段微分段基本工作机制微分段的角色VxLAN的额外字段 业务链分组与传输策略场景1&#xff1a;三层报文本地转发场景场景2&#xff1a;三层报文跨设备转发场景 微分段的…

微信小程序与本地MySQL数据库通信

微信小程序与本地MySQL数据库通信 因为本地MySQL服务器没有域名&#xff0c;也没有进行相应的请求操作封装&#xff0c;因此微信小程序没办法和数据库通信。 但是对于开发人员来说&#xff0c;没有数据库&#xff0c;那还能干撒&#xff1f;虽然我尝试过用json-server&#x…

Android音视频—OpenGL 与OpenGL ES简述,渲染视频到界面基本流程

文章目录 OpenGL 简述特点和功能主要组件OpenGL ES当前状态 OpenGL ES 在 Android 上进行视频帧渲染总体流程 OpenGL 简述 OpenGL&#xff08;Open Graphics Library&#xff09;是一个跨平台的、语言无关的应用程序编程接口&#xff08;API&#xff09;&#xff0c;用于开发生…

关于 Redis 中分布式锁

什么是分布式锁 在一个分布式系统中&#xff0c;也会涉及到多个节点访问同一个公共资源的情况。此时就需要通过锁来做互斥控制&#xff0c;避免出现类似于“线程安全”的问题。 而 Java 中的 synchronized 或者 C 中的 std::mutex&#xff0c;这样的锁都只能在当前进程中生效…

allure_pytest:AttributeError: ‘str‘ object has no attribute ‘iter_parents‘

踩坑记录 问题描述&#xff1a; 接口自动化测试时出现报错&#xff0c;报错文件是allure_pytest库 问题分析&#xff1a; 自动化测试框架是比较成熟的代码&#xff0c;报错也不是自己写的文件&#xff0c;而是第三方库&#xff0c;首先推测是allure_pytest和某些库有版本不兼…

Hadoop3:RPC通信原理及简单案例实现

一、场景介绍 我们知道&#xff0c;Hadoop中存在多种服务&#xff0c;那么&#xff0c;服务之间是如何通信的了&#xff1f; 比如&#xff0c;DN和NN之间如何通信&#xff1f; 这里&#xff0c;实际上是通过RPC实现进程间通信的了。 RPC属于Java网络编程范畴 需要编写客户端和…

Apache POI 使用Java处理Excel数据 进阶

1.POI入门教程链接 http://t.csdnimg.cn/Axn4Phttp://t.csdnimg.cn/Axn4P建议&#xff1a;从入门看起会更好理解POI对Excel数据的使用和处理 记得引入依赖&#xff1a; <dependency><groupId>org.apache.poi</groupId><artifactId>poi</artifactI…

Java中锁的全面详解(深刻理解各种锁)

一.Monitor 1. Java对象头 以32位虚拟机位例 对于普通对象,其对象头的存储结构为 总长为64位,也就是8个字节, 存在两个部分 Kclass Word: 其实也就是表示我们这个对象属于什么类型,也就是哪个类的对象.而对于Mark Word.查看一下它的结构存储 64位虚拟机中 而对于数组对象,我…

阿里云开源 Qwen2-Audio 音频聊天和预训练大型音频语言模型

Qwen2-Audio由阿里巴巴集团Qwen团队开发&#xff0c;它能够接受各种音频信号输入&#xff0c;对语音指令进行音频分析或直接文本回复。与以往复杂的层次标签不同&#xff0c;Qwen2-Audio通过使用自然语言提示简化了预训练过程&#xff0c;并扩大了数据量。 喜好儿网 Qwen2-Au…