【Linux】匿名管道的应用场景-----管道进程池

news2025/4/8 17:14:49

目录

一、池化技术

二、简易进程池的实现:

Makefile

task.h

task.cpp

Initchannel函数:

创建任务:

控制子进程:

子进程执行任务:

清理收尾:

三、全部代码:


前言:

对于管道,我们已经学习了匿名管道,那么这个匿名管道有什么用呢?接下来,我们以池化技术来实现一个简易的管道进程池

一、池化技术

1、内存池:减少内存分配的系统调用开销

问题:
当使用new或  malloc 动态分配内存时,每次都会触发系统调用
例如:每次申请5字节,或者申请10字节,再次申请20字节,会触发三次系统调用
系统调用成本高,因为操作系统可能正在处理其他任务,导致等待,降低效率

解决方案:
一次性向操作系统申请更大的内存空间(如100字节或200字节)
后续需要内存时,直接从这块预先申请好的空间中分配,避免频繁触发系统调用

优点:
减少系统调用次数,分摊开销,提高内存分配效率

2、进程池:减少进程创建的开销

问题:
传统方式是父进程创建子进程,子进程完成任务后退出,父进程等待
每次创建子进程时,操作系统需要复制task_struct,页表等数据结构,开销较大
频繁创建和销毁进程效率低下

解决方案:
预先创建一批子进程,作为资源储备(进程池)
当有任务到来时,父进程直接将任务分配给池中已有的子进程,而不是每次都创建新进程

优点:
减少进程创建的开销,提高任务分配和执行的效率

3. 核心思想

资源预分配
无论是内存池还是进程池,核心思想都是预先分配资源,避免重复的系统调用或资源创建
提高效率:
通过减少高频操作的开销,显著提升程序性能,尤其适用于高并发或高性能场景

总结
内存池:一次性申请大块内存,后续直接从池中分配,减少系统调用
进程池:预先创建一批进程,任务到来时直接分配,减少进程创建开销
共同目标:通过资源预分配,优化性能,降低系统开销

二、简易进程池的实现:

在了解上述的池化技术后,我们以这种思想来设计一个简易的进程池

如上,这是一个简易的进程池,我们让父进程向子进程发送数据也就是父进程向管道中写入数据,然后子进程从管道中读数据,这样父进程创建多个子进程,每次选择某个子进程和管道进行数据的传输,这样就是一个简易的进程池了

Makefile

task:tesk.cpp
	g++ -o $@ $^ -g -std=c++11
.PHONY:clean
clean:
	rm -f task

task.h

#pragma once 

#include<iostream>
#include<vector>
#include<string>

task.cpp

先描述:

首先通过class类来描述一个管道

#include"task.h"

//先描述,描述管道
class channel
{
public:
    //构造函数通过列表进行初始化
    channel(int fd, int childpid, const std::string &processname)
        :_fd(fd)
        ,_childpid(childpid)
        ,_processname(processname)
    {}
public:
    int _fd; //管道写端的文件描述符
    pid_t _childpid; //管道所连接表示子进程的pid
    std::string _processname; //管道所连接的子进程的名字
};

int main()
{
    //加载好任务

    //通过vector将一个个管道组织起来
    std::vector<channel> channels;

    //初始化,创建管道
    Initchannel(&channels);
    //控制子进程
    ctrlchild(channels);
    //清理收尾
    quitprocess(channels);
    return 0;
}

在组织:

在通过vector这个容器来将一个个管道组织起来,然后进行初始化,控制子进程,清理等操作

Initchannel函数:

创建管道,这个管道是连接父进程和子进程的,所以我们要创建几个管道就需要创建几个子进程

通过pipe创建管道文件

初始化:

void Initchannel(std::vector<channel>* channels)
{
    for(int i = 0;i<processnum;i++)
    {
        //创建管道
        int pipefd[2];
        int n = pipe(pipefd);//创建管道文件
        if(n != 0) exit(0);//创建失败就退出程序

        //创建完管道文件然后创建管道文件对应的子进程
        pid_t id = fork();
        if(id == 0)
        {
            //子进程模块,这里子进程去读,所以关闭写通道
            close(pipefd[1]);
            //重定向
            dup2(pipefd[0],0);
            work();//子进程进行工作的函数模块

            exit(0);//工作完就结束子进程
        }
        //父进程去写所以这里关闭读端
        close(pipefd[0]);

        std::string name = "process"+std::to_string(i);
        channels->push_back(channel(pipefd[1],id,name));
    }
}

创建任务:

//重定义函数指针
typedef void(* task)();

void task1()
{
    std::cout<<"刷新野区"<<std::endl;
}

void task2()
{
    std::cout<<"刷新技能"<<std::endl;
}

void task3()
{
    std::cout<<"泉水回血"<<std::endl;
}

void task4()
{
    std::cout<<"技能耗蓝"<<std::endl;
}

void Loadtask(std::vector<task>* tasks)
{
    tasks->push_back(task1);
    tasks->push_back(task2);
    tasks->push_back(task3);
    tasks->push_back(task4);
}

其中Loadtask函数的作用就是保存所执行的任务

接着在程序最开始(也就是在main函数的最开始),将我们的任务都进行加载好

还需要全局变量

processnum:

控制进程池的大小,决定创建多少个子进程。

方便扩展,例如通过配置文件动态设置子进程数量。

tasks:

集中管理所有任务,便于父进程和子进程共享任务列表。

通过任务码(索引)快速定位和执行任务。

const int processnum = 5;//进程中子进程的数量
std::vector<task> tasks;//存储任务的容器

int main()
{
    //加载好任务
    Loadtask(&tasks);

    //通过vector将一个个管道组织起来
    std::vector<channel> channels;

    //初始化,创建管道
    Initchannel(&channels);
    //控制子进程
    ctrlchild();
    //清理收尾
    quitprocess();

    return 0;
}

手动控制子进程

这里首先搞一个菜单出来

void menu()
{
    std::cout << "#########################################" << std::endl;
    std::cout << "# 1、刷新野区            2、刷新技能      #" << std::endl;
    std::cout << "# 3、泉水回血            4、技能耗蓝      #" << std::endl;
    std::cout << "# 0、退出                                #" << std::endl;
    std::cout << "#########################################" << std::endl;
}

在控制子进程的过程中,我们首先选好我们要完成的任务,然后向管道里写一个任务码,然后被选中的子进程就会从管道中找到任务码,就可以根据vector<task>里面的任务知道需要执行哪一个任务了

控制子进程:

根据菜单,每次输入任务码的时候,就通过write系统调用来确定子进程,发送任务

void ctrlchild(std::vector<channel>& channels)
{
    int which = 0;
    while(1)
    {
        menu();
        int selet = 0;
        std::cout<<"请输入所选择的任务";
        std::cin>>selet;
        
        //判断所选的任务码是否合法
        if(selet<=0 || selet>=5) break;
        
        //任务选择
        int taskcode = selet - 1;

        //子进程选择,轮询法
        std::cout<<"father say taskcode :" << taskcode <<" already send to " << channels[which]._childpid 
            << "process name " << channels[which]._processname << std::endl;
        //发送任务
        write(channels[which]._fd,&taskcode,sizeof(taskcode));//确定子进程,子进程所接收的任务码,和所接受的大小

        //保证所选的进程合法
        which++;
        which %= channels.size();
    }
}

子进程执行任务:

这是子进程的核心代码,通过read系统调用接口从管道中读到任务码,在通过这个任务码和函数指针来调用任务

//子进程工作代码,通过父进程向管道发送的任务码找到对应的任务,然后执行
void work()
{
    while(1)
    {
        int teskcode = 0;
        int n = read(0, &teskcode, sizeof(teskcode)); //read返回的是读取到的字节的个数。 
        //然后第二个参数是要读到哪里去,第三个参数是读取的大小,这一行代码执行完后,teskcode里面保存的就是要执行的任务码
 
        if (n == sizeof(teskcode))
        {
            std::cout << "work get a command : " << getpid() << " : " << "teskcode" << teskcode << std::endl;
            if (teskcode >= 0 && teskcode < tasks.size()) tasks[teskcode]();
        }
        if (n == 0) break; //如果读到0, 说明写端关闭, 读端读到文件末尾, 就需要停止读取了
    }
}

清理收尾:

在清理收尾的时候不能够直接关闭然后父进程等待子进程,这样是有问题的,因为当父进程创建子进程的时候,子进程的读端也会指向对应的管道,这样的话一个管道就会有很多个读端了,如下:

每一个子进程的写端都会指向之前的所有管道,比如,如果上述有3个进程的话,那么第一个管道就有3个写端(父进程一个,父进程创建的两个子进程的写端都会指向第一个管道),那么如果直接close(c._fd)关闭管道的写端的话,那么是不能够关闭完全的,需要全部关闭,这里有两种解决方式:

第一种方式:

从后往前进行关闭,下面代码已给出,我们知道最后一个管道依然是只有一个写端的,那么当最后一个管道关闭后,前面的管道的写端就都会少一个,这样的话从后往前关闭就不会出现这样的问题了

void quitprocess(const std::vector<channel> &channels)
{
    //出现问题
    for(const auto &c : channels)
    {
        close(c._fd);   
        waitpid(c._childpid,nullptr,0);
    }


    // //解决方案1
    // int last = channels.size()-1;
    // for(int i = last;i>=0;i--)
    // {
    //     close(channels[i]._fd);
    //     waitpid(channels[i]._childpid,nullptr,0);
    // }



    // for(const auto &c : channels)
    // {
    //     close(c._fd);
    // }
    // for(const auto &c : channels)
    // {
    //     waitpid(c._childpid,nullptr,0);
    // }
}

第二种方式:

在创建管道的时候进行记录父进程所占用的文件描述符

三、全部代码:

makefile

task:task.cpp
	g++ -o $@ $^ -g -std=c++11
.PHONY:clean
clean:
	rm -f task

task.h

#pragma once 

#include<iostream>
#include<vector>
#include<string>

#include<unistd.h>
#include<sys/types.h>
#include<sys/wait.h>

//重定义函数指针
typedef void(* task)();

void task1()
{
    std::cout<<"刷新野区"<<std::endl;
}

void task2()
{
    std::cout<<"刷新技能"<<std::endl;
}

void task3()
{
    std::cout<<"泉水回血"<<std::endl;
}

void task4()
{
    std::cout<<"技能耗蓝"<<std::endl;
}

//保存所执行的任务
void Loadtask(std::vector<task>* tasks)
{
    tasks->push_back(task1);
    tasks->push_back(task2);
    tasks->push_back(task3);
    tasks->push_back(task4);
}

task.cpp

#include"task.h"

const int processnum = 5;
std::vector<task> tasks;
//先描述,描述管道
class channel
{
public:
    //构造函数通过列表进行初始化
    channel(int fd, int childpid, const std::string &processname)
        :_fd(fd)
        ,_childpid(childpid)
        ,_processname(processname)
    {}
public:
    int _fd; //表示父进程链接某个管道的fd
    pid_t _childpid; //管道所连接表示子进程的pid
    std::string _processname; //管道所连接的子进程的名字
};

//子进程工作代码,通过父进程向管道发送的任务码找到对应的任务,然后执行
void work()
{
    while(1)
    {
        int teskcode = 0;
        int n = read(0, &teskcode, sizeof(teskcode)); //read返回的是读取到的字节的个数。 
        //然后第二个参数是要读到哪里去,第三个参数是读取的大小,这一行代码执行完后,teskcode里面保存的就是要执行的任务码
 
        if (n == sizeof(teskcode))
        {
            std::cout << "work get a command : " << getpid() << " : " << "teskcode" << teskcode << std::endl;
            if (teskcode >= 0 && teskcode < tasks.size()) tasks[teskcode]();
        }
        if (n == 0) break; //如果读到0, 说明写端关闭, 读端读到文件末尾, 就需要停止读取了
    }
}

void Initchannel(std::vector<channel>* channels)
{
    //解决方案2
    std::vector<int> oldfd;//创建一个数组记录父进程所在的文件描述符的写端
    for(int i = 0;i<processnum;i++)
    {
        //创建管道
       
        int pipefd[2];
        int n = pipe(pipefd);//创建管道文件
        if(n != 0) exit(0);//创建失败就退出程序

        //创建完管道文件然后创建管道文件对应的子进程
        pid_t id = fork();
        if(id == 0)
        {
            //将父进程所在的文件描述符写端,子进程将其关闭
            for(auto fd : oldfd) close(fd);

            //子进程模块,这里子进程去读,所以关闭写通道
            close(pipefd[1]);
            //重定向
            dup2(pipefd[0],0);

            work();//子进程进行工作的函数模块
            std::cout << "Process PID: " << getpid() << " quit" << std::endl;
            exit(0);//工作完就结束子进程
        }
        //父进程去写所以这里关闭读端
        close(pipefd[0]);

        std::string name = "process"+std::to_string(i);
        channels->push_back(channel(pipefd[1],id,name));
        //每次记录父进程写端所在的文件描述符
        oldfd.push_back(pipefd[1]); 
    }
}

void menu()
{
    std::cout << "#########################################" << std::endl;
    std::cout << "# 1、刷新野区            2、刷新技能      #" << std::endl;
    std::cout << "# 3、泉水回血            4、技能耗蓝      #" << std::endl;
    std::cout << "# 0、退出                                #" << std::endl;
    std::cout << "#########################################" << std::endl;
}

void ctrlchild(std::vector<channel>& channels)
{
    int which = 0;
    while(1)
    {
        menu();
        int selet = 0;
        std::cout<<"请输入所选择的任务";
        std::cin>>selet;
        
        //判断所选的任务码是否合法
        if(selet<=0 || selet>=5) break;
        
        //任务选择
        int taskcode = selet - 1;

        //子进程选择,轮询法
        std::cout<<"father say taskcode :" << taskcode <<" already send to " << channels[which]._childpid 
            << "process name " << channels[which]._processname << std::endl;
        //发送任务
        write(channels[which]._fd,&taskcode,sizeof(taskcode));//确定子进程,子进程所接收的任务码,和所接受的大小

        //保证所选的进程合法
        which++;
        which %= channels.size();
    }
}

void quitprocess(const std::vector<channel> &channels)
{
    //出现问题
    for(const auto &c : channels)
    {
        close(c._fd);   
        waitpid(c._childpid,nullptr,0);
    }


    // //解决方案1
    // int last = channels.size()-1;
    // for(int i = last;i>=0;i--)
    // {
    //     close(channels[i]._fd);
    //     waitpid(channels[i]._childpid,nullptr,0);
    // }



    // for(const auto &c : channels)
    // {
    //     close(c._fd);
    // }
    // for(const auto &c : channels)
    // {
    //     waitpid(c._childpid,nullptr,0);
    // }
}

int main()
{
    //加载好任务
    Loadtask(&tasks);

    //通过vector将一个个管道组织起来
    std::vector<channel> channels;

    //初始化,创建管道
    Initchannel(&channels);
    //控制子进程
    ctrlchild(channels);
    //清理收尾
    quitprocess(channels);

    return 0;
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2301402.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

PostgreSQL的学习心得和知识总结(一百六十九)|深入理解PostgreSQL数据库之 Group By 键值消除 的使用和实现

目录结构 注&#xff1a;提前言明 本文借鉴了以下博主、书籍或网站的内容&#xff0c;其列表如下&#xff1a; 1、参考书籍&#xff1a;《PostgreSQL数据库内核分析》 2、参考书籍&#xff1a;《数据库事务处理的艺术&#xff1a;事务管理与并发控制》 3、PostgreSQL数据库仓库…

Python基于循环神经网络的情感分类系统(附源码,文档说明)

博主介绍&#xff1a;✌IT徐师兄、7年大厂程序员经历。全网粉丝15W、csdn博客专家、掘金/华为云//InfoQ等平台优质作者、专注于Java技术领域和毕业项目实战✌ &#x1f345;文末获取源码联系&#x1f345; &#x1f447;&#x1f3fb; 精彩专栏推荐订阅&#x1f447;&#x1f3…

Zookeeper应用案例-分布式锁-实现思路

以下是具体实现代码 第一步&#xff1a;注册锁节点 第二步&#xff1a;获取锁节点&#xff0c;如果自己是最小的节点&#xff0c;就获取权限 第三步&#xff1a;拿到锁就开始自己的业务逻辑 第四步&#xff1a;业务逻辑好了就要释放这把锁 第五步&#xff1a;重新注册监听&…

java练习(32)

ps&#xff1a;题目来自力扣 环形链表 给你一个链表的头节点 head &#xff0c;判断链表中是否有环。 如果链表中有某个节点&#xff0c;可以通过连续跟踪 next 指针再次到达&#xff0c;则链表中存在环。 为了表示给定链表中的环&#xff0c;评测系统内部使用整数 pos 来表…

伯克利 CS61A 课堂笔记 10 —— Trees

本系列为加州伯克利大学著名 Python 基础课程 CS61A 的课堂笔记整理&#xff0c;全英文内容&#xff0c;文末附词汇解释。 目录 01 Trees 树 Ⅰ Tree Abstraction Ⅱ Implementing the Tree Abstraction 02 Tree Processing 建树过程 Ⅰ Fibonacci tree Ⅱ Tree Process…

让编程变成一种享受-明基RD320U显示器

引言 作为一名有着多年JAVA开发经验的从业者&#xff0c;在工作过程中&#xff0c;显示器的重要性不言而喻。它不仅是我们与代码交互的窗口&#xff0c;更是影响工作效率和体验的关键因素。在多年的编程生涯中&#xff0c;我遇到过各种各样的问题。比如&#xff0c;在进行代码…

10分钟上手DeepSeek开发:SpringBoot + Vue2快速构建AI对话系统

作者&#xff1a;后端小肥肠 目录 1. 前言 为什么选择DeepSeek&#xff1f; 本文技术栈 2. 环境准备 2.1. 后端项目初始化 2.2. 前端项目初始化 3. 后端服务开发 3.1. 配置文件 3.2. 核心服务实现 4. 前端服务开发 4.1. 聊天组件ChatWindow.vue开发 5. 效果展示及源…

Linux环境开发工具

Linux软件包管理器yum Linux下安装软件方式&#xff1a; 源代码安装rpm安装——Linux安装包yum安装——解决安装源、安装版本、安装依赖的问题 yum对应于Windows系统下的应用商店 使用Linux系统的人&#xff1a;大部分是职业程序员 客户端怎么知道去哪里下载软件&#xff1…

JupyterNotebook高级使用:常用魔法命令

%%writefile test.py def Test(name):print("Test",name,"success")运行结果&#xff1a;就是在我们的文件目录下面创建了这个test.py文件&#xff0c;主要是认识一下这个里面的%%writefile表示创建新的文件&#xff0c;这个文件里面的内容就是上面我们定义…

C++ Primer 类的作用域

欢迎阅读我的 【CPrimer】专栏 专栏简介&#xff1a;本专栏主要面向C初学者&#xff0c;解释C的一些基本概念和基础语言特性&#xff0c;涉及C标准库的用法&#xff0c;面向对象特性&#xff0c;泛型特性高级用法。通过使用标准库中定义的抽象设施&#xff0c;使你更加适应高级…

50页PDF|数字化转型成熟度模型与评估(附下载)

一、前言 这份报告依据GBT 43439-2023标准&#xff0c;详细介绍了数字化转型的成熟度模型和评估方法。报告将成熟度分为五个等级&#xff0c;从一级的基础转型意识&#xff0c;到五级的基于数据的生态价值构建与创新&#xff0c;涵盖了组织、技术、数据、资源、数字化运营等多…

机器学习实战(8):降维技术——主成分分析(PCA)

第8集&#xff1a;降维技术——主成分分析&#xff08;PCA&#xff09; 在机器学习中&#xff0c;降维&#xff08;Dimensionality Reduction&#xff09; 是一种重要的数据处理技术&#xff0c;用于减少特征维度、去除噪声并提高模型效率。主成分分析&#xff08;Principal C…

前端插件使用xlsx-populate,花样配置excel内容,根据坐添加标替换excel内容,修改颜色,合并单元格...。

需求要求&#xff1a;业务人员有个非常复杂得excel表格&#xff0c;各种表头等&#xff0c;但是模板是固定得。当然也可以实现在excel上搞出各种表格&#xff0c;但是不如直接用已有模板替换其中要动态得内容方便&#xff0c;这里我们用到CSDN得 xlsx-populate 插件。 实列中我…

分布式大语言模型服务引擎vLLM论文解读

论文地址&#xff1a;Efficient Memory Management for Large Language Model Serving with PagedAttention 摘要 大语言模型&#xff08;LLMs&#xff09;的高吞吐量服务需要一次对足够多的请求进行批处理。然而&#xff0c;现有系统面临困境&#xff0c;因为每个请求的键值…

如何开发一个大模型应用?

1. 背景 AIGC技术的突破性进展彻底改变了技术开发的范式&#xff0c;尤其是以GPT为代表的LLM&#xff0c;凭借其强大的自然语言理解与生成能力&#xff0c;迅速成为全球科技领域的焦点。2023年末&#xff0c;随着ChatGPT的爆火&#xff0c;AIGC技术从实验室走向规模化应用&…

[数据结构]二叉搜索树详解

目录 一、二叉搜索树的概念 二、二叉搜索树的性能分析 三、二叉搜索树的中序遍历用于排序去重 四、二叉搜索树的查找 1、查找的非递归写法 2、查找的递归写法 五、二叉搜索树的插入 1、插入的非递归写法 2、插入的递归写法 六、二叉搜索树的删除 1、删除的非递归写法…

撕碎QT面具(2):groupBox内容居中显示

问题描述&#xff1a; 当笔者在GroupBox中使用Form Layout构建图中内容时&#xff0c;不能居中显示。 解决方案&#xff1a; 1、首先在form layout左右添加横向弹簧&#xff0c;并ctrl进行选中这三个控件。点击水平布局&#xff0c;让中间的控件不变形。 2、选中groupBox&#…

SpringBoot速成(14)文件上传P23-P26

1. 什么是 multipart/form-data&#xff1f; 想象一下&#xff0c;你有一个包裹要寄给朋友&#xff0c;但包裹里有不同类型的东西&#xff1a;比如一封信&#xff08;文字&#xff09;、一张照片&#xff08;图片&#xff09;和一个小礼物&#xff08;文件&#xff09;。为了确…

图论入门算法:拓扑排序(C++)

上文中我们了解了图的遍历(DFS/BFS), 本节我们来学习拓扑排序. 在图论中, 拓扑排序(Topological Sorting)是对一个有向无环图(Directed Acyclic Graph, DAG)的所有顶点进行排序的一种算法, 使得如果存在一条从顶点 u 到顶点 v 的有向边 (u, v) , 那么在排序后的序列中, u 一定…

【iOS】SwiftUI状态管理

State ObservedObject StateObject 的使用 import SwiftUIclass CountModel: ObservableObject {Published var count: Int 0 // 通过 Published 标记的变量会触发视图更新init() {print("TimerModel initialized at \(count)")} }struct ContentView: View {State…