线程池-手写线程池Linux C简单版本(生产者-消费者模型)

news2024/10/5 22:17:17

目录

  • 简介
  • 手写线程池
    • 线程池结构体分析
      • task_t
      • task_queue_t
      • thread_pool_t
    • 线程池函数分析
      • thread_pool_create
      • thread_pool_post
      • thread_worker
      • thread_pool_destroy
      • wait_all_done
      • thread_pool_free
    • 主函数调用
  • 运行结果

简介

本线程池采用C语言实现

线程池的场景:

当某些任务特别耗时(例如大量的IO读写操作),严重影响线程其他的任务的执行,可以使用线程池

线程池的一般特点:

线程池通常是一个生产者-消费者模型
生产者线程用于发布任务,任务通常保存在任务队列中
线程池作为消费者,用于取出任务,执行任务

线程池中线程数量的选择:

有一个经验公式: 线程数量 =(io等待时间+cpu运算时间)*核心数/cpu运算时间

因此可以根据经验公式得出下面两种场景的线程数量:

  • cpu密集任务:线程数量=核心数(即上面的公式假设cpu运算时间>>io等待时间)
  • io密集任务:线程数量=2*n+2

手写线程池

线程池代码结构:

  • thread_pool_create:创建线程池所需要的资源,包含不限于任务队列,子线程的创建。
  • thread_pool_post:用于任务的发布,将执行任务存在任务队列中。
  • thread_pool_destroy:用于线程池的退出,以及资源的销毁。
  • wait_all_done:join线程池所有子线程,等待回收子线程。
  • thread_worker:用于任务执行。

主要的核心点集中在thread_pool_post和thread_worker两个函数中,这两个函数也构成了生产者-消费者模型。本文采用队列+互斥锁+条件变量实现。

线程池结构体分析

由于C语言不像C++可以用类封装函数,因此线程池会使用结构体来封装一些变量或者函数指针。

task_t

封装任务的入口指针以及参数。

typedef struct task_t {
    handler_pt func;
    void * arg;
} task_t;

task_queue_t

封装任务队列,为了不频繁移动队列中数据,此处采用头尾索引来标记任务。

typedef struct task_queue_t {
    uint32_t head;
    uint32_t tail;
    uint32_t count;
    task_t *queue;
} task_queue_t;

thread_pool_t

包含互斥锁,条件变量,任务队列等信息

struct thread_pool_t {
    pthread_mutex_t mutex;
    pthread_cond_t condition; //条件变量
    pthread_t *threads; //线程
    task_queue_t task_queue; //任务队列

    int closed; //是否关闭线程池执行的标志,为1表示关闭
    int started; // 当前正在运行的线程数

    int thrd_count; //线程数
    int queue_size; //任务队列大小
};

其中closed:表示是否关闭线程池执行的标志,为1表示关闭。在线程的运行函数中,用来判断是否继续循环等待执行任务队列中的任务。
started:表示当前正在运行的线程数。在thread_pool_destroy函数中销毁线程池时,需要等待所有线程停止才行,即started == 0

线程池函数分析

thread_pool_create

创建线程池,初始化一些线程池属性
通过循环pthread_create函数创建子线程。

thread_pool_t *thread_pool_create(int thrd_count, int queue_size) {
    thread_pool_t *pool;

    if (thrd_count <= 0 || queue_size <= 0) {
        return NULL;
    }
    pool = (thread_pool_t*) malloc(sizeof(*pool));
    if (pool == NULL) {
        return NULL;
    }
    pool->thrd_count = 0;
    pool->queue_size = queue_size;
    pool->task_queue.head = 0;
    pool->task_queue.tail = 0;
    pool->task_queue.count = 0;

    pool->started = pool->closed = 0;

    pool->task_queue.queue = (task_t*)malloc(sizeof(task_t)*queue_size);
    if (pool->task_queue.queue == NULL) {
        // TODO: free pool
        return NULL;
    }

    pool->threads = (pthread_t*) malloc(sizeof(pthread_t) * thrd_count);
    if (pool->threads == NULL) {
        // TODO: free pool
        return NULL;
    }
    int i = 0;
    for (; i < thrd_count; i++) {
        if (pthread_create(&(pool ->threads[i]), NULL, thread_worker, (void*)pool) != 0) {
            // TODO: free pool
            return NULL;
        }
        pool->thrd_count++;
        pool->started++;
    }
    return pool;
}

thread_pool_post

作为生产者,往任务队列里面添加任务
通过pthread_cond_signal通知子唤醒子线程的pthread_cond_wait

int thread_pool_post(thread_pool_t *pool, handler_pt func, void *arg) {
    if (pool == NULL || func == NULL) {
        return -1;
    }
    task_queue_t *task_queue = &(pool->task_queue);
//此处用自旋锁会更节省消耗,因为锁里面的逻辑比较简单
    if (pthread_mutex_lock(&(pool->mutex)) != 0) {
        return -2;
    }

    if (pool->closed) {
        pthread_mutex_unlock(&(pool->mutex));
        return -3;
    }

    if (task_queue->count == pool->queue_size) {
        pthread_mutex_unlock(&(pool->mutex));
        return -4;
    }
//避免queue数据的变化,采用头尾索引来标识
    task_queue->queue[task_queue->tail].func = func;
    task_queue->queue[task_queue->tail].arg = arg;
    task_queue->tail = (task_queue->tail + 1) % pool->queue_size;
    task_queue->count++;
//唤醒一个休眠的线程
    if (pthread_cond_signal(&(pool->condition)) != 0) {
        pthread_mutex_unlock(&(pool->mutex));
        return -5;
    }
    pthread_mutex_unlock(&(pool->mutex));
    return 0;
}

thread_worker

pthread_cond_wait等待任务的唤醒
作为消费者, (*(task.func))(task.arg);执行任务

static void *thread_worker(void *thrd_pool) {
    thread_pool_t *pool = (thread_pool_t*)thrd_pool;
    task_queue_t *que;
    task_t task;
    for (;;) {
        pthread_mutex_lock(&(pool->mutex));
        que = &pool->task_queue;
        while (que->count == 0 && pool->closed == 0) {
            // 阻塞在 condition,等待任务队列添加任务
            pthread_cond_wait(&(pool->condition), &(pool->mutex));
        }
        if (pool->closed == 1 && que->count == 0) break;//没有任务,并且关闭标志打开,即跳出循环
        task = que->queue[que->head];
        que->head = (que->head + 1) % pool->queue_size;
        que->count--;
        pthread_mutex_unlock(&(pool->mutex));
        (*(task.func))(task.arg);//执行对应任务函数
    }
    pool->started--;//跳出循环之后,运行线程数需要减1
    pthread_mutex_unlock(&(pool->mutex));
    pthread_exit(NULL);
    return NULL;
}

thread_pool_destroy

销毁释放线程池,置 pool->closed = 1;
通过pthread_cond_broadcast唤醒线程池所有线程,这个和thread_pool_post里的pthread_cond_signal一样,并且broadcast会通知到所有的线程

int thread_pool_destroy(thread_pool_t *pool) {
    if (pool == NULL) {
        return -1;
    }

    if (pthread_mutex_lock(&(pool->mutex)) != 0) {
        return -2;
    }

    if (pool->closed) {
        thread_pool_free(pool);
        return -3;
    }
    pool->closed = 1;
//广播形式,通知所有阻塞在condition的线程接触阻塞
    if (pthread_cond_broadcast(&(pool->condition)) != 0 || 
            pthread_mutex_unlock(&(pool->mutex)) != 0) {
        thread_pool_free(pool);
        return -4;
    }
    wait_all_done(pool);
    thread_pool_free(pool);
    return 0;
}

wait_all_done

将所有线程通过pthread_join回收,所有子线程任务执行完毕,回收线程

int wait_all_done(thread_pool_t *pool) {
    printf("wait_all_done start!pool->thrd_count:%d\n", pool->thrd_count);
    int i, ret=0;
    for (i=0; i < pool->thrd_count; i++) {
        printf("wait_all_done doing! i:%d\n", i);
        if (pthread_join(pool->threads[i], NULL) != 0) {
            ret=1;
        }
        
    }
    printf("wait_all_done end!\n");
    return ret;
}

thread_pool_free

释放线程池空间

static void thread_pool_free(thread_pool_t *pool) {
    if (pool == NULL || pool->started > 0) {
        return;
    }

    if (pool->threads) {
        free(pool->threads);
        pool->threads = NULL;

        pthread_mutex_lock(&(pool->mutex));
        pthread_mutex_destroy(&pool->mutex);
        pthread_cond_destroy(&pool->condition);
    }

    if (pool->task_queue.queue) {
        free(pool->task_queue.queue);
        pool->task_queue.queue = NULL;
    }
    free(pool);
}

主函数调用

#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <unistd.h>

#include "thrd_pool.h"

int nums = 0;
int done = 0;
int task_num = 100;

pthread_mutex_t lock;

void do_task(void *arg) {
    usleep(10000);
    pthread_mutex_lock(&lock);
    done++;
    printf("doing %d task\n", done);
    pthread_mutex_unlock(&lock);
}

int main(int argc, char **argv) {
    int threads = 8;
    int queue_size = 256;

    if (argc == 2) {
        threads = atoi(argv[1]);
        if (threads <= 0) {
            printf("threads number error: %d\n", threads);
            return 1;
        }
    } else if (argc > 2) {
        threads = atoi(argv[1]);
        queue_size = atoi(argv[1]);
        if (threads <= 0 || queue_size <= 0) {
            printf("threads number or queue size error: %d,%d\n", threads, queue_size);
            return 1;
        }
    }

    thread_pool_t *pool = thread_pool_create(threads, queue_size);
    if (pool == NULL) {
        printf("thread pool create error!\n");
        return 1;
    }

    while (thread_pool_post(pool, &do_task, NULL) == 0) {
        pthread_mutex_lock(&lock);
        nums++;
        pthread_mutex_unlock(&lock);
        if (nums > task_num) break;
    }

    printf("add %d tasks\n", nums);
    usleep(1000000);//延时等待所有的作业完成

    printf("did %d tasks\n", done);
    thread_pool_destroy(pool);
    return 0;
}

运行结果

使用指令编译文件:

gcc main.c thrd_pool.c -o main -lpthread

运行执行文件得到运行结果
在这里插入图片描述在这里插入图片描述

完整代码下载线程池Linux C语言简单版本

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/827233.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Delphi 开发的QR二维码生成工具,开箱即用

目录 一、基本功能&#xff1a; 二、使用说明&#xff1a; 三、操作演示gif 四、下载链接 在日常的开发中&#xff0c;经常需要将一个链接生成为二维码图片&#xff0c;特别是在进行支付开发的时候&#xff0c;因为我们支付后台获取了支付链接&#xff0c;需要变成二维码扫…

设计模式行为型——解释器模式

目录 什么是解释器模式 解释器模式的实现 解释器模式角色 解释器模式类图 解释器模式举例 解释器模式代码实现 解释器模式的特点 优点 缺点 使用场景 注意事项 实际应用 什么是解释器模式 解释器模式&#xff08;Interpreter Pattern&#xff09;属于行为型模式&…

SOLIDWORKS中的弹簧设计指南

SOLIDWORKS是一款广泛使用的三维计算机辅助设计软件&#xff0c;可以用于设计各种机械零件和组件&#xff0c;包括弹簧。在SOLIDWORKS中设计弹簧需要注意一些关键点&#xff0c;本文将为您介绍SOLIDWORKS中的弹簧设计指南。 1. 弹簧类型 按受力性质&#xff0c;弹簧类型包括压…

小程序云开发快速入门(1/4)

前言 从上次完成了码仔备忘录本地版本后&#xff0c;码仔就养成了每天记录备忘录的好习惯&#xff0c;每周早上会记录下自己要做的任务&#xff0c;然后晚上在复盘一下今天的计划是否完成。 有一天&#xff0c;码仔看到它最喜欢的码妞在一旁愁眉苦脸。 码仔&#xff1a;“怎么…

5个设计师必备的绘画工具,不看错亿

在设计工作中&#xff0c;绘画工具是设计师经常会用到的设计工具&#xff0c;今天本文将与大家分享5个好用的绘画工具&#xff0c;一起来看看吧&#xff01; 1、即时灵感 即时灵感是一款非常受欢迎的绘画工具&#xff0c;它为设计师提供了自由的绘画方式&#xff0c;也提供了…

【雕爷学编程】Arduino动手做(181)---Maixduino AI开发板

37款传感器与执行器的提法&#xff0c;在网络上广泛流传&#xff0c;其实Arduino能够兼容的传感器模块肯定是不止这37种的。鉴于本人手头积累了一些传感器和执行器模块&#xff0c;依照实践出真知&#xff08;一定要动手做&#xff09;的理念&#xff0c;以学习和交流为目的&am…

091.粉刷房子

一、题目 剑指 Offer II 091. 粉刷房子 - 力扣&#xff08;LeetCode&#xff09; 二、代码 class Solution { public:int minCost(vector<vector<int>>& costs) {int row costs.size();int col costs[0].size();if (row 1)return min(min(costs[0][0], cos…

Liunx开发工具

Liunx开发工具 1.Linux编辑器-vim使用1.1vim的基本概念1.2vim的基本操作1.3命令模式命令集1.3.1光标定位1.3.2光标移动1.3.3文本复制1.3.4文本操作 1.4插入模式命令集1.5底行模式命令集 2.vim配置3.sudo配置4.Linux编辑器-gcc/g使用4.1背景知识4.2gcc如何操作 5.函数库5.1函数库…

ES6 - generator和async函数

一、前言 ES6 诞生以前&#xff0c;异步编程的方法&#xff0c;大概有下面四种。 回调函数事件监听发布/订阅Promise 对象 回调函数本身并没有问题&#xff0c;它的问题出现在多个回调函数嵌套会造成回调地狱&#xff0c;非常不利于代码的维护和逻辑混乱等问题&#xff1b; …

数论分块学习笔记

准备开始复习莫比乌斯反演&#xff0c;杜教筛这一部分&#xff0c;先复习一下数论分块 0.随便说说 数论分块可以计算如下形式的式子 ∑ i 1 n f ( i ) g ( ⌊ n i ⌋ ) \sum_{i1}^{n}f(i)g(\lfloor\frac{n}{i}\rfloor) ∑i1n​f(i)g(⌊in​⌋)。 利用的原理是 ⌊ n i ⌋ \lf…

StarRocks数据库部署全记录(保姆式帮助你初次体验StarRocks)

因业务需要&#xff0c;特此了解StarRocks产品和部署。 接触过程中发现指导资料很稀少&#xff0c;本人将结合官方的手册其他开源博主指导&#xff0c;将第一次接触到的概念和部署流程梳理&#xff0c;得出本文。 已有的资源中对细节介绍欠缺&#xff0c;导致我本人整个过程中花…

fifo读写的数据个数

fifo IP核设置读写个数 如果不勾选精确值&#xff0c;则统计的当前写入和待读出的数据为估计值&#xff0c;可能会相差2个左右。且fifo设计的wr_data_count. wr_data_count&#xff1a;当前的fifo中剩余已经写入的数据。 rd_data_count&#xff1a;当前的fifo中剩余可以读出…

Codeforces Round 855 (Div. 3) E题题解

文章目录 [ Unforgivable Curse (hard version)](https://codeforces.com/contest/1800/problem/E2)问题建模问题分析方法1分析性质1.分析操作对元素位置的影响2.分析可以使用操作的元素可以与相邻元素交换位置的作用代码 方法2通过DFS得到相互可以交换位置的字符集合代码 方法…

vue3和typescript_组件

1 components下新建myComponent.vue 2 页面中引入组件&#xff0c;传入值&#xff0c;并且绑定事件函数。 3

原型链污染,nodejs逃逸例子

文章目录 原型链污染原型链污染原理原型链污染小例子 原型链污染题目解析第一题第二题 Nodejs沙箱逃逸方法一方法二 原型链污染 原型链污染原理 原型链 function test(){this.a test; } b new test;可以看到b在实例化为test对象以后&#xff0c;就可以输出test类中的属性a…

关于Linux启动后eth0网卡起不来的问题

1./etc/udev/rules.d/70-persistent-net.rules 先到这个文件中 将eth0注掉 ## 同时记录ADDR 2.mv /etc/sysconfig/network-scripts/ifcfg-eth0 /etc/sysconfig/network-scripts/ifcfg-eth2 注意这个eth2, 要和第一步的号码对应 同时进入文件,将设备和ADDR修改 3.重启网络 servi…

FTP文件传输协议

FTP文件传输协议 介绍 将某台计算机中的文件通过网络传送到可能相距很远的另一台计算机中&#xff0c;是一项基本的网络应用&#xff0c;即文件传送文件传输协议(File Transfer Protocol)是因特网上使用得最广泛的文件传输协议 FTP提供交互式访问&#xff0c;允许客户指明文件…

flask中写一个基础的sqlHelper类

写一个SQLHelper类&#xff1a; from flask_sqlalchemy import SQLAlchemydb SQLAlchemy()class SQLHelper:staticmethoddef add(record):db.session.add(record)return SQLHelper.session_commit()staticmethoddef add_all(records):db.session.add_all(records)return SQLH…

FFmepg视频解码

1 前言 上一篇文章<FFmpeg下载安装及Windows开发环境设置>介绍了FFmpeg的下载安装及环境配置&#xff0c;本文介绍最简单的FFmpeg视频解码示例。 2 视频解码过程 本文只讨论视频解码。 FFmpeg视频解码的过程比较简单&#xff0c;实际就4步&#xff1a; 打开媒体流获取…

Meta-Transformer:基于Transformer的多模态感知,融合Token化与共享编码

论文标题&#xff1a;Meta-Transformer: A Unified Framework for Multimodal Learning 论文地址&#xff1a;https://arxiv.org/pdf/2307.10802.pdf 这里写目录标题 引言基于Transformer的多模态发展Meta-Transformer框架预备知识数据到序列如何分词&#xff08;Data-to-Seq…