构建与优化自定义进程池

news2025/1/14 1:02:13
1. 什么是进程池?

简单来说,进程池就是预先创建固定数量的工作进程,通过设计任务队列或调度算法来分配任务给空闲的进程 —— 实现“负载均衡”。

2. 进程池框架设计

枚举错误返回值:

enum
{
    UsageError = 1,
    ArgError,
    PipeError
};
i. 设定子进程数量与格式验证

设置进程池的默认调用方式: ./processpool sub_process_nums

#include <iostream>
using namespace std;

void Usage(char* argv)
{
    cout << "Usage:" << endl;
    cout << "\t" << "argv sub_process_nums" << endl;
}

int main(int argc, char* argv[])
{
    if (argc < 2) // 命令行指令的本质是字符串, 字符串个数 < 2 时返回
    {
        Usage(argv[0]);
        return UsageError;
    }
    // ...
    
    return 0;
}
ii. 控制与管理子进程 之 " 创建通信管道与进程 "

在这部分内容开始之前,不妨先明晰一个问题: **“先创建管道再创建子进程,还是先创建子进程再创建管道?” **

要回答这个问题,我们需要了解 通信管道 的本质 及 fork 函数

  • 通信管道 (通信信道)

当你调用 pipe() 时,操作系统会在 文件描述符表 中为管道分配两个条目,一个用于写入 —— “写端”,一个用于读取 —— “读端”;

管道的 读端写端 本质上都是 文件描述符

  • fork 函数

fork() 的主要功能是,从当前进程(父进程)中创建一个新的进程(子进程);

子进程继承了父进程的资源限制、环境变量、打开的文件描述符表、工作目录等,但它们是独立的实体。

根据以上信息,不难得出:先创建管道、再创建子进程,子进程会继承父进程打开的文件描述符表,接着只需要关闭父进程的读端、子进程的写端,即可实现父子进程间的通信(反之亦然)。

#include <unistd.h>

int main(int argc, char* argv[])
{
    int sub_process_nums = stoi(argv[1]); // c标准库中的函数,将字符串转整型
    if (sub_process_nums <= 0)	
        return ArgError;
    
    // 1. 创建通信管道与进程
    for (int i = 0; i < sub_process_nums; i++)
    {
        int pipefd[2];
        int n = pipe(pipefd);
        
        // 创建管道成功返回 0,失败返回 -1
        if (n < 0)
            return PipeError;
        
        pid_t id = fork();
        if (id == 0)
        {
            // child 负责读
            close(pipefd[1]); // 关闭写端
            // todo
            
            exit(0); // 执行完退出
        }
        
        // father 负责写
        close(pipefd[1]); // 关闭读端
        
    }
    
    // ...
}

int pipefd[2];

int n = pipe(pipefd); ——> 管道创建成功后,pipefd[0] 为 读端文件描述符,pipefd[1] 为 写端文件描述符。

3. 封装通信管道与进程池
i. class Channel

为了保存循环创建的通信管道和子进程信息,我们封装一个 通信管道 类型。

#include <string>

class Channel
{
public:
    Channel(int wfd, pid_t process_id, const string& name)
    	:_wfd(wfd), _sub_process_id(process_id), _name(name)
    {}
    
    // 观察父进程创建子进程时的现象
    void Debug()
    {
        cout << "_wfd: " << _wfd;
        cout << ", _sub_process_id: " << _sub_process_id;
        cout << ", _name: " << _name << endl;
    }
    
    // 增加获取管道各种信息的接口
    int Wfd() { return _wfd; }
    pid_t Pid() { return _sub_processs_id; }
    string Name() { return _name; }
    
    ~Channel() {}
private:
    int _wfd; // 写端文件描述符
    pid_t _sub_process_id;
    string _name;
};

我们并未在 Channel 中封装读端文件描述符,因为我们将在每次循环中对 stdin 做重定向 —— dup2(pipefd[0], 0) ,之后子进程在运行时,只需要向 标准输入stdin —— 0 中读取任务指令即可。

通信管道本质上是文件,管道的读端和写端本质上是文件描述符;

dup2() 的工作原理,是将第一个参数指定的文件描述符,复制到第二个参数指定的位置。

ii. class ProcessPool

封装进程池,是为了更好地控制与管理子进程。

#include <vector>

class ProcessPool
{
public:
    ProcessPool(int sub_process_num) 
    	:_sub_process_num(sub_process_num)
    {} 
    ~ProcessPool() {}
    
    int CreatChannels(work_t work) // 回调函数
    {
        // 1. 创建通信信道和进程
        for (int i = 0; i < _sub_process_num; i++)
        {
            // 先创建管道
            int pipefd[2];
            int n = pipe(pipefd);
            if (n < 0)
            {
                return PipeError;
            }

            // 再创建子进程,确保父进程和子进程读写同一根管道
            pid_t id = fork();
            if (id == 0)
            {
                // child -> r
                close(pipefd[1]);

                // TODO
                dup2(pipefd[0], 0); // 将 pipefd[0] 重定向
                work(pipefd[0]); // 方便后续在子进程中观察每个管道读端的文件描述符

                // sleep(100);
                exit(0);
            }

            // father
            close(pipefd[0]);
            string cname = "channel--" + to_string(i);

            _channels.push_back(Channel(pipefd[1], id, cname));
        }

        return 0;
    }
    
private:
    vector<Channel> _channels;
    int _sub_process_num;
};

int CreatChannels(work_t work) { } 中有一两个细节:

一为前文提到过的,重定向;

第二,即这个函数的参数 —— 这种编程模式也叫做 回调函数 —— 将函数作为参数传递给另一个函数,以便特定条件发生时供后者调用。

我们将子进程待执行的函数,作为参数传入 CreatChannels() 中供子进程调用,后续我们只需对传入参数(传入不同的函数)进行修改就可以让子进程执行不同的任务而不用对 CreadChannels() 函数体进行修改。

4. 负载均衡式任务调度
#include <stdlib.h>
#include <time.h>

void CtrlProcess(ProcessPool* ptr, int cnt)
{
    while (cnt)
    {
        // a. 选择一个通道和进程
        int channel = ptr->NextChannel();
        // b. 选择一个任务
        int task = NextTask();
        // c. 发送任务
        ptr->SendTask(channel, task);
        
        sleep(1); // 每隔 1s 发送一次任务
        --cnt;
    }
}

int main()
{
    // ...
    // 1. 创建通信管道与进程
    ProcessPool *processpool_ptr = new ProcessPool(sub_process_num); // sub_process_num 为要创建子进程的个数
    processpool_ptr->CreatChannels(worker); // worker() 待补充
    
    srand(time(nullptr));
    
    // 2. 任务调度
    CtrlProcess(processpool_ptr, 10); // 假定 10 个任务
    cout << "task run done" << endl;
    
    // 3. 回收进程
    
    
    delete processpool_ptr;
    return 0;
}
  • 选择一个通道和进程
class ProcessPool
{
public:
    int NextChannel()
    {
        static int next = 0;
        int c = next;
        next++;
        next %= _channels.size(); // 防止越界
        return c;
    }
};
  • 选择一个任务
typedef void(*task_t)(int, pid_t); // 函数指针类型

// 模拟任务
void PrintLog(int fd, pid_t id)
{
    cout << "channel rfd: " << fd << ", sub process: " << id << ", task: Print log task" << endl << endl;
}

void ConnectMysql(int fd, pid_t id)
{
    cout << "channel rfd: " << fd << ", sub process: " << id << ", task: Connect mysql task" << endl << endl;
}

void ReloadConf(int fd, pid_t id)
{
    cout << "channel rfd: " << fd << ", sub process: " << id << ", task: Reload conf task" << endl << endl;
}

task_t tasks[3] = {PrintLog, ConnectMysql, ReloadConf};

int NextTask()
{
    return rand() % 3; 
}
  • 发送任务
class ProcessPool
{
public:
    void SendTask(int index, int command)
    {
        cout << "Send task to " << _channels[index].Name() << ", pid: " << _channels[index].Pid() << endl;
        write(_channels[index].Wfd(), &command, sizeof(command));
    }
};
5. 子进程任务执行:通过 worker() 读取父进程指令
typedef void(*work_t)(int);// 函数指针类型

void worker(int fd)
{
    while (1)
    {
        int code = 0;
        ssize_t n = read(0, &code, sizeof(code));
        if (n == sizeof(code)) // read 成功,返回值为读取到内容的大小/字节个数
        {
            if (code >= 3) continue;
            tasks[code](fd, getpid());
        }
        else if (n == 0) // 父进程关闭写端后,继续读,read 返回 0
        {
            cout << "sub process id: " <<  getpid() << " is to quit ..." << endl;
            break;
        }
        sleep(1);
    }
}
6. 回收子进程

设计 KillAll() ,完成子进程和管道的回收 —— 遍历进程池中的 _channels ,关闭管道的写端,读端将管道中的数据读完后,会读到返回值 0,表示读结束。

#include <sys/wait.h>

class ProcessPool
{
public:
    void KillAll()
    {
        for (auto& channel : _channels)
        {
            pid_t pid = channel.Pid(); // 子进程(管道读端进程)的 pid
            close(channel.Wfd());
            
            pid_t rid = waitpid(pid, nullptr, 0);
            if (rid == pid) // wait 成功
            {
                cout << "wait sub process: " << pid << "success..." << endl;
            }
            cout << "close channel: " << channel.Name() << ", sub process is to quit.." << endl;
        }
    }
};
int main()
{
    // ... 
    
    // 3. 回收进程
    processpool_ptr->KillAll();

    delete processpool_ptr;
    return 0;
}

程序运行情况如下:

在这里插入图片描述

从图中可以观察到两点信息:1. 每个读端文件描述符都是 3; 2. task run done 后,子进程并没有退出。

原因是,

1. 当父进程关闭管道的读端后, 原先分配给读端的文件描述符(3 号文件描述符)就会被释放;再次调用 pipe() 创建新管道时,OS 会重新分配这个最小未使用的文件描述符(3 号文件描述符)给新创建的管道

2.子进程通过 fork() 创建时,它会继承父进程所有打开的文件描述符。回收进程调用 KillAll() 时,尽管关闭了父进程的写端,子进程仍持有对原管道写端的引用,使得读端无法按预期读到返回值 0 ,进而无法关闭子进程。

要解决 子进程持有对原管道写端的引用 的问题,我们需要定义一个 vector<int> —— 用于保存父进程对所有管道的写端,接着让子进程在执行分配任务之前关闭所有写端 —— 修改 CreatProcess 函数。

int CreatChannels(work_t work)
    {
        vector<int> fds;
        for (int i = 0; i < _sub_process_num; i++)
        {

            int pipefd[2];
            int n = pipe(pipefd);
            if (n < 0)
            {
                return PipeError;
            }
            fds.push_back(pipefd[1]); // 保存管道的写端

            pid_t id = fork();
            if (id == 0)
            {
                // child -> r
                // close(pipefd[1]); // 不再需要单独关闭对应管道的写端
                if (!fds.empty())
                {
                    for (auto& fd : fds)
                    {
                        close(fd);
                    }
                }

                // TODO
                dup2(pipefd[0], 0);
                work(pipefd[0]);

                // sleep(100);
                exit(0);
            }

            // father
            close(pipefd[0]);
            string cname = "channel--" + to_string(i);

            _channels.push_back(Channel(pipefd[1], id, cname));
        }

        return 0;
    }

在这里插入图片描述

子进程正常退出,程序正常结束…

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2154947.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

SpringBoot环境配置(Spring Boot Profile)

一、介绍 在Spring Boot中&#xff0c;spring.profiles 配置用于定义不同环境下的配置文件。这使得应用可以在不同的环境中使用不同的配置&#xff0c;比如开发环境、测试环境和生产环境等。这种方式可以避免在代码中硬编码配置信息&#xff0c;并且能够更灵活地管理应用的环境…

Linux之实战命令02:shred应用实例(三十六)

简介&#xff1a; CSDN博客专家、《Android系统多媒体进阶实战》一书作者 新书发布&#xff1a;《Android系统多媒体进阶实战》&#x1f680; 优质专栏&#xff1a; Audio工程师进阶系列【原创干货持续更新中……】&#x1f680; 优质专栏&#xff1a; 多媒体系统工程师系列【…

12年计算机408考研-计算机网络

【题33】在TCP/IP体系结构中&#xff0c;直接为ICMP提供服务的协议是&#xff08;B&#xff09; A. PPP B.IP C.UDP D. TCP ICMP报文协议处与网际层&#xff0c;PPP协议处于数据链路层&#xff0c;TCP和UDP都是运输层协议。 他们都由IP直接提供服务。 【题…

算法题之每日温度

每日温度 给定一个整数数组 temperatures &#xff0c;表示每天的温度&#xff0c;返回一个数组 answer &#xff0c;其中 answer[i] 是指对于第 i 天&#xff0c;下一个更高温度出现在几天后。如果气温在这之后都不会升高&#xff0c;请在该位置用 0 来代替。 示例 1: 输入…

【数据仓库】数据仓库层次化设计

一、基本概念 **1. RDS&#xff08;RAW DATA STORES&#xff0c;原始数据存储&#xff09;** RDS作为原始数据存储层&#xff0c;用于存储来自各种源头的未经处理的数据。这些数据可能来自企业内部的业务系统、外部数据源或各种传感器等。RDS确保原始数据的完整性和可访问性&…

物理学基础精解【9】

文章目录 直线与二元一次方程两直线夹角直线方程斜率两点式方程截距式方程将不同形式的直线方程转换为截距方程直线的一般方程直线一般方程的系数有一个或两个为零的直线 参考文献 直线与二元一次方程 两直线夹角 两直线 y 1 k 1 x b 1 , y 2 k 2 x b 2 形成夹角 a 1 和 a…

MATLAB语言编写的EKF程序,带大量的中文注释

三维非线性状态量的EKF&#xff08;扩展卡尔曼滤波&#xff09;&#xff0c;几乎每一行都有中文注释&#xff0c;方便初学者上手。 文章目录 代码说明绘图 代码说明 状态变量&#xff1a;x 表示三维状态&#xff0c;包括位置和速度。 协方差矩阵&#xff1a;P 用来表示估计的…

构建数字化生态系统:打造数字化转型中开放协作平台的最佳实践和关键实施技巧

在数字化转型浪潮中&#xff0c;企业如何确保成功实施至关重要。除了技术上的革新&#xff0c;企业还必须在战略执行、架构优化以及合规性管理等方面掌握最佳实践。随着云计算、大数据、人工智能等新兴技术的迅速发展&#xff0c;企业通过正确的实施技巧不仅能提升业务效率&…

CentOS中使用DockerCompose方式部署带postgis的postgresql(附kartoza/docker-postgis镜像下载)

场景 CentOS中使用Docker部署带postgis的postgresql&#xff1a; CentOS中使用Docker部署带postgis的postgresql_centos postgis插件在容器中如何安装-CSDN博客 上面使用Docker搜索和拉取kartoza/postgis时并没有任何限制。 当下如果不能科学上网时&#xff0c;大部分镜像源…

react hooks--React.memo

基本语法 React.memo 高阶组件的使用场景说明&#xff1a; React 组件更新机制&#xff1a;只要父组件状态更新&#xff0c;子组件就会无条件的一起更新。 子组件 props 变化时更新过程&#xff1a;组件代码执行 -> JSX Diff&#xff08;配合虚拟 DOM&#xff09;-> 渲…

消息中间件---Kafka

一、什么是Kafka&#xff1f; Kafka是一个分布式流处理平台,类似于消息队列或企业消息传递系统&#xff1b; 流处理事什么呢&#xff1f; 流处理就是数据处理工作流&#xff0c;本质上是一种计算机编程范例。流处理是对接收到的新数据事件的连续处理。‌它涉及对从生产者到消…

Sublime text3怎么关闭提示更新

问题 sublime text 3有新版本后,会不停地在每次启动后弹窗提示更新版本 第一步 软件安装之前&#xff0c;切记是软件安装之前&#xff01;&#xff01;&#xff01;需要在hosts中添加以下内容(屏蔽官网联网检测)&#xff1a;hosts的位置一般在C:\Windows\System32\drivers\etc…

展锐平台手机camera 软硬件架构

曾经在紫光展锐做过几年的camera驱动&#xff0c;经历过从2013 年最初的几人团队&#xff0c;每人独当一面&#xff0c;负责很多的模块的粗放&#xff0c;到后面的逐步的精细化&#xff0c;设计部门按照内核驱动&#xff0c;hal驱动&#xff0c;tuning效果&#xff0c;3A&#…

华为HarmonyOS地图服务 11 - 如何在地图上增加点注释?

场景介绍 本章节将向您介绍如何在地图的指定位置添加点注释以标识位置、商家、建筑等&#xff0c;并可以通过信息窗口展示详细信息。 点注释支持功能&#xff1a; 支持设置图标、文字、碰撞规则等。支持添加点击事件。 PointAnnotation有默认风格&#xff0c;同时也支持自定…

Diffusion Model Stable Diffusion(笔记)

参考资料&#xff1a; 文章目录 DDPM架构模型如何拥有产生逼真图片的能力Denoise模型功能Denoise模型如何训练考虑进文字 文生图流程(Stable Diffusion) DDPM架构 模型如何拥有产生逼真图片的能力 Denoise模型功能 通过Denoise将一个噪音图一步步生成为目标图像 Denoise实际…

Java | Leetcode Java题解之第415题字符串相加

题目&#xff1a; 题解&#xff1a; class Solution {public String addStrings(String num1, String num2) {int i num1.length() - 1, j num2.length() - 1, add 0;StringBuffer ans new StringBuffer();while (i > 0 || j > 0 || add ! 0) {int x i > 0 ? n…

Linux文件IO(八)-文件共享

什么是文件共享&#xff1f;所谓文件共享指的是同一个文件&#xff08;譬如磁盘上的同一个文件&#xff0c;对应同一个 inode&#xff09;被多个独立的读写体同时进行 IO 操作。多个独立的读写体大家可以将其简单地理解为对应于同一个文件的多个不同的文件描述符&#xff0c;譬…

大数据新视界 --大数据大厂之 Node.js 与大数据交互:实现高效数据处理

&#x1f496;&#x1f496;&#x1f496;亲爱的朋友们&#xff0c;热烈欢迎你们来到 青云交的博客&#xff01;能与你们在此邂逅&#xff0c;我满心欢喜&#xff0c;深感无比荣幸。在这个瞬息万变的时代&#xff0c;我们每个人都在苦苦追寻一处能让心灵安然栖息的港湾。而 我的…

ClickHouse-Kafka Engine 正确的使用方式

Kafka 是大数据领域非常流行的一款分布式消息中间件&#xff0c;是实时计算中必不可少的一环&#xff0c;同时一款 OLAP 系统能否对接 Kafka 也算是考量是否具备流批一体的衡量指标之一。ClickHouse 的 Kafka 表引擎能够直接与 Kafka 系统对接&#xff0c;进而订阅 Kafka 中的 …

RDKit|分子可视化,定制你的分子图

1 使用 RDKit 绘制 2D 分子结构 在化学信息学中,直观地展示分子的 2D 结构图是非常重要的。RDKit 提供了强大的工具来绘制和定制分子的 2D 结构图,使得科学家和工程师可以轻松地可视化分子的构造。本节将介绍如何使用 RDKit 绘制 2D 分子结构,并展示一些常用的绘图方法和技…