【Linux】匿名管道通信场景——进程池

news2024/12/27 8:48:09
🔥 个人主页:大耳朵土土垚
🔥 所属专栏:Linux系统编程

这里将会不定期更新有关Linux的内容,欢迎大家点赞,收藏,评论🥳🥳🎉🎉🎉

文章目录

  • 1. 初始化进程池
  • 2. 进程池执行任务
    • 2.1 任务管理
    • 2.2 执行任务
  • 3. 清理进程池
  • 4. 封装与完整实现
  • 5. 结语

1. 初始化进程池

  进程池的实现是依靠匿名管道,通过进程间通信使得父进程能够管理多个进程任务,相当于父进程拥有了很多个进程——进程池,通过不同的进程完成指定的任务。
  所以我们需要创建多个匿名管道和子进程,进行进程间通信,发送信息给子进程让它们根据接收到的信息处理相关任务。
  因为有多个管道和子进程,为了方便父进程使用不同管道发送对应信息给子进程,我们需要将管道的文件描述符以及对应子进程的pid保存起来,我们选择将它们封装在一个Channel类中。又因为有多个匿名管道和子进程,所以将多个Channel类对象储存在C++STL中的容器vector中来方便父进程进行管理进程池。

代码如下:

int InitProcesspool(int num,std::vector<Channel>& channels)
{
    for(int i = 0; i < num; i++)//使用循环创建多个匿名管道和子进程
    {
        //1.创建匿名管道
        int pipefd[2] = {0};
        int n = pipe(pipefd);
        if(n < 0) return 2;//根据不同的返回值判断原因,也可以使用枚举来约定返回值代表的内容

        //2.创建子进程
        pid_t id = fork();
        if(id < 0) return 3;

        //3.建立通信管道,父子进程关闭读端或写端
        if(id == 0)//子进程
        {
            //子进程读取,关闭写端
            ::close(pipefd[1]);
            //dup2
            dup2(pipefd[0],0);
            //子进程需要执行的内容
            Work();
            ::exit(0);
        }

        //父进程
        //父进程写入,关闭读端
        ::close(pipefd[0]);
        channels.emplace_back(pipefd[1],id);//保存在channel对象中并存入vector
    }
    return 0;
}

对子进程内部,我们使用dup2系统调用将匿名管道读端文件描述符与标准输入stdin交换,这样我们就不需要保存不同进程对应匿名管道的读端文件描述符,只需要统一从0号文件描述符中读取内容即可。

对于Channel类

class Channel{
public:
    Channel(int fd,pid_t who):_fd(fd),_who(who)
    {
        _name = "Channel-"+std::to_string(fd)+"-"+std::to_string(who);
    }

    std::string GetName()
    {
        return _name;
    }
    int GetFd()
    {
        return _fd;
    }
    pid_t GetWho()
    {
        return _who;
    }
    void Send(int num)//父进程往匿名管道发送信息
    {
        ::write(_fd,&num,sizeof(num));
    }
    ~Channel()
    {

    }
private:
    int _fd;//保存匿名管道通信的文件描述符
    std::string _name;//名字(自己取的)
    pid_t _who;//子进程pid
};

对于父进程发送给子进程的信息我们选择约定一个数字对应一个任务,不同数字对应不同需要完成的任务,子进程接收到信息后就可以根据数字来确定不同的任务。

2. 进程池执行任务

2.1 任务管理

  执行任务之前我们需要先确定有哪些任务,怎么执行…所以我们需要进行任务管理,同样我们也是使用一个类TaskManager来进行任务管理:

#include<iostream>
#include<unordered_map>
#include<functional>
#include<ctime>

using task_t = std::function<void()>;//函数指针


//不同任务函数
    void Load()
    {
        std::cout<<"正在执行加载任务..."<<std::endl;
    }
    void Del()
    {
        std::cout<<"正在执行删除任务..."<<std::endl;
    }
    void Log()
    {
        std::cout<<"正在执行日志任务..."<<std::endl;
    }

static int number = 0;
class TaskManager
{
public:
    TaskManager()
    {
        srand(time(nullptr));
        InsertTask(Load);
        InsertTask(Del);
        InsertTask(Log);
    }
    int SelectTask()
    {
        return rand()%number;
    }
    void InsertTask(task_t t)
    {
        m[number++] = t;
    }
    void Excute(int num)
    {
        if(m.find(num) == m.end())
            return;
        m[num]();//执行任务
    }
    ~TaskManager()
    {
    }
private:
    std::unordered_map<int,task_t> m;//使用map封装数字与对应的任务函数指针
};

TaskManager tm;

  选择新创建一个源文件Task.hpp来封装上述内容,上述任务管理类中我们使用map来保存数字与任务函数指针的相关关系,这样通过数字就可以确定是哪一个任务函数;此外选择任务使用的方法是随机数的方法,大家可以根据自己的想法确定不同的方式。

2.2 执行任务

  • 发送任务

使用按顺序轮询的方式派发任务给不同的子进程——设置10次任务循环,先通过任务管理类中的选择函数获取任务编号,然后通过父进程进程池管理类将任务编号发送给子进程。

void ExcuteTask(std::vector<Channel>& channels)
{
    int n = 0;
    int count = 10;
    while(count--)//执行10次任务
    {
        //1.选择任务,获取任务编号
        int tasknum = tm.SelectTask();

        //2.选择子进程,使用轮询选择,派发任务
        channels[n++].Send(tasknum);
        n%=channels.size();

        std::cout<<std::endl;
        std::cout<<"*****成功发送"<<10-count<<"个任务*****"<<std::endl;
        
        sleep(2);//每个2s发送一个任务

    }
}

  • 接受并执行任务

子进程接受并执行任务——先通过匿名管道接受父进程发送的任务编号,然后通过任务管理类对象执行任务编号所对应的任务函数。

//子进程接受并执行任务
void Work()
{
    while(true)
    {
        int num = 0;
        int n = ::read(0,&num,sizeof(num));
        if(n == sizeof(num))//读取成功
            tm.Excute(num);//不要发成n
        else if(n == 0)
        {
            break;
        }
        else
        {
            break;
        }
    }
}

3. 清理进程池

我们需要回收匿名管道的文件描述符和子进程

void CleanProcesspool(std::vector<Channel>& channels)
{
    for(auto& c : channels)
        ::close(c.GetFd());
    for(auto& c : channels)
    {
        pid_t rid = ::waitpid(c.GetWho(),nullptr,0);
        if(rid <= 0)
        {
            std::cout<<std::endl;
            std::cout<<"清理子进程失败..."<<std::endl;
            return;
        }
    }

    std::cout<<std::endl;
    std::cout<<"成功清理子进程..."<<std::endl;
}

  注意这里不能使用一个循环来进行清理,如下面代码是错误的:

void CleanProcesspool(std::vector<Channel>& channels)
{
    for(auto& c : channels)//只使用一次循环
    {
    	::close(c.GetFd());
        pid_t rid = ::waitpid(c.GetWho(),nullptr,0);
        if(rid <= 0)
        {
            std::cout<<std::endl;
            std::cout<<"清理子进程失败..."<<std::endl;
            return;
        }
    }

    std::cout<<std::endl;
    std::cout<<"成功清理子进程..."<<std::endl;
}

这是因为在创建子进程时,子进程会继承父进程的文件描述符表,因此在第一个匿名管道创建后,例如父进程的4号文件描述符指向该匿名管道写端,那么在创建第二个子进程时,该子进程会继承4号文件描述符也指向第一个匿名管道写端,因此创建的子进程越多,前面匿名管道写端被指向的就越多,所以仅仅关闭一个进程的写端指向,还有其他的写端指向,所以读端无法读到0,也就无法退出,如下图所示:
在这里插入图片描述
在这里插入图片描述

当创建2个子进程时,第一个匿名管道写端就有两个进程指向,当创建的进程越多,该写端指向的也就越多。

如果要使用一个循环来清理回收子进程,我们可以从后往前关闭文件描述符,因为最后一个管道写端只有父进程指向。

4. 封装与完整实现

  对于父进程管理进程池我们可以封装一个类来更好的进行管理与实现:

#include <iostream>
#include <string>
#include <vector>
#include <unistd.h>
#include <sys/types.h>
#include <stdlib.h>
#include <sys/wait.h>

#include "Task.hpp"
#include "Channel.hpp"
// master


class ProcessPool
{
public:
    int InitProcesspool(int num)
    {
        for (int i = 0; i < num; i++)
        {
            // 1.创建匿名管道
            int pipefd[2] = {0};
            int n = pipe(pipefd);
            if (n < 0)
                return 2;

            // 2.创建子进程
            pid_t id = fork();
            if (id < 0)
                return 3;

            // 3.建立通信管道,父子进程关闭读端或写端
            if (id == 0) // 子进程
            {
                // 子进程读取,关闭写端
                ::close(pipefd[1]);
                // dup2
                dup2(pipefd[0], 0);
                Work();
                ::exit(0);
            }

            // 父进程
            // 父进程写入,关闭读端
            ::close(pipefd[0]);
            channels.emplace_back(pipefd[1], id);
        }
        return 0;
    }
    void ExcuteTask()
    {
        int n = 0;
        int count = 10;
        while (count--) // 执行10次任务
        {
            // 1.选择任务,获取任务编号
            int tasknum = tm.SelectTask();

            // 2.选择子进程,使用轮询选择,派发任务
            channels[n++].Send(tasknum);
            n %= channels.size();

            std::cout << std::endl;
            // std::cout<<"**************************"<<std::endl;
            std::cout << "*****成功发送" << 10 - count << "个任务*****" << std::endl;
            // std::cout<<"**************************"<<std::endl;
            // std::cout<<std::endl;

            sleep(3);
        }
    }

    void CleanProcesspool()
    {
        for (auto &c : channels)
            ::close(c.GetFd());

        for (auto &c : channels)
        {
            pid_t rid = ::waitpid(c.GetWho(), nullptr, 0);
            if (rid <= 0)
            {
                std::cout << std::endl;
                std::cout << "清理子进程失败..." << std::endl;
                return;
            }
        }

        std::cout << std::endl;
        std::cout << "成功清理子进程..." << std::endl;
    }

private:
    std::vector<Channel> channels;
};

main函数:

#include "ProcessPool.hpp"

int main(int argc, char* argv[])
{
    //0.获取应该创建管道个数num个
    if(argc!=2)
    {
        std::cout<<"请输入管道个数."<<std::endl;
        return 1;
    }
    int num = std::stoi(argv[1]);
    std::vector<Channel> channels;
    
    ProcessPool* pp = new ProcessPool;

    //1.初始化进程池——创建进程池
    pp->InitProcesspool(num);
    
    //2.执行任务
    pp->ExcuteTask();

    //3.任务执行完成,回收子进程
    pp->CleanProcesspool();

    delete pp;
    
    return 0;
}

运行结果如下:

在这里插入图片描述

5. 结语

  以上就是基于匿名管道通信实现的进程池,子进程会根据接受到的信息执行不同的任务,父进程可以看作Master来进行管理创建的多个进程。关键点在于对进程管理的封装以及回收子进程时会有多个进程指向匿名管道的读端,所以回收时要注意可能会出现的bug。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2251630.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

C#基础之集合讲解

文章目录 1 集合1.1 数组1.1.1 简介1.1.2 声明使用1.1.2.1 声明 & 初始化1.1.2.2 赋值给数组1.1.2.3 访问数组元素 1.1.3 多维数组1.1.3.1 声明1.1.3.2 初始化二维数组1.1.3.3 访问二维数组元素 1.1.4 交错数组1.1.5 传递数组给函数1.1.6 Array1.1.6.1 简介1.1.6.2 属性1.1…

Azure DevOps Server:使用甘特图Gantt展示需求进度

自从Azure DevOps Server取消与Project Server的集成后&#xff0c;许多用户都在关注如何使用甘特图来展示项目进度。 在Azure DevOps Server开放扩展Extension功能后&#xff0c;许多开发者或专业开发团队做了很多甘特图Gantt相关的开发工作&#xff0c;使用比较多的是(GANTT …

数据湖的概念(包含数据中台、数据湖、数据仓库、数据集市的区别)--了解数据湖,这一篇就够了

文章目录 一、数据湖概念1、企业对数据的困扰2、什么是数据湖3、数据中台、数据湖、数据仓库、数据集市的区别 网上看了好多有关数据湖的帖子&#xff0c;还有数据中台、数据湖、数据仓库、数据集市的区别的帖子&#xff0c;发现帖子写的都很多&#xff0c;而且专业名词很多&am…

Kali Linux怎么开python虚拟环境

相信很多朋友再学习的过程中都会遇到一些pip失效&#xff0c;或者报错的时候&#xff0c;他们要求我们要使用虚拟环境&#xff0c;但是不知道怎么搭建&#xff0c;下面这篇文章就来告诉你如何搭建虚拟环境&#xff0c;这个方法在所有Linux的服务器都通用&#xff0c;就两行命令…

Flink四大基石之State(状态) 的使用详解

目录 一、有状态计算与无状态计算 &#xff08;一&#xff09;概念差异 &#xff08;二&#xff09;应用场景 二、有状态计算中的状态分类 &#xff08;一&#xff09;托管状态&#xff08;Managed State&#xff09;与原生状态&#xff08;Raw State&#xff09; 两者的…

【数据结构计数排序】计数排序

非比较排序概念 非比较排序是一种排序算法&#xff0c;它不是通过比较元素大小进行排序的&#xff0c;而是基于元素的特征和属性排序。这种排序方法在特定情况下&#xff0c;可以做到比元素比较排序&#xff08;快排&#xff0c;归并&#xff09;更有效率。尤其是在处理大量数…

Java GET请求 请求参数在Body中使用Json格式传参

业务需要调个三方接口 使用GET请求方式 但是&#xff01;请求参数不在Query中&#xff0c;竟然在Body中&#xff0c;使用Json格式传参 在API调试工具里面可以调通 在java代码里&#xff0c;死活调不通 网上搜了搜&#xff0c;找到一个靠谱的&#xff0c;记录一下 import o…

Linux的文件系统

这里写目录标题 一.文件系统的基本组成索引节点目录项文件数据的存储扇区三个存储区域 二.虚拟文件系统文件系统分类进程文件表读写过程 三.文件的存储连续空间存放方式缺点 非连续空间存放方式链表方式隐式链表缺点显示链接 索引数据库缺陷索引的方式优点&#xff1a;多级索引…

[golang][MAC]Go环境搭建+VsCode配置

一、go环境搭建 1.1 安装SDK 1、下载go官方SDK 官方&#xff1a;go 官方地址 中文&#xff1a;go 中文社区 根据你的设备下载对应的安装包&#xff1a; 2、打开压缩包&#xff0c;根据引导一路下一步安装。 3、检测安装是否完成打开终端&#xff0c;输入&#xff1a; go ve…

从繁琐到高效:智能生成PPT的神秘力量

在这个技术爆炸的时代&#xff0c;一场精彩的演讲离不开一份出色的PPT。但制作PPT&#xff0c;就像是一场与时间的博弈&#xff0c;费尽心思构思版式、精炼文案、选择配图&#xff0c;稍不留神&#xff0c;就会被拖入无底深渊。可是你知道吗&#xff1f;现在只需动动手指&#…

二分法篇——于上下边界的扭转压缩间,窥见正解辉映之光(1)

前言 二分法&#xff0c;这一看似简单却又充满哲理的算法&#xff0c;犹如一道精巧的数学之门&#xff0c;带领我们在问题的迷雾中找到清晰的道路。它的名字虽简单&#xff0c;却深藏着智慧的光辉。在科学的浩瀚星空中&#xff0c;二分法如一颗璀璨的星辰&#xff0c;指引着我们…

【软件各类应用解决方案】ERP企业资源管理系统整体解决方案,采购管理方案,仓库管理方案,财务管理方案,人力管理方案,资产管理方案,对账管理(word完整版)

目录 第一部分 概述 第二部分 方案介绍 第三部分 系统业务流程 3.1 关键需求概括分析 3.1.1 销售管理方面 3.1.2 采购管理方面 3.1.3 仓库管理方面 3.1.4 财务管理方面 3.1.5 人力资源方面 3.2 关键需求具体分析 3.2.1 财务管理 3.2.1.1会计凭证解决 3.2.1.2钞…

实验二 选择结构程序设计

实验名称 实验二 选择结构程序设计 实验目的 &#xff08;1&#xff09;掌握关系运算符和逻辑运算符的使用方法&#xff0c;能够表达复杂的逻辑条件。 &#xff08;2&#xff09;掌握if语句的使用方法&#xff0c;掌握多重条件下的if语句嵌套使用。 &#xff08;3&#xff09;…

第33周:运动鞋识别(Tensorflow实战第五周)

目录 前言 一、前期工作 1.1 设置GPU 1.2 导入数据 1.3 查看数据 二、数据预处理 2.1 加载数据 2.2 可视化数据 2.3 再次检查数据 2.4 配置数据集 2.4.1 基本概念介绍 2.4.2 代码完成 三、构建CNN网络 四、训练模型 4.1 设置动态学习率 4.2 早停与保存最佳模型…

Robot Screw Theory (Product of Exponentials)机器人螺旋理论(指数积)

Screw Theory 螺旋理论 Screw theory uses the fact that every rigid body transformation can be expressed by a rotational and translational movement.螺旋理论利用了每个刚体变换都可以通过旋转和平移运动来表示这一事实。 Robert Ball developed the theory in 19th ce…

【maven-5】Maven 项目构建的生命周期:深入理解与应用

1. 生命周期是什么 ​在Maven出现之前&#xff0c;项目构建的生命周期就已经存在&#xff0c;软件开发人员每天都在对项目进行清理&#xff0c;编译&#xff0c;测试及部署。虽然大家都在不停地做构建工作&#xff0c;但公司和公司间&#xff0c;项目和项目间&#xff0c;往往…

skywalking 配置elasticsearch持久化

下载和启动elasticsearch elasticsearch-7.17.25-linux-x86_64.tar.gz&#xff0c;解密文件tar -xvf elasticsearch-7.17.25-linux-x86_64.tar.gz 进入到bin目录&#xff0c;启动 elasticsearch -d 后台运行 下载skywalking服务包 apache-skywalking-apm-9.3.0.tar.gz&#x…

MySQL 复合查询

实际开发中往往数据来自不同的表&#xff0c;所以需要多表查询。本节我们用一个简单的公司管理系统&#xff0c;有三张表EMP,DEPT,SALGRADE 来演示如何进行多表查询。表结构的代码以及插入的数据如下&#xff1a; DROP database IF EXISTS scott; CREATE database IF NOT EXIST…

数据集-目标检测系列- 海边漫步锻炼人检测数据集 person >> DataBall

数据集-目标检测系列- 海边漫步锻炼人检测数据集 person >> DataBall DataBall 助力快速掌握数据集的信息和使用方式&#xff0c;会员享有 百种数据集&#xff0c;持续增加中。 需要更多数据资源和技术解决方案&#xff0c;知识星球&#xff1a; “DataBall - X 数据球…

【汇编语言】call 和 ret 指令(三) —— 深度解析汇编语言中的批量数据传递与寄存器冲突

文章目录 前言1. 批量数据的传递1.1 存在的问题1.2 如何解决这个问题1.3 示例演示1.3.1 问题说明1.3.2 程序实现 2. 寄存器冲突问题的引入2.1 问题引入2.2 分析与解决问题2.2.1 字符串定义方式2.2.2 分析子程序功能2.2.3 得到子程序代码 2.3 子程序的应用2.3.1 示例12.3.2 示例…