Linux:利用匿名管道构建进程池

news2025/1/15 17:50:47

文章目录

  • 进程池
  • 实现进程池
    • 创建信道和进程
    • 发送任务
    • 释放资源
  • 进程池代码
  • 总结

本篇的主题是借助前面所学的基础管道实现一个进程池,那么在实现进程池前先了解进程池是什么,进程池有什么意义,进而对于进程池有一个基本的把握

进程池

给定一个进程,为这个进程创建多个管道,并且对于每一个管道都设置一个子进程,当父进程发送消息,子进程就能获取到消息的内容,而父进程的存在价值就是控制子进程,并且给子进程布置服务等等,而这个内容就叫做进程池

在谈到进程池前,要先提及的是池化技术,池化技术是很重要的技术,在STL容器中也存在诸如空间配置器这样的内容,这种内容诞生的原因就是因为,在调用系统中的资源是需要成本的,例如有空间资源,时间资源等等,而如果不断的申请这些资源就会耗费多余的资源,于是诞生了池化技术,提前申请一块大的空间,由申请人自己来管理,这个就是基本的池化技术

在STL的空间配置器中就有这样的内容,利用空间配置器可以提前申请一部分内容,并且基于这个空间进行自我管理,随时进行申请,就能减少从系统中申请资源带来的额外的成本,而进程池也是基于这样的原理,系统调用也是有成本的,就连最基本的函数跳转都有成本,所以才出现了内联函数来帮助提高效率,更别说对于这样大型的系统调用的接口,必然是有成本的,因此借助池化技术来完成工作时有很大的必要的

实现进程池

对于本篇实现的这个进程池来说,功能也是比较简单的:

  1. 创建信道和进程
  2. 发送任务
  3. 回收资源

创建信道和进程

要实现进程池,就要先清楚进程池整体的结构是什么,对于进程池来说,首先要有进程,有管道,这都进程池创建的必要组成部分,那么在创建进程池的阶段就要实现这些内容,所以要先创建并定义管道

// 定义要创建的子进程个数
const int num = 5;

int main()
{
    for (int i = 0; i < num; i++)
    {
        // 创建管道
        int pipefd[2];
        int n = pipe(pipefd);
        assert(n == 0);

        // 创建子进程
        pid_t id = fork();
        if (id == 0)
        {
            // 子进程负责读
            close(pipefd[1]);
            // do something...
            cout << "this is child" << endl;
            exit(0);
        }
        // 父进程负责写
        close(pipefd[0]);
        cout << "this is father" << endl;
    }
    return 0;
}

但是这样写固然是有问题的,问题点在于,父进程创建的子进程的数据会丢失,不利于进行管理,那么在管理这样的数据之前,首先要对这些内容进行描述,所以要创建对应的结构体进行描述,要管理的这些数据包括有,父进程管理的子进程的pid是多少,父进程和这个子进程传输的文件描述符下标是多少,最好还能用名字来管理,基于这些信息就能创建对应的描述结构体

class channel
{
public:
    channel(int fd, pid_t id) : ctrlfd(fd), workerid(id)
    {
        name = "channel-" + to_string(number++);
    }

public:
    int ctrlfd;
    pid_t workerid;
    string name;
};

因此基于这个内容,就创建好了描述的内容,而可以创建一张表,用来描述存储信息的情况,这样父进程对于子进程的管理就转换成了对于顺序表的管理,这样就能对于数据做一个很好的管理工作,再对创建过程做出一个具体的封装,就能封装出下面的代码:

void Create(vector<channel> *c)
{
    for (int i = 0; i < num; i++)
    {
        // 创建管道
        int pipefd[2];
        int n = pipe(pipefd);
        assert(n == 0);

        // 创建子进程
        pid_t id = fork();
        if (id == 0)
        {
            // 子进程负责读
            close(pipefd[1]);
            // do something...
            cout << "this is child" << endl;
            exit(0);
        }
        // 父进程负责写
        close(pipefd[0]);
        c->push_back(channel(pipefd[1], id));
    }
}

int main()
{
    vector<channel> channels;
    Create(&channels);
    return 0;
}

到此,封装的第一步就完成了,对象已经被创建完毕了

发送任务

对于任务这个版块,本篇的实现方式采用的是模拟调用的方式,给定三种调用的方式,然后利用随机调用的方式对这三种方式进行调用,用来模拟父进程给子进程派发任务这样的一个过程,具体的实现如下:

#pragma once
#include <iostream>
#include <functional>
#include <vector>
#include <ctime>
#include <unistd.h>
using namespace std;

typedef function<void()> task_t;

void Download()
{
    cout << "this is download" << endl;
}

void PrintLog()
{
    cout << "this is PrintLog" << endl;
}

void PushVideoStream()
{
    cout << "this is pushvideo" << endl;
}

class Init
{
public:
    // 任务码
    const static int g_download_code = 0;
    const static int g_printlog_code = 1;
    const static int g_push_videostream_code = 2;
    // 任务集合
    vector<task_t> tasks;

public:
    Init()
    {
        tasks.push_back(Download);
        tasks.push_back(PrintLog);
        tasks.push_back(PushVideoStream);
        srand(time(nullptr));
    }
    bool CheckSafe(int code)
    {
        if (code >= 0 && code < tasks.size())
            return true;
        else
            return false;
    }
    void RunTask(int code)
    {
        return tasks[code]();
    }
    int SelectTask()
    {
        return rand() % tasks.size();
    }
    string ToDesc(int code)
    {
        switch (code)
        {
        case g_download_code:
            return "Download";
        case g_printlog_code:
            return "PrintLog";
        case g_push_videostream_code:
            return "PushVideoStream";
        default:
            return "Unknow";
        }
    }
};

Init init;

将任务整合到函数体实现的对应位置,如下所示:

// 发送任务
void SendCommand(const vector<channel> &c, int count)
{
    int pos = 0;
    while (count--)
    {
        // 1. 选择任务
        int command = init.SelectTask();

        // 2. 选择信道(进程)
        const auto &channel = c[pos++];
        pos %= c.size();

        // 3. 发送任务
        write(channel.ctrlfd, &command, sizeof(command));

        sleep(1);
    }

    cout << "SendCommand done" << endl;
}

释放资源

对于释放资源来讲,主体思路主要是将管道对应的读端和写端都关闭,这样就可以使得管道得以释放了,于是有初步的实现设计

void ReleaseChannels(vector<channel> c)
{
    for (const auto &channel : c)
    {
        pid_t rid = waitpid(channel.workerid, nullptr, 0);
        if (rid == channel.workerid)
        {
            std::cout << "wait child: " << channel.workerid << " success" << std::endl;
        }
    }
}

从表象上看,这样的释放过程是没有问题的,但是从深层次的角度来讲,代码中其实隐藏着一些问题,用下图来表示

下图所示的是在理想状态下,在操作系统内部应该形成的状态体系结构,但是实际上,这样的图是有问题的,在父进程创建子进程的时候,子进程会继承的还有父进程的文件描述符表,正是依据这个原理才能设计出管道这样的结构,但是在第二个子进程继承父进程时,也会继承一份指向,所以管道1的引用计数实际上是比想象的要多一个指针的
在这里插入图片描述
下图所示的是操作系统内部真正的示意图

在这里插入图片描述
基于这样的原因,在进行设计的时候,可以记录之前已经打开的文件描述符,在后续创建子进程的时候刻意的将这些文件描述符都关掉,这样就可以解决问题

进程池代码

// task.hpp
#pragma once
#include <iostream>
#include <functional>
#include <vector>
#include <ctime>
#include <unistd.h>
using namespace std;

typedef function<void()> task_t;

void Download()
{
    cout << "this is download" << endl;
}

void PrintLog()
{
    cout << "this is PrintLog" << endl;
}

void PushVideoStream()
{
    cout << "this is pushvideo" << endl;
}

class Init
{
public:
    // 任务码
    const static int g_download_code = 0;
    const static int g_printlog_code = 1;
    const static int g_push_videostream_code = 2;
    // 任务集合
    vector<task_t> tasks;

public:
    Init()
    {
        tasks.push_back(Download);
        tasks.push_back(PrintLog);
        tasks.push_back(PushVideoStream);
        srand(time(nullptr));
    }
    bool CheckSafe(int code)
    {
        if (code >= 0 && code < tasks.size())
            return true;
        else
            return false;
    }
    void RunTask(int code)
    {
        return tasks[code]();
    }
    int SelectTask()
    {
        return rand() % tasks.size();
    }
    string ToDesc(int code)
    {
        switch (code)
        {
        case g_download_code:
            return "Download";
        case g_printlog_code:
            return "PrintLog";
        case g_push_videostream_code:
            return "PushVideoStream";
        default:
            return "Unknow";
        }
    }
};

Init init;
// processpool.cc
#include <iostream>
#include <string>
#include <vector>
#include <cassert>
#include <unistd.h>
#include <sys/types.h>
#include <sys/wait.h>
#include "task.hpp"
using namespace std;

// 定义要创建的子进程个数
const int num = 5;
static int number = 1;

class channel
{
public:
    channel(int fd, pid_t id) : ctrlfd(fd), workerid(id)
    {
        name = "channel-" + to_string(number++);
    }

public:
    int ctrlfd;
    pid_t workerid;
    string name;
};

void Work()
{
    while (true)
    {
        int code = 0;
        ssize_t n = read(0, &code, sizeof(code));
        if (n == sizeof(code))
        {
            if (!init.CheckSafe(code))
                continue;
            init.RunTask(code);
        }
        else if (n == 0)
        {
            break;
        }
        else
        {
            // do nothing
        }
    }

    cout << "child quit" << endl;
}

void Create(vector<channel> *c)
{
    vector<int> old;
    for (int i = 0; i < num; i++)
    {
        // 1. 定义并创建管道
        int pipefd[2];
        int n = pipe(pipefd);
        assert(n == 0);
        (void)n;

        // 2. 创建进程
        pid_t id = fork();
        assert(id != -1);

        // 3. 构建单向通信信道
        if (id == 0) // child
        {
            if (!old.empty())
            {
                for (auto fd : old)
                {
                    close(fd);
                }
            }
            close(pipefd[1]);
            dup2(pipefd[0], 0);
            Work();
            exit(0); // 会自动关闭自己打开的所有的fd
        }

        // father
        close(pipefd[0]);
        c->push_back(channel(pipefd[1], id));
        old.push_back(pipefd[1]);
        // childid, pipefd[1]
    }
}

// 发送任务
void SendCommand(const vector<channel> &c, int count)
{
    int pos = 0;
    while (count--)
    {
        // 1. 选择任务
        int command = init.SelectTask();

        // 2. 选择信道(进程)
        const auto &channel = c[pos++];
        pos %= c.size();

        // 3. 发送任务
        write(channel.ctrlfd, &command, sizeof(command));

        sleep(1);
    }

    cout << "SendCommand done" << endl;
}

// 回收资源
void ReleaseChannels(std::vector<channel> c)
{
    int num = c.size() - 1;
    for (; num >= 0; num--)
    {
        close(c[num].ctrlfd);
        waitpid(c[num].workerid, nullptr, 0);
    }
}

int main()
{
    vector<channel> channels;

    // 1. 创建进程
    Create(&channels);

    // 2. 发送任务
    SendCommand(channels, 10);

    // 3. 回收资源
    ReleaseChannels(channels);

    return 0;
}

运行结果如下所示

在这里插入图片描述

总结

对于进程池来说,就是预先创建一批进程,然后让master进程直接向目标进程排放任务,这个进程被预先创建好之后,有任务就处理,没有任务就进行阻塞

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1417068.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

sql注入中的过滤问题

直接上题目&#xff1a; 当我输入?id1 or 11#时提示错误&#xff0c;从这里可以看出对面过滤了点什么导致我的语句出错了&#xff0c;因为我这个语句是用真的&#xff0c;不管放在哪都是会一定正确的&#xff0c;但是这里提示错误的。 然后我第一想法就是空格&#xff0c;我在…

代码增强LLM

大模型时代的语言模型&#xff08;LLM&#xff09;不仅在尺寸上变得更大了&#xff0c;而且训练数据也同时包含了自然语言和形式语言&#xff08;代码&#xff09;。作为人类和计算机之间的媒介&#xff0c;代码可以将高级目标转换为可执行的中间步骤&#xff0c;具有语法标准、…

VCRUNTIME140_1.dll丢失是怎么回事,需要如何修复

vcruntime140_1.dll 是一个动态链接库&#xff08;DLL&#xff09;文件&#xff0c;它是 Microsoft Visual C 2015 Redistributable 组件的一部分。这个文件包含了微软的 C 标准库的运行时组件&#xff0c;特别是与并行编程相关的部分。当开发者使用 Visual C 2015 及以上版本编…

HCIP-三层架构实验

实验拓扑 实验需求 实验思路 配置IP地址 链路聚合 vlan配置 配置生产树 实验步骤 配置IP地址 以R1为例 <Huawei>sys [Huawei]sys r1 [r1]int g0/0/02 [r1-GigabitEthernet0/0/2]ip address 12.1.1.1 24 Jan 28 2024 17:09:03-08:00 r1 %%01IFNET/4/LINK_STATE(l…

BGP:02 BGP认证

这是实验拓扑&#xff0c;物理接口IP地址来建立BGP邻居关系。 认证是指路由器对路由信息来源的可靠性及路由信息本身的完整性进行检测的机制。 下面是基本和BGP配置&#xff1a; R1: sys sysname R1 int loop 0 ip add 1.1.1.1 24 int g0/0/0 ip add 192.168.12.1 24 qbgp 1…

【vue3】Vue3 + Vite 项目搭建

Vue3 Vite 项目搭建 创建项目添加Vue Router 4路由配置添加Vant UI 组件库移动端rem适配添加iconfont字体图标库二次封装Axios请求库添加CSS预处理器Less添加全局状态管理插件Vuex 1.创建项目 Vite方式 1.1 进入开发目录, 执行指令创建新项目 更行node版本18 npm 7.x版本 su…

重写Sylar基于协程的服务器(0、搭建开发环境以及项目框架 || 下载编译简化版Sylar)

重写Sylar基于协程的服务器&#xff08;0、搭建开发环境以及项目框架 || 下载编译简化版Sylar&#xff09; 重写Sylar基于协程的服务器系列&#xff1a; 重写Sylar基于协程的服务器&#xff08;0、搭建开发环境以及项目框架 || 下载编译简化版Sylar&#xff09; 前言 sylar是…

竞赛练一练 第30期:GESP和电子学会相关题目练习

Day14&#xff1a;CIE一级2022.06_报时的公鸡 故事背景&#xff1a;公鸡在黎明时分会打鸣迎接太阳升起&#xff0c;古人也将鸡鸣声当做晨起的“闹钟”。 1. 准备工作 &#xff08;1&#xff09;背景&#xff1a;根据下图绘制两张背景&#xff1b; 01 02 &#xff08;2&…

时序分解 | MATLAB实现CEEMDAN+SE自适应经验模态分解+样本熵计算

时序分解 | MATLAB实现CEEMDANSE自适应经验模态分解样本熵计算 目录 时序分解 | MATLAB实现CEEMDANSE自适应经验模态分解样本熵计算效果一览基本介绍程序设计参考资料 效果一览 基本介绍 MATLAB实现CEEMDANSE自适应经验模态分解样本熵计算 包括频谱图 附赠案例数据 可直接运行 …

2024年社区店加盟:最火爆的项目投资小指南

在这个追求健康与品质的时代&#xff0c;社区店加盟成为了越来越多创业者的首选。 而在众多项目中&#xff0c;鲜奶吧因其独特的产品魅力和广泛的市场需求&#xff0c;成为了最火爆的投资项目之一。 作为一名拥有多年鲜奶吧经营经验的创业者&#xff0c;同时也是自媒体创业板…

C++11(中):智能指针

智能指针 1.内存泄漏1.1内存泄漏的概念以及危害1.2内存泄漏的场景1.3如何避免内存泄漏 2.智能指针的使用及原理2.1RAII2.2智能指针的原理2.3 std::auto_ptr2.4 定制删除器2.5 std::unique_ptr2.6 std::shared_ptr2.7 std::weak_ptr2.7.1 std::shared_ptr的循环引用2.7.2 循环引…

《WebKit 技术内幕》学习之十五(4):Web前端的未来

4 Cordova项目 Cordova是一个开源项目&#xff0c;能够提供将Web网页打包成本地应用格式的可运行文件。读者可能对Cordova项目陌生&#xff0c;但是大家可能对它的前身非常熟悉&#xff0c;那就是PhoneGap项目&#xff0c;它后来被Adobe公司收购。 图15-4描述了Cordova的主要工…

“Morpheus-1”的全新人工智能模型声称能引发清醒梦境

每周跟踪AI热点新闻动向和震撼发展 想要探索生成式人工智能的前沿进展吗&#xff1f;订阅我们的简报&#xff0c;深入解析最新的技术突破、实际应用案例和未来的趋势。与全球数同行一同&#xff0c;从行业内部的深度分析和实用指南中受益。不要错过这个机会&#xff0c;成为AI领…

探索半导体制造业中的健永科技RFID读写器的应用方案

一、引言 在当今高度自动化的工业环境中&#xff0c;无线射频识别&#xff08;RFID&#xff09;技术已经成为实现高效生产的重要一环。特别是在半导体制造业中&#xff0c;由于产品的高价值和复杂性&#xff0c;生产过程的追踪和管理显得尤为重要。健永科技RFID读写器以其出色…

leetcode 第三弹

链表声明&#xff1a; * Definition for singly-linked list.* struct ListNode {* int val;* ListNode *next;* ListNode() : val(0), next(nullptr) {}* ListNode(int x) : val(x), next(nullptr) {}* ListNode(int x, ListNode *next) : val(x), next(n…

服务端开发小记03——vsftpd

这里写目录标题 vsftpd简介vsftpd在Linux下的安装vsftpd验证vsftpd常用命令 vsftpd简介 vsftpd是“very secure FTP daemon”的缩写&#xff0c;是一个用于Linux环境下的免费开源的ftp服务器软件。vsftpd在Linux发行版中最受推崇&#xff0c;小巧轻快&#xff0c;安全易用&…

解决:IDEA无法下载源码,Cannot download sources, sources not found for: xxxx

原因 Maven版本太高&#xff0c;遇到http协议的镜像网站会阻塞&#xff0c;要改为使用https协议的镜像网站 解决方案 1.打开设置 2. 拿到settings.xml路径 3. 将步骤2里箭头2的User settings file&#xff1a;settings.xml打开&#xff0c;作以下修改 保存即可。如果还不行…

《PCI Express体系结构导读》随记 —— 第II篇 第4章 PCIe总线概述(1)

随着现代处理器技术的发展&#xff0c;在互连领域中&#xff0c;使用高速差分总线替代并行总线是大势所趋。与单端并行信号相比&#xff0c;高速差分信号可以使用更高的时钟频率&#xff0c;使用更少的信号线&#xff0c;完成之前需要许多单端并行数据信号才能达到的总线带宽。…

多用户多店商城小程序开发价格_高品质源码_免费部署_OctShop

电商行业不断的发展壮大&#xff0c;市场份额越来越大的形势下&#xff0c;越来越多的企业开始开发自己的商城系统&#xff0c;搭建自己的电商平台&#xff0c;而这其中的一些大中型企业直接就开发像京东淘宝类似的多用户商城系统或多用户商城小程序&#xff0c;来实现将自己的…

Docker 安装nacos本地服务

docker 安装nacos实现服务注册与发现 本篇文章旨在快速搭建本地nacos服务 1 寻找nacos镜像 docker search nacos/nacos-server 2 拉取镜像 docker pull nacos/nacos-server docker pull nacos/nacos-server:v2.3.0 3docker run运行nacos docker run -d --name nacos -p 884…