【Linux进程】基于管道实现进程池

news2025/1/22 14:55:04

目录

前言

 1. 进程池

   1.1 基本结构:

1.2. 池化技术

 1.3. 思路分析

1.4. 代码实现

总结


前言

        上篇文章介绍了管道及其使用,本文在管道的基础上,通过匿名管道来实现一个进程池;

在这里插入图片描述

 1. 进程池

 父进程创建一组子进程,子进程根据父进程的发送的信号,来做出相应的操作;

   1.1 基本结构:

 master为父进程,父进程通过管道向子进程发送对应的信号,让子进程执行相关的操作;

1.2. 池化技术

为什么要有进程池?
        要解答这个问题,需要先了解池化技术;池化技术是一种常见的优化方法,可用于提高计算和存储资源的利用率,从而提高系统性能。通过分类和管理资源或任务的池,可以实现资源的高效共享和复用;

        举个最简单的例子:我们在写vector时,它有一个扩容操作,我们在实现时一般是2倍扩容,为什么要多扩容?——为了防止频繁的申请空间;池化技术也是类似的优化方法,通过一次性申请一定数量的资源,然后自己管理这些资源的分配和回收,从而减少频繁向操作系统申请资源的次数在操作系统中,申请空间、创建进程等操作都需要一定的时间开销。因此,频繁地进行这些操作会降低系统的效率;
        进程池会提前创建一定数量的进程并保存在进程池中,当需要使用新的进程时,可以直接从进程池中获取已经存在的空闲进程来执行任务,而不需要每次都创建新的进程,从而减少了创建和销毁进程的开销;

注意:

         在实现上,把任务分配给不同的信道一定要平均,不能是有的信道很忙,有的信道很闲,这样也无法提高效率;

 在此之前,为了便于理解,这里再次回顾一下管道的特点,及几种不同的情况:

a. 管道的4种情况

  1. 正常情况,如果管道没有数据了,读端必须等待,直到有数据为止(写端写入数据了)
  2. 正常情况,如果管道被写满了,写端必须等待,直到有空间为止(读端读走数据)
  3. 写端关闭,读端一直读取, 读端会读到read返回值为0, 表示读到文件结尾
  4. 读端关闭,写端一直写入,OS会直接杀掉写端进程,通过想目标进程发送SIGPIPE(13)信号,终止目标进程

b. 匿名管道的5种特性

  1. 匿名管道,可以允许具有血缘关系的进程之间进行进程间通信,常用与父子,仅限于此
  2. 匿名管道,默认给读写端要提供同步机制 --- 了解现象就行
  3. 面向字节流的 --- 了解现象就行
  4. 管道的生命周期是随进程的
  5. 管道是单向通信的,半双工通信的一种特殊情况

 1.3. 思路分析

         要创建一个管道用于父进程于子进程的通信简单:

 问题在于后续管道的创建:

结构图如下:

         在创建第二个管道时,父进程新建子进程,子进程继承父进程的属性;然后关闭父进程的读端,子进程的写端,构建单向信道;
        问题就出在这里,看上图结构:子进程会继承父进程属性,所以第二个子进程的3号文件描述符也会指向的1号管道的写端(正常情况下是不能指定的);
        以此类推,第三个进程也会指向1号管道和2号管道;只有最后一个管道,是只有一个写端指向,其余的管道都有多个写端;

在实际上不会出现子进程向管道写入的情况,但是在关闭管道的时候容易出问题;不注意就会导致程序阻塞;

正常的关闭信道这样写:

for (const auto& e : c)
{
    close(e.ctrlfd);
    waitpid(e.workerid, nullptr, 0);
}

 遍历这个数组,关闭父进程对每个管道的写端;看似很完美,而实际情况是:

        除最后一个管道,其余管道依然会有写端指向,而只有当所有写端都关闭后,调用read函数时,才会返回0,表示已经读取到文件末尾;此时表示执行完毕,会进入下一步的回收;
        如果存在写端,并且写端一直不写入数据,在这种情况下,read函数不会返回而是一直等待数据到来。读端(子进程)的read函数会一直阻塞,直到有数据写入为止;这也就会导致程序阻塞住;

如何解决?

方法一:

        倒着遍历去关闭管道;

        倒着去关闭,最后一个管道没有写端,管道正常关闭,回收子进程,子进程被回收,文件描述符也会被释放,其余管道的写端就会减少;依次关闭,就可以保证所有管道正常关闭;

 方法二:

        借助数据结构去解决,记录父进程中的写端fd,在子进程中依次关闭;这样创建出来的信道之间连接关系是理想的,关系不会那么乱;

        父进程中创建一个临时vector,父进程记录每个管道写端fd,把数据存储道临时vector中一份;这样每个子进程拿到的就是当前父进程所有写端的fd,子进程全部关掉即可;

1.4. 代码实现

模拟执行任务,编写一个任务类:

// Task.hpp
#pragma once

#include <iostream>
#include <functional>
#include <vector>
#include <ctime>

typedef std::function<void()> task_t;
void Download()
{
    std::cout << "我是一个下载任务"
        << " 处理者: " << getpid() << std::endl;
}

void PrintLog()
{
    std::cout << "我是一个打印日志的任务"
        << " 处理者: " << getpid() << std::endl;
}
void PushVideoStream()
{
    std::cout << "这是一个推送视频流的任务"
        << " 处理者: " << getpid() << std::endl;
}

class Tasklist
{

public:
    Tasklist()
    {
        tasks.push_back(Download);
        tasks.push_back(PrintLog);
        tasks.push_back(PushVideoStream);
        // 模拟随机任务
        srand(time(nullptr));
    }
    // 检查信号是否合法
    bool CheckSafe(int code)
    {
        if (code >= 0 && code < tasks.size())
            return true;
        else
            return false;
    }
    // 执行任务
    void RunTask(int code)
    {
        return tasks[code]();
    }
    // 随机选择任务
    int SelectTask()
    {
        return rand() % tasks.size();
    }
    // 信号转换
    std::string ToDesc(int code)
    {
        switch (code)
        {
        case g_download_code:
            return "Download";
        case g_printlog_code:
            return "PrintLog";
        case g_push_videostream_code:
            return "PushVideoStream";
        default:
            return "Unknow";
        }
    }
public:
    const static int g_download_code = 0;
    const static int g_printlog_code = 1;
    const static int g_push_videostream_code = 2;

    std::vector<task_t> tasks;
};

Tasklist init;//定义对象

 进程池:

#include <iostream>
#include <vector>
#include <string>
#include <unistd.h>
#include <assert.h>
#include <sys/wait.h>
#include <sys/types.h>
#include "Task.hpp"

const int nums = 5;
static int number = 1;

// 将管道描述成一个对象
class channel
{
public:
    channel(int fd, pid_t id)
        :ctrlfd(fd)
        , workerid(id)
    {
        name = "channel " + std::to_string(number++);
    }
public:
    int ctrlfd;
    pid_t workerid;//进程id
    std::string name;

};
// 工作接口
void work()
{
    while (true)
    {
        int code = 0;
        // 该接口由子进程执行
        // 从标准输入去读(其实就是从管道去读)
        // 为了方便读数据,所以将对应管道读端,重定向到标准输入,否则还需记录子进程对应管道的读端
        ssize_t n = read(0, &code, sizeof(code));
        //assert(n == sizeof(code)); //父进程一旦退出,子进程也会退出,没有数据写入,读到的字节是0,assert强制停止;
        // 判断读到的数据是否正常
        if (n == sizeof(code))
        {
            if (!init.CheckSafe(code)) continue;
            init.RunTask(code);
        }
        else if (n == 0)
        {
            break;
        }
        else
        {

        }
    }
    std::cout << "child quit " << std::endl;
}

void PrintFd(const std::vector<int>& fds)
{
    std::cout << getpid() << " close fds: ";
    for (auto fd : fds)
    {
        std::cout << fd << " ";
    }
    std::cout << std::endl;
}

// 创建管道
void CreateChannel(std::vector<channel>* c)
{
    std::vector<int> tmp;
    for (int i = 0; i < nums; i++)
    {
        // 创建信道
        int pipefd[2];
        int n = pipe(pipefd);//管道建立成功返回0,失败返回错误码
        assert(n == 0);
        (void)n;

        // 创建进程
        pid_t id = fork();
        assert(id >= 0); // 条件为假返回报错信息

        // 构建单向信道
        if (id == 0) //child
        {
            if (!tmp.empty())
            {
                // 方法二
                // 关闭其余写端的fd
                for (auto& e : tmp)
                {
                    close(e);
                }
                PrintFd(tmp);
            }
            // 这里并没有重复关闭,先创建的管道,再创建子进程,然后再关闭将写端fd加入到tmp
            // 这里是关闭子进程对当前管道的写端
            close(pipefd[1]);
            // 将标准输入重定向到管道读端,管道从标准输入中读数据
            dup2(pipefd[0], 0);
            // TODO
            work();

            exit(0);
        }

        // father
        close(pipefd[0]); //关闭父进程读
        // 将管道存储起来方便后续管理
        c->push_back(channel(pipefd[1], id));
        tmp.push_back(pipefd[1]);
    }
}


void SendCommand(const std::vector<channel>& c, bool flag, int nums = -1)
{
    int pos = 0;
    while (true)
    {
        //1.选择任务
        int command = init.SelectTask();
        //2.选择信道
        const auto& channel = c[pos++];
        pos %= c.size();
        //debug
        std::cout << "send command " << init.ToDesc(command) << "[" << command << "]"
            << " in "
            << channel.name << " worker is : " << channel.workerid << std::endl;
        //3.发送任务
        write(channel.ctrlfd, &command, sizeof(command));
        // 判断任务执行完是否退出
        if (!flag)
        {
            nums--;
            if (nums <= 0)
                break;
        }
        sleep(1);
    }
    std::cout << "SendCommand done..." << std::endl;
}
void ReleaseChannel(std::vector<channel>& c)
{
    //倒着回收
    // int n = c.size() - 1;
    // for(; n >= 0; n--)
    // {
    //     close(c[n].ctrlfd);
    //     waitpid(c[n].workerid, nullptr, 0);
    // }
    for (const auto& e : c)
    {
        close(e.ctrlfd);
        waitpid(e.workerid, nullptr, 0);
    }
}

const bool g_always_loop = true;
int main()
{
    std::vector<channel> channels;
    //创建进程创建信道
    CreateChannel(&channels);

    //开始执行任务
    SendCommand(channels, !g_always_loop, 10);

    //回收资源等待子进程退出
    ReleaseChannel(channels);

    return 0;
}

 结构并不复杂,这里只是一个简单的示例;这个示例比较考验对多进程编程;

1.5. 思考

在进程池体系中,如果一个子进程退出了(一个管道的读端关闭)会怎样?

       如果父进程知道子进程已经退出,即通过监控子进程状态并处理子进程退出的情况下,父进程在得知子进程退出后就不会再继续向已经退出的子进程的管道中写入数据也不会因为收到信号而终止
        如果父进程没有正确地监控子进程的状态,不知道子进程已经退出,那么当父进程尝试向已经退出的子进程的管道中写入数据时,会收到SIGPIPE信号而终止。在这种情况下,剩余的子进程会成为孤儿进程,由操作系统接管;

因此在设计时也可以进行特殊处理,处理子进程退出,避免父进程被OS杀死的情况;如何去处理?

        可以在选择信道那里多一步判断,判断信道对应的子进程是否已经退出,通过信道描述类,找到对应的进程id,通过waitpid的非阻塞等待判断是否退出;


总结

        以上便是本文的全部内容,希望对你有所帮助或启发,感谢阅读!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2263670.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

PCL点云库入门——PCL库中点云数据拓扑关系之K-D树(KDtree)

1、点云的拓扑邻域 在三维空间数据处理的领域中&#xff0c;点云的邻域概念显得尤为关键&#xff0c;它不仅链接了点云数据之间的拓扑结构&#xff0c;而且在构建点云间的拓扑关系时起到了桥梁的作用。这种关系的建立&#xff0c;使得我们能够以一种高效、迅速的方式管理庞大的…

Leecode刷题C语言之根据第k场考试的分数排序

执行结果:通过 执行用时和内存消耗如下&#xff1a; int gk 0;int compare(const void* a, const void* b) {int* ua *(int**)a;int* ub *(int**)b;return ub[gk] - ua[gk]; }int** sortTheStudents(int** score, int scoreSize, int* scoreColSize, int k, int* returnSiz…

由popover框一起的操作demo问题

场景&#xff1a; 当popover框弹出的时候&#xff0c;又有MessageBox 提示&#xff0c;此时关闭MessageBox 提示&#xff0c;popover就关闭了。将popover改为手动激活&#xff0c;可以解决这个问题&#xff0c;但是会引起另外一个问题&#xff0c;之前&#xff08;click触发的时…

QT修改运行窗口的图标

首先&#xff0c;在.pro下添加两行&#xff1a; Debug:DESTDIR $$PWD Release:DESTDIR $$PWD 指定目标文件的路径 指定生成的debug和release文件夹路径在当前项目下 上面是为了防止爆奇怪的错 右键项目添加新文件 选择QT-》QT Resource File 起个名&#xff0c;然后下一步…

降低Mobx技术债问题-React前端数据流方案调研整理

我们现在主要是使用Mobx&#xff0c;但是Mobx的易于上手和灵活度也带来了很多预期以外的问题&#xff0c;随着项目的增长我们的代码技术债变得愈加沉重&#xff0c;不同的模块杂糅一起、单一store无限膨胀。 为此我们的调研是希望能找到一个更好的state配置、数据流的约定方案。…

sql server索引优化语句

第一步 建一个测试表 --create table TestUsers --( -- Id int primary key identity(1,1), -- Username varchar(30) not null, -- Password varchar(10) not null, -- CreateDateTime datetime not null --)第二步 插入100w数据 大概1分钟执行时间 ----插入数据…

aioice里面candidate固定UDP端口测试

环境&#xff1a; aioice0.9.0 问题描述&#xff1a; aioice里面candidate固定UDP端口测试 解决方案&#xff1a; /miniconda3/envs/nerfstream/lib/python3.10/site-packages/aioice import hashlib import ipaddress import random from typing import Optional import…

Java(二十五)final关键字

Java中的final关键字在编写程序中,比较常用。尤其是在上文中的匿名内部类中。 final 表示最终,也可以称为完结器,表示对象是最终形态的,不可改变的意思。 使用final修饰的的类,是“断子绝孙”的。 一:final修饰成员变量 Final修饰的类的成员变量是常量,不可被改变。 …

MySQL三大日志-Redo Log

Redo Log简介 事务中修改的任何数据&#xff0c;将最新的数据备份存储的位置&#xff08;Redo Log&#xff09;&#xff0c;被称为重做日志。 Redo Log 的生成和释放 随着事务操作的执行&#xff0c;就会生成Redo Log&#xff0c;在事务提交时会将产生Redo Log写入Log Buff…

【libuv】Fargo信令2:【深入】client为什么收不到服务端响应的ack消息

客户端处理server的ack回复,判断链接连接建立 【Fargo】28:字节序列【libuv】Fargo信令1:client发connect消息给到server客户端启动后理解监听read消息 但是,这个代码似乎没有触发ack消息的接收: // 客户端初始化 void start_client(uv_loop_t

html中实用标签dl dt dd(有些小众的标签 但是很好用)

背景描述 html <dl> <dt> <dd>是一组合标签&#xff0c;他们与ol li、ul li标签很相似 但是他却是没有默认前缀并且有缩进的标签 使用方式与table表格的标签一致 使用方式 dt和dd是放于dl标签内&#xff0c;dt与dd处于dl下相同级。就是dt不能放入dd内&am…

Mysql索引类型总结

按照数据结构维度划分&#xff1a; BTree 索引&#xff1a;MySQL 里默认和最常用的索引类型。只有叶子节点存储 value&#xff0c;非叶子节点只有指针和 key。存储引擎 MyISAM 和 InnoDB 实现 BTree 索引都是使用 BTree&#xff0c;但二者实现方式不一样&#xff08;前面已经介…

kubeadm_k8s_v1.31高可用部署教程

kubeadm_k8s_v1.31高可用部署教程 实验环境部署拓扑图**部署署架构****Load Balance****Control plane node****Worker node****资源分配&#xff08;8台虚拟机&#xff09;**集群列表 前置准备关闭swap开启ipv4转发更多设置 1、Verify the MAC address and product_uuid are u…

M3D: 基于多模态大模型的新型3D医学影像分析框架,将3D医学图像分析从“看图片“提升到“理解空间“的层次,支持检索、报告生成、问答、定位和分割等8类任务

M3D: 基于多模态大模型的新型3D医学影像分析框架&#xff0c;将3D医学图像分析从“看图片“提升到“理解空间“的层次&#xff0c;支持检索、报告生成、问答、定位和分割等8类任务 论文大纲理解1. 确认目标2. 分析过程&#xff08;目标-手段分析&#xff09;核心问题拆解 3. 实…

Word图片嵌入格式不正确的解决办法

问题描述: 如图, 粘贴到word的图片只显示底部一部分 解决方法: 第一步 先将图片嵌入文本行中 第二步 再将图片设置为正文格式 然后就出来了

深入浅出:内网黄金票据与白银票据

在域环境中&#xff0c;Kerberos认证是确保安全通信的基石&#xff0c;而黄金票据和白银票据则是攻击者常用的两种经典手段。为了帮助大家更形象地理解它们的工作原理及防御措施&#xff0c;我们不妨将其与在私人电影院购票的情景做类比。具体内容参考如下图示即可&#xff1a;…

Eclipse2024无法创建Dynamic Web project解决方法

Dynamic Web Project 是由 Eclipse Web Developer Tools 提供的&#xff0c;确保你已经安装了该插件。 在 Eclipse 中&#xff0c;点击菜单栏的 Help > Eclipse Marketplace&#xff0c;搜索 Eclipse Web Developer Tools&#xff0c;然后安装或更新它。 等待安装完成重启一…

Unity复刻胡闹厨房复盘 模块一 新输入系统订阅链与重绑定

本文仅作学习交流&#xff0c;不做任何商业用途 郑重感谢siki老师的汉化教程与代码猴的免费教程以及搬运烤肉的小伙伴 版本&#xff1a;Unity6 模板&#xff1a;3D 核心 渲染管线&#xff1a;URP ------------------------------…

Edge Scdn防御网站怎么样?

酷盾安全Edge Scdn&#xff0c;即边缘式高防御内容分发网络&#xff0c;主要是通过分布在不同地理位置的多个节点&#xff0c;使用户能够更快地访问网站内容。同时&#xff0c;Edge Scdn通过先进的技术手段&#xff0c;提高了网上内容传输的安全性&#xff0c;防止各种网络攻击…

开源数字人系统源码短视频文案提取文案改写去水印小程序

应用场景 短视频去水印&#xff1a; 个人用户&#xff1a;在社交媒体上分享短视频时&#xff0c;去除原视频中的水印&#xff0c;以保护个人隐私或避免侵权问题。企业用户&#xff1a;在广告、宣传和营销活动中&#xff0c;使用无水印的短视频以提高品牌知名度和吸引力。 文案提…