Linux:使用匿名管道对进程池的模拟实现

news2025/1/14 18:21:29

目录

一、Makefile

二、processpool.cc

2.1创建通信管道和子进程

 2.2控制子进程

 2.3回收进程

三、task.hpp

 四、完整代码


接下来我们将模拟实现一个进程池,进程池广泛应用与各个领域和方向,比如我们打开电脑后同时打开很多个进程(也就是软件),此时就要操作系统就要对进程进行管理,并且让每个进程去运行各自所对应的功能,今天我们无法实现如此庞大的功能,但可以通过模拟来熟悉和了解进程池是什么有什么作用以及其底层的运行原理。博主将通过匿名管道的方式,经过层层封装、层层调用来一步步实现一个小型的迷你进程池。

一、Makefile

processpool:processpool.cc
	g++ -o $@ $^ -std=c++11 -g
.PHONY:clean
clean:
	rm -f processpool

二、processpool.cc

首先需要构建出我们整个进程池的主体框架结构和功能。我们生成.exe文件后通过./processpool来进行运行,此时我们可以通过在main函数中显示写命令行参数列表来进行传参从而控制创建进程的数量,argv中存放我们输入的命令,在输入时用空格作为分隔符可以通过./process num传参给main函数,num就是我们要创建的进程的个数,此时argc所对应的就是argv中元素的个数,我们判断符合要求后就去创建通信信道和子进程。

int main(int argc,char* argv[])
{
    if(argc!=2)
    {
        Usage(argv[0]);
        return UsageError;
    }
    int sub_process_num=stoi(argv[1]);
    if(sub_process_num<=0)
    return ArgError;
    srand((uint64_t)time(nullptr));

    //1.创建通信信道和子进程
    ProcessPool *processpool_ptr=new ProcessPool(sub_process_num);
    processpool_ptr->CreateProcess(worker);
    //2.控制子进程
    CtrlProcessPool(processpool_ptr,10);

    cout<<"task run done"<<endl;

    sleep(100);

    //3.回收进程

    delete processpool_ptr;

    return 0;
}

        管道由父进程创建并打开两个文件描述符,012都被占用的情况下,在第一次进行创建时pipe打开的fd是3和4,子进程继承父进程时会将文件描述符表也一并继承下来,父进程要写就必须关闭读端,子进程要读就需要关闭写端。

 

 可此时父进程在循环的进行创建管道和子进程, 而文件描述符fd的分配规则也是从前往后找有空位的地方依次往后进行分配,而刚刚父进程已经关闭了读端即fd为3所指向的文件,所以现在3号下标是空的,而父进程再次去创建下一个管道和进程时就会出现如下的情况:

pipe[0]即读端中的值依然是3 ,pipe[1]即写端中的值则是5,子进程依旧会继承父进程的两个读写端,然后依旧是父进程关闭读端,子进程关闭写端。

所以接下来继续创建就会反复出现子进程读端是3的情况。而父进程的写端则会依次4567创建下去。 同样的,子进程在继承父进程时会连同文件描述符表一起进行继承:同样,上一个管道的写端也会被新创建的子进程继承下来

此时的管道就不是真正意义上的单向管道了,随着父进程打开的文件描述符越来越多,就会导致后面创建的子进程就都会将之前打开的写端都继承下去。

而想要解决这种情况可以通过两种方式:

1、倒着关闭管道写端描述符,根据操作系统的规则,写端(父进程)没有被关闭时,读端(子进程)就会阻塞等待,而写端一旦被完全关闭,读端就会立马退出。而关闭写端时关闭的实际上时写端的引用计数,当引用计数为0时,写端才会被完全关闭。而之所以倒着关闭就是因为最后一个创建的channel的写端一定只有一个,此时倒着关闭写端的文件描述符后子进程就会退出,退出时连带着子进程的文件描述符表也会一并销毁,此时原本子进程中指向之前从父进程继承下来的其他管道的写端描述符的引用计数都会--,父进程依次倒着关闭写端,到第一个管道时,它就只剩下一个写端了。

2、在创建子进程时就关闭父进程曾经历史打开的写端描述符,这样每个子进程就只对应一个读端,每个管道也只对应一个写端。(博主用的是第二种方法)。

2.1创建通信管道和子进程

//1.创建通信信道和子进程
class ProcessPool
{
public:
    ProcessPool(int sub_process_num):_sub_process_num(sub_process_num)
    {}
    int CreateProcess(work_t work)//回调函数
    {
        vector<int> fds;
        for(int number=0;number<_sub_process_num;number++)
        {
            int pipefd[2]{0};
            int n=pipe(pipefd);
            if(n<0) return PipeError;

            pid_t id=fork();
            //子进程执行关闭写端,去读
            if(id==0)
            {
                //把父进程历史打开的写端关闭
                if(!fds.empty())
                {
                    cout<<"close w_fd:";
                    for(auto fd:fds) 
                    {
                        close(fd);
                        cout<<fd<<" ";
                    }
                    cout<<endl;
                }
                //child -> r
                close(pipefd[1]);
                //执行任务

                //为什么要dup2重定向:
                //原本从标准输入中读,变成从管道中读,
                //也就是此时当前子进程fd0中存放的就是当前管道的地址
                //之后通过访问标准输入就可以读到管道中的内容。
                dup2(pipefd[0],0);
                work(pipefd[0]);
                exit(0);
            }

            string cname="channel-"+to_string(number);
            //father 父进程执行
            close(pipefd[0]);
            channels.push_back(Channel(pipefd[1],id,cname));

            //让父进程此次创建的wfd保存,便于下次创建子进程继承时关闭之前的文件描述符
            fds.push_back(pipefd[1]);
        }
        return 0;
    }
    int NexChannel()
    {
        static int next=0;
        int c=next;
        next++;
        next%=channels.size();
        return c;
    }
    void SendTaskCode(int index,uint32_t code)
    {
        cout<<"send code: "<<code<<"to "<<channels[index].name()<<"sub process id: "<<channels[index].pid()<<endl;
        write(channels[index].wfd(),&code,sizeof(code));
    }

    //让所有子进程全部退出,只需要关闭所有的Channel w(写端)即可
    void KillAll()
    {
        for(auto &channel:channels)
        {
            channel.Close();
             pid_t pid=channel.pid();
            pid_t rid=waitpid(pid,nullptr,0);
            if(rid==pid)
            {
                cout<<"wait sub process: "<<pid<<" success..."<<endl;
            }
            cout<<channel.name()<<"close done"<<" sub process quit now: "<<channel.pid()<<endl;
            cout<<endl;
        }
    }
    void wait()
    {
        for(auto &channel:channels)
        {
            pid_t pid=channel.pid();
            pid_t rid=waitpid(pid,nullptr,0);
            if(rid==pid)
            {
                cout<<"wait sub process: "<<pid<<" success..."<<endl;
            }
        }
    }
    void Debug()
    {
        for(auto &channel:channels)
        {
            channel.PrintDebug();
        }
    }
    ~ProcessPool()
    {
    }
private:
    int _sub_process_num;
    vector<Channel> channels;
};

 2.2控制子进程

void CtrlProcessPool(ProcessPool* processpool_ptr,int cnt)
{
    while(cnt)
    {
        //a.选择一个进程和通道
        int channel=processpool_ptr->NexChannel();
        //b.选择一个任务
        uint32_t code=NextTask();
        //c.发送任务
        processpool_ptr->SendTaskCode(channel,code);

        sleep(1);
        cnt--;
    }
}

 2.3回收进程

 //让所有子进程全部退出,只需要关闭所有的Channel w(写端)即可
    void KillAll()
    {
        for(auto &channel:channels)
        {
            channel.Close();
            pid_t pid=channel.pid();
            pid_t rid=waitpid(pid,nullptr,0);
            if(rid==pid)
            {
                cout<<"wait sub process: "<<pid<<" success..."<<endl;
            }
            cout<<channel.name()<<"close done"<<" sub process quit now: "<<channel.pid()<<endl;
            cout<<endl;
        }
    }

三、task.hpp

在task.hpp中我们可以模拟一些我们需要执行的进程任务并对其进行封装处理,父进程负责通过向管道write向子进程发送需要执行的具体任务信息,子进程则通过read读到需要用到的进程和其需要执行的具体任务,前面的代码中我们已经把创建的子进程所对应的管道的读端重定向到0,所以worker在读取任务时直接从0中读取即可。

#pragma once
#include <iostream>
#include <unistd.h>

using namespace std;

typedef void(*work_t)(int);//函数指针类型
typedef void(*task_t)(int,pid_t);//函数指针类型

void PrintLog(int fd,pid_t pid)
{
    cout<<"sub process:"<<pid<<",fd: "<<fd<<",task is:printf log task"<<endl<<endl;
}

void ReloadConf(int fd,pid_t pid)
{
    cout<<"sub process:"<<pid<<",fd: "<<fd<<",task is:reload conf task"<<endl<<endl;
}

void ConnectMysql(int fd,pid_t pid)
{
    cout<<"sub process:"<<pid<<",fd: "<<fd<<",task is:connect mysql task"<<endl<<endl;
}

task_t task[3]={PrintLog,ReloadConf,ConnectMysql};

uint32_t NextTask()
{
    return rand()%3;
}

void worker(int fd)
{
    //从0中读取任务
    while(true)
    {
        uint32_t command_code=0;
        ssize_t n=read(0,&command_code,sizeof(command_code));
        if(n==sizeof(command_code))
        {
            if(command_code>=3) continue;
            task[command_code](fd,getpid());
        }
        else if(n==0)
        {
            cout<<"sub process: "<<getpid()<<" quit now.."<<endl;
            break;
        }
    }
}

 父进程可以随机的向特定管道写任务码,然后子进程再从各自相连接的管道中读取任务码,没有任务码时子进程就处于阻塞状态,有任务时就去读取并执行worker去执行我们提前构建好的任务列表中的特定任务。

 四、完整代码

#include <iostream>
#include <string>
#include <vector>
#include <cstdlib>
#include <unistd.h>
#include <ctime>
#include <sys/wait.h>
#include "task.hpp"
using namespace std;
enum
{
    UsageError=1,
    ArgError,
    PipeError
};

void Usage(const string &proc)
{
    cout<<"Usage:"<<proc<<"subprocess-num"<<endl;
}


class Channel//保存管道的信息和写端fd
{
public:
    Channel(int wfd,pid_t sub_id,const string &name)
    :_wfd(wfd),_sub_process_id(sub_id),_name(name)
    {}
    void PrintDebug()
    {
        cout<<"_wfd:"<<_wfd;
        cout<<",_sub_process_id:"<<_sub_process_id;
        cout<<",_name:"<<_name<<endl;
    }
    string name(){return _name;}
    int wfd(){return _wfd;}
    pid_t pid(){return _sub_process_id;}
    void Close(){ close(_wfd); }
    ~Channel()
    {}
private:
    int _wfd;
    pid_t _sub_process_id;
    string _name;
};
//1.创建通信信道和子进程
class ProcessPool
{
public:
    ProcessPool(int sub_process_num):_sub_process_num(sub_process_num)
    {}
    int CreateProcess(work_t work)//回调函数
    {
        vector<int> fds;
        for(int number=0;number<_sub_process_num;number++)
        {
            int pipefd[2]{0};
            int n=pipe(pipefd);
            if(n<0) return PipeError;

            pid_t id=fork();
            //子进程执行关闭写端,去读
            if(id==0)
            {
                //把父进程历史打开的写端关闭
                if(!fds.empty())
                {
                    cout<<"close w_fd:";
                    for(auto fd:fds) 
                    {
                        close(fd);
                        cout<<fd<<" ";
                    }
                    cout<<endl;
                }
                //child -> r
                close(pipefd[1]);
                //执行任务

                //为什么要dup2重定向:
                //原本从标准输入中读,变成从管道中读,
                //也就是此时当前子进程fd0中存放的就是当前管道的地址
                //之后通过访问标准输入就可以读到管道中的内容。
                dup2(pipefd[0],0);
                work(pipefd[0]);
                exit(0);
            }

            string cname="channel-"+to_string(number);
            //father 父进程执行
            close(pipefd[0]);
            channels.push_back(Channel(pipefd[1],id,cname));

            //让父进程此次创建的wfd保存,便于下次创建子进程继承时关闭之前的文件描述符
            fds.push_back(pipefd[1]);
        }
        return 0;
    }
    int NexChannel()
    {
        static int next=0;
        int c=next;
        next++;
        next%=channels.size();
        return c;
    }
    void SendTaskCode(int index,uint32_t code)
    {
        cout<<"send code: "<<code<<"to "<<channels[index].name()<<"sub process id: "<<channels[index].pid()<<endl;
        write(channels[index].wfd(),&code,sizeof(code));
    }

    //让所有子进程全部退出,只需要关闭所有的Channel w(写端)即可
    void KillAll()
    {
        for(auto &channel:channels)
        {
            channel.Close();
            pid_t pid=channel.pid();
            pid_t rid=waitpid(pid,nullptr,0);
            if(rid==pid)
            {
                cout<<"wait sub process: "<<pid<<" success..."<<endl;
            }
            cout<<channel.name()<<"close done"<<" sub process quit now: "<<channel.pid()<<endl;
            cout<<endl;
        }
    }
    void wait()
    {
        for(auto &channel:channels)
        {
            pid_t pid=channel.pid();
            pid_t rid=waitpid(pid,nullptr,0);
            if(rid==pid)
            {
                cout<<"wait sub process: "<<pid<<" success..."<<endl;
            }
        }
    }
    void Debug()
    {
        for(auto &channel:channels)
        {
            channel.PrintDebug();
        }
    }
    ~ProcessPool()
    {
    }
private:
    int _sub_process_num;
    vector<Channel> channels;
};
void CtrlProcessPool(ProcessPool* processpool_ptr,int cnt)
{
    while(cnt)
    {
        //a.选择一个进程和通道
        int channel=processpool_ptr->NexChannel();
        //b.选择一个任务
        uint32_t code=NextTask();
        //c.发送任务
        processpool_ptr->SendTaskCode(channel,code);

        sleep(1);
        cnt--;
    }
}
int main(int argc,char* argv[])
{
    if(argc!=2)
    {
        Usage(argv[0]);
        return UsageError;
    }
    int sub_process_num=stoi(argv[1]);
    if(sub_process_num<=0)
    return ArgError;
    srand((uint64_t)time(nullptr));

    //1.创建通信信道和子进程
    ProcessPool *processpool_ptr=new ProcessPool(sub_process_num);
    processpool_ptr->CreateProcess(worker);
    //2.控制子进程
    CtrlProcessPool(processpool_ptr,10);

    cout<<"task run done"<<endl;

    //3.回收进程
    processpool_ptr->KillAll();

    //防止僵尸出现,需要父进程对其进行资源回收
    //processpool_ptr->wait();

    
    delete processpool_ptr;

    return 0;
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1637330.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

python公务用车医院校园企业车辆管理系统

本 Python版本&#xff1a;python3.7 前端&#xff1a;vue.jselementui 框架&#xff1a;django/flask都有,都支持 后端&#xff1a;python 数据库&#xff1a;mysql 数据库工具&#xff1a;Navicat 开发软件&#xff1a;PyCharm 公务用车管理智慧云服务监管平台有管理员和用户…

托普利兹矩阵(T矩阵)及其应用(Matlab demo测试)

托普利兹矩阵&#xff08;T矩阵&#xff09;及其应用&#xff08;Matlab demo测试&#xff09; 1. 概念2. Matlab简单测试2.1 生成测试2.2 基本性质及原理2.3 性质验证 3. 其他应用总结3.1 其他性质3.2 文献阅读看到的 参考资料 1. 概念 托普利兹矩阵&#xff0c;简称为T型矩阵…

O2OA开发平台前端源码级二次开发(Vue3,React)

在使用O2OA进行项目定制化开发时&#xff0c;我们可以开发新的前端组件&#xff08;x_component&#xff09;以扩展O2OA来实现更多的业务。这种新增前端组件或者前端业务的开发通常会配合后端自定义应用实现的服务来完成系统内数据的交互。在当系统默认的界面不符合系统UI/UE设…

Sentinel 控制台学习

引言 上篇文章已经讲过 SpringCloud Sentinel集成到微服务项目中&#xff0c;接下来我们继续学习怎么使用sentinel控制台对微服务进行限流&#xff0c;熔断&#xff0c;降级等一系列操作。 控制台 接下来我们单独讲解每一个菜单按钮 实时监控 实时监控&#xff1a; 可以看到…

Leetcode 145:二叉树的后序遍历(迭代法)

给你一棵二叉树的根节点 root &#xff0c;返回其节点值的 后序遍历 。 思路&#xff1a; 迭代法的思路是&#xff0c;使用栈&#xff0c;一层一层的将树节点遍历入栈。 比如下面这个树&#xff0c;使用迭代法&#xff0c;1&#xff09;第一层&#xff0c;让根节点入栈。2&a…

2024深圳杯数学建模竞赛A题(东三省数学建模竞赛A题):建立火箭残骸音爆多源定位模型

更新完整代码和成品完整论文 《2024深圳杯&东三省数学建模思路代码成品论文》↓↓↓&#xff08;浏览器打开&#xff09; https://www.yuque.com/u42168770/qv6z0d/zx70edxvbv7rheu7?singleDoc# 2024深圳杯数学建模竞赛A题&#xff08;东三省数学建模竞赛A题&#xff0…

2024五一杯数学建模A题思路分析-钢板最优切割路径问题

文章目录 1 赛题选题分析 2 解题思路3 最新思路更新 1 赛题 A题 钢板最优切割路径问题 提高钢板下料切割过程中的工作效率&#xff0c;是模具加工企业降低成本和增加经济效益的重要途径&#xff0c;其中钢板切割的路径规划是钢板切割过程的一个关键环节。 钢板切割就是使用特殊…

附录6-1 黑马优购项目-组件与过滤器

目录 1 过滤器-格式化价格 2 组件-搜索框 3 组件-数量框 4 组件-商品概况 4.1 格式化价格 4.2 选择性使用勾选框和数量框 4.3 源码 1 过滤器-格式化价格 这个项目中仅用到格式化价格这一种过滤器。过滤器文件位置为store/filter.wxs 文件内容是这样的&#xf…

【嵌入式DIY实例】-植物自动浇水机

DIY自动植物浇水机 文章目录 DIY自动植物浇水机1、硬件准备与接线2、代码实现智能灌溉系统是一种先进的、新技术的灌溉技术,可以减少人力、时间等。在本文中,将介绍如何实现一个植物自动浇水机。通过这个项目,可以给我们身边的一些植物,所有花盆都安装这一系统。这个系统由…

【JavaEE初阶系列】——理解tomcat 带你实现最简单的Servlet的hello world程序(七大步骤)

目录 &#x1f6a9;认识Tomcat &#x1f6a9;运用Tomcat &#x1f6a9;Servlet &#x1f393;完成简单的Servlet的hello world程序 &#x1f388;创建项目Maven &#x1f388;引入依赖 &#x1f388;创建目录 &#x1f388;编写代码 &#x1f388;打包程序 &#x1…

关于用户体验和设计思维

介绍 要开发有效的原型并为用户提供出色的体验&#xff0c;了解用户体验 (UX) 和设计思维的原则至关重要。 用户体验是用户与产品、服务或系统交互并获得相应体验的过程。 设计思维是一种解决问题的方法&#xff0c;侧重于创新和创造。 在启动期实现用户体验和设计思维时&#…

头歌:SparkSQL简单使用

第1关&#xff1a;SparkSQL初识 任务描述 本关任务&#xff1a;编写一个sparksql基础程序。 相关知识 为了完成本关任务&#xff0c;你需要掌握&#xff1a;1. 什么是SparkSQL 2. 什么是SparkSession。 什么是SparkSQL Spark SQL是用来操作结构化和半结构化数据的接口。…

Tuxera NTFS使用教程 轻松实现磁盘格式转换的教程分享 ntfsMac软件怎么用

NTFS for Mac是Mac电脑里非常重要的工具之一&#xff0c;因为它太实用了&#xff0c;解决了NTFS移动硬盘在Mac上的写入问题。但是&#xff0c;小伙伴在安装完软件之后&#xff0c;通常再也不会关注它&#xff0c;甚至时间长了&#xff0c;也就忘了Mac里还有这么一个软件。 在Tu…

GB32960解析工具

几年前搞了一个用Qt开发的国标32960报文解析工具。分享给大家&#xff0c;只用1积分便可以下载。 国标32960新能源车协议解析工具资源-CSDN文库

(附源码)超级简单的SSM图书交易系统,包含技术栈、架构图、设计图、教程

先看首页效果&#xff0c;包含买家、卖家、管理员三个端口。启动有问题可以联系我解决&#xff0c;微信&#xff1a;keepgoing4u 架构图&#xff1a; 用到软件 Jdk1.8 Mysql IntelliJ IDEA Maven 项目技术&#xff1a; Spring Boot SSM JSP mybatis Maven B/S模式 配置…

云服务器的主要用途有哪些,使用云服务器具有哪些方面的优势

随着科技的飞速发展&#xff0c;云计算已经成为现代企业和个人用户不可或缺的技术支持&#xff0c;云计算技术已经逐渐渗透到我们生活的方方面面。云服务器作为云计算的核心组成部分&#xff0c;正在逐步改变我们的数据存储和处理方式&#xff0c;成为各类互联网用户实现综合业…

前端vite+rollup前端监控初始化——封装基础fmp消耗时间的npm包并且发布npm beta版本

文章目录 ⭐前言&#x1f496;vue3系列文章 ⭐初始化npm项目&#x1f496;type为module&#x1f496;rollup.config.js ⭐封装fmp耗时计算的class&#x1f496;npm build打包class对象 ⭐发布npm的beta版本&#x1f496; npm发布beta版本 ⭐安装web-performance-tool的beta版本…

2024年第二十一届 五一杯 (B题)大学生数学建模挑战赛 | 最大流问题,深度学习分析 | 数学建模完整代码解析

DeepVisionary 每日深度学习前沿科技推送&顶会论文&数学建模与科技信息前沿资讯分享&#xff0c;与你一起了解前沿科技知识&#xff01; 本次DeepVisionary带来的是五一杯的详细解读&#xff1a; 完整内容可以在文章末尾全文免费领取&阅读&#xff01; 第一个问题…

算法效率的判断及一些典型例题的讲解

一.算法效率 1.用处&#xff1a;判断算法的好坏&#xff0c;好的算法应该是高效的 2算法效率取决于时间复杂度和空间复杂度 <1>时间复杂度 1.1概念&#xff1a;算法中基本操作的执行次数就是算法的时间复杂度 1.2表示&#xff1a;大O的渐进表示法&#xff0c;例如O(N)…

什么是场内期权,场内期权是如何操作的?

今天期权懂带你了解什么是场内期权,场内期权是如何操作的&#xff1f;场内期权是标准化、规范化且在公开市场交易的金融衍生品。相比场外期权&#xff0c;场内期权具有更高的流动性和透明度。 什么是场内期权&#xff1f; 场内期权&#xff0c;也称为交易所期权&#xff0c;是…