Linux - 进程间通信(2)

news2025/1/31 4:25:28

目录

2、进程池

1)理解进程池

 2)进程池的实现

整体框架:

a. 加载任务

b. 先描述,再组织

I. 先描述

II. 再组织

c. 创建信道和子进程

d. 通过channel控制子进程

e. 回收管道和子进程

问题1:

解答1:

问题2:

解答2:

f. 将进程池本身和任务文件本身进行解耦

3)完整代码

processpool.cc:

Task.hpp:

Makefile:


命令行中的 | ,就是匿名管道

它们的父进程都是bash

2、进程池

1)理解进程池

a. 可以将任务写入管道来给到子进程,从而可以提前创建子进程想让哪个子进程完成任务,我就让写入到哪个子进程相对的管道中

b. 管道里面没有数据,worker进程就在阻塞等待,等待务的到来!!

c. master向哪一个管道进行写入,就是唤醒哪一个子进程来处理任务

d. 均衡的向后端子进程划分任务,称之为负载均衡父进程要进行后端任务的负载均衡

父进程直接向管道里写入固定长度的四字节(int)数组下标(任务码)

函数指针数组中元素分别指向不同的任务,以便master控制worker完成指定工作

 2)进程池的实现
整体框架:
// ./processpool 3
int main(int argc, char *argv[])
{
    if (argc != 2)
    {
        std::cerr << "Usage: " << argv[0] << " processnum" << std::endl;
    }

    int num = std::stoi(argv[1]);
    LoadTask(); // 加载任务

    std::vector<Channel> channels; // 将管道组织起来
    // 1.创建信道和子进程
    CreateChannelAndSub(num, &channels);

    // 2.通过channel控制子进程
    CtrlProcess(channels, 10);

    // 3.回收管道和子进程 a.关闭所有的写端 b.回收子进程
    ClearUpChannel(channels);

    return 0;
}
a. 加载任务

我这里用的是打印的方式来模拟任务的分配,通过打印从而了解子进程执行任务的情况,通过种下随机数种子,产生随机数,进而随机的向子进程分配任务,work即为子进程需要做的工作

Task.hpp:

#pragma once

#include <iostream>
#include <ctime>
#include <cstdlib>
#include <sys/types.h>
#include <unistd.h>

#define TaskNum 3

typedef void (*task_t)(); // task_t 函数指针

void Print()
{
    std::cout << "I am print task" << std::endl;
}
void DownLoad()
{
    std::cout << "I am a download task" << std::endl;
}
void Flush()
{
    std::cout << "I am a flush task" << std::endl;
}

task_t tasks[TaskNum];

void LoadTask()
{
    srand(time(nullptr) ^ getpid() ^ 177); // 种一个随机种子
    tasks[0] = Print;
    tasks[1] = DownLoad;
    tasks[2] = Flush;
}

void ExcuteTask(int number)
{
    if(number < 0 || number > 2) return;
    tasks[number]();
}

int SelectTask()
{
    return rand() % TaskNum;
}

void work(int rfd)
{
    while (true)
    {
        int command = 0;
        int n = read(rfd, &command, sizeof(command));
        if (n == sizeof(int))
        {
            std::cout << "pid is: " << getpid() << " handler task" << std::endl;
            ExcuteTask(command);
        }
        else if (n == 0)
        {
            std::cout << "sub process: " << getpid() << " quit" << std::endl;
            break;
        }
    }
}
// 命令行规范 --> ./processpool 3
int main(int argc, char *argv[])
{
    if (argc != 2)
    {
        std::cerr << "Usage: " << argv[0] << " processnum" << std::endl;
    }

    int num = std::stoi(argv[1]);
    LoadTask(); // 加载任务

    return 0;
}
b. 先描述,再组织
I. 先描述

需要控制的信道(即发送端wfd)数量多且繁琐,需要管理起来从而方便控制给子进程发送任务

class Channel
{
private:
    int _wfd;
    int _subprocessid;
    std::string _name;
};

在信道中,我们需要知道发送的文件描述符wfd,还有知道子进程的pid,以及信道的命名(用来区分信道)

II. 再组织
std::vector<Channel> channels;

我们通过用一个vector数组将所有的Channel存储起来,从而实现对它们的增删查改,以方便管理

c. 创建信道和子进程
void CreateChannelAndSub(int num, std::vector<Channel> *channels)
{
    for (int i = 0; i < num; ++i)
    {
        // 1.创建管道
        int pipefd[2] = {0};
        int n = pipe(pipefd);
        if (n < 0) exit(1); // 创建管道失败

        // 2.创建子进程
        pid_t id = fork();
        if (id == 0)
        {
            // Child
            close(pipefd[1]);

            work();

            close(pipefd[0]);
            exit(0);
        }

        // 3.构建一个名字
        std::string channel_name = "Channel-" + std::to_string(i);
        // Father
        close(pipefd[0]);
        // 拿到了 a.子进程的pid b.父进程需要的管道的w端
        channels->push_back(Channel(pipefd[1], id, channel_name));
    }
}

用命令行参数的方式传入得到的argv[1]即为输入命令需要的子进程和管道个数

通过for循环,创建 num个 pipe管道以及子进程,当创建完子进程时,需要关闭掉不需要的文件描述符(即wfd -- pipefd[1])(当然,父进程也需要关闭不需要的fd -- rfd),在执行完work(子进程的工作)之后关闭掉rfd(即工作完成了,关闭其管道),然后exit(0)退出进程等待父进程回收

d. 通过channel控制子进程
// 轮询方案 -- 负载均衡
int NextChannel(int channelnum) 
{
    static int next = 0;
    int channel = next;
    next++;
    next %= channelnum;

    return channel;
}

// 发送相应的任务码到对应管道内
void SendTaskCommand(Channel &channel, int taskCommand)
{
    write(channel.GetWfd(), &taskCommand, sizeof(taskCommand));
}
void CtrlProcessOnce(std::vector<Channel> &channels)
{
    sleep(1);
    // a. 选择一个任务
    int taskcommand = SelectTask();
    // b. 选择一个信道和进程
    int channel_index = NextChannel(channels.size());
    // c. 发送任务
    SendTaskCommand(channels[channel_index], taskcommand);
    std::cout << "=================================" << std::endl;
    std::cout << "taskcommand: " << taskcommand << " channel: " 
              << channels[channel_index].GetName() << " sub process: " 
              << channels[channel_index].GetProcessId() << std::endl;
}

void CtrlProcess(std::vector<Channel> &channels, int times = -1)
{
    if (times > 0)
    {
        while (times--)
        {
            CtrlProcessOnce(channels);
        }
    }
    else
    {
        while (true)
        {
            CtrlProcessOnce(channels);
        }
    }
}

向其发送任务之前,我们需要先选择一个任务,通过随机种子随机数的方式,随机选择我们的一个任务,拿到其任务码(即指针数组下标),然后选择相应的信道和进程(信道和进程一体的),从而向管道发送任务码给子进程

e. 回收管道和子进程
void ClearUpChannel(std::vector<Channel> &channels)
{
    for (auto &channel : channels)
    {
        channel.CloseChannel();
    }
    
    for (auto &channel : channels)
    {
        channel.Wait();
    }
}

我们先将所有的信道关闭,然后在逐个将子进程等待回收

问题1:

那为什么不能边关闭信道边回收呢??

解答1:

在我们创建子进程的过程中,由于父子进程之间的继承,从而导致子进程会拥有父进程的文件描述符内容(即指向同一地方),如果我们边关闭边回收的话,如上图所示,当我们关闭父进程的第一个管道的wfd时,这时候第一个管道的读端的引用计数并未清0,因为子进程2它继承了父进程指向第一个管道的wfd(读端),从而使得读端阻塞,进程不退出,然后wait子进程的时候就会阻塞

在work结束后,才会到下一步close和exit退出子进程;

work结束需要的条件是 n == 0,即读端返回值为0,即

因此上述那种边关闭信道,边wait子进程的方法会阻塞

问题2:

为什么这种方法又能成功回收呢??

解答2:

因为当我们将所有信道关闭时,关闭到最后一个子进程对应的管道的wfd的时候,该管道的读端的引用计数就会为0,从而读端读到0,该子进程退出,随子进程退出就会使得该子进程指向的前面管道的读端回收,就不会造成前面那种情况

f. 将进程池本身和任务文件本身进行解耦

用回调函数可以很好的改善代码的耦合性

通过文件描述符重定向 dup2将标准输入(文件描述符 0)重定向到 rfd 所代表的文件,然后再回调task()函数

// 重定向

这样做可以彻底的让我们的子进程执行对应的work时,再也不需要知道有什么管道的读端

(不用管从哪里接收信息,直接认为从标准输入拿到信息即可)

--- 将管道的逻辑和执行任务的逻辑进一步进行解耦

// task_t task : 回调函数

有了它,我们进程池本身的代码和我们任务本身两个文件就彻底解耦了

--- 即既不关心从哪个文件描述符,直接默认从0里面去读,也不关心将来谁调它,因为子进程会自动回调它

3)完整代码
processpool.cc:
#include <iostream>
#include <string>
#include <unistd.h>
#include <vector>
#include <sys/types.h>
#include <sys/wait.h>
#include "Task.hpp"

class Channel
{
public:
    Channel(int wfd, pid_t id, const std::string name)
        : _wfd(wfd), _subprocessid(id), _name(name)
    {
    }

    int GetWfd() 
    { 
        return _wfd; 
    }

    pid_t GetProcessId() 
    { 
        return _subprocessid; 
    }

    std::string GetName() 
    { 
        return _name; 
    }

    void CloseChannel() 
    { 
        close(_wfd); 
    }

    void Wait()
    {
        pid_t rid = waitpid(_subprocessid, nullptr, 0);
        if (rid > 0)
        {
            std::cout << "wait " << rid << " success" << std::endl;
        }
    }

    ~Channel()
    {
    }

private:
    int _wfd;
    int _subprocessid;
    std::string _name;
};

// 形参和命名规范
// const & : 输入型参数
// & : 输入输出型参数
// * : 输出型参数

// task_t task : 回调函数
// 有了它,我们进程池本身的代码和我们任务本身两个文件就彻底解耦了
// --- 即既不关心从哪个文件描述符,直接默认从0里面去读,也不关心将来谁调它,因为子进程会自动回调它
void CreateChannelAndSub(int num, std::vector<Channel> *channels, task_t task)
{
    for (int i = 0; i < num; ++i)
    {
        // 1.创建管道
        int pipefd[2] = {0};
        int n = pipe(pipefd);
        if (n < 0)
            exit(1); // 创建管道失败

        // 2.创建子进程
        pid_t id = fork();
        if (id == 0)
        {
            // Child
            close(pipefd[1]);

            dup2(pipefd[0], 0);
            task();

            close(pipefd[0]);
            exit(0);
        }

        // 3.构建一个名字
        std::string channel_name = "Channel-" + std::to_string(i);
        // Father
        close(pipefd[0]);
        // 拿到了 a.子进程的pid b.父进程需要的管道的w端
        channels->push_back(Channel(pipefd[1], id, channel_name));
    }
}

int NextChannel(int channelnum) // 轮询方案 -- 负载均衡
{
    static int next = 0;
    int channel = next;
    next++;
    next %= channelnum;

    return channel;
}

void SendTaskCommand(Channel &channel, int taskCommand)
{
    write(channel.GetWfd(), &taskCommand, sizeof(taskCommand));
}

void CtrlProcessOnce(std::vector<Channel> &channels)
{
    sleep(1);
    // a. 选择一个任务
    int taskcommand = SelectTask();
    // b. 选择一个信道和进程
    int channel_index = NextChannel(channels.size());
    // c. 发送任务
    SendTaskCommand(channels[channel_index], taskcommand);
    std::cout << "=================================" << std::endl;
    std::cout << "taskcommand: " << taskcommand << " channel: " << channels[channel_index].GetName()
              << " sub process: " << channels[channel_index].GetProcessId() << std::endl;
}
void CtrlProcess(std::vector<Channel> &channels, int times = -1)
{
    if (times > 0)
    {
        while (times--)
        {
            CtrlProcessOnce(channels);
        }
    }
    else
    {
        while (true)
        {
            CtrlProcessOnce(channels);
        }
    }
}

void ClearUpChannel(std::vector<Channel> &channels)
{
    // for (auto &channel : channels)
    // {
    //     channel.CloseChannel();
    //     channel.Wait();
    // }

    // int num = channels.size() -1;
    // while(num >= 0)
    // {
    //     channels[num].CloseChannel();
    //     channels[num--].Wait();
    // }

    for (auto &channel : channels)
    {
        channel.CloseChannel();
    }
    
    for (auto &channel : channels)
    {
        channel.Wait();
    }
}

// ./processpool 3
int main(int argc, char *argv[])
{
    if (argc != 2)
    {
        std::cerr << "Usage: " << argv[0] << " processnum" << std::endl;
    }

    int num = std::stoi(argv[1]);
    LoadTask(); // 加载任务

    std::vector<Channel> channels; // 将管道组织起来
    // 1.创建信道和子进程
    CreateChannelAndSub(num, &channels, work);

    // 2.通过channel控制子进程
    CtrlProcess(channels, 10);

    // 3.回收管道和子进程 a.关闭所有的写端 b.回收子进程
    ClearUpChannel(channels);

    // // for test
    // for(auto &channel : channels)
    // {
    //     std::cout << "====================" << std::endl;
    //     std::cout << channel.GetName() << std::endl;
    //     std::cout << channel.GetWfd() << std::endl;
    //     std::cout << channel.GetProcessId() << std::endl;
    // }
    // sleep(100);
    return 0;
}
Task.hpp:
#pragma once

#include <iostream>
#include <ctime>
#include <cstdlib>
#include <sys/types.h>
#include <unistd.h>

#define TaskNum 3

typedef void (*task_t)(); // task_t 函数指针

void Print()
{
    std::cout << "I am print task" << std::endl;
}
void DownLoad()
{
    std::cout << "I am a download task" << std::endl;
}
void Flush()
{
    std::cout << "I am a flush task" << std::endl;
}

task_t tasks[TaskNum];

void LoadTask()
{
    srand(time(nullptr) ^ getpid() ^ 177); // 种一个随机种子
    tasks[0] = Print;
    tasks[1] = DownLoad;
    tasks[2] = Flush;
}

void ExcuteTask(int number)
{
    if(number < 0 || number > 2) return;
    tasks[number]();
}

int SelectTask()
{
    return rand() % TaskNum;
}

// void work(int rfd)
// {
//     while (true)
//     {
//         int command = 0;
//         int n = read(rfd, &command, sizeof(command));
//         if (n == sizeof(int))
//         {
//             std::cout << "pid is: " << getpid() << " handler task" << std::endl;
//             ExcuteTask(command);
//         }
//         else if (n == 0)
//         {
//             std::cout << "sub process: " << getpid() << " quit" << std::endl;
//             break;
//         }
//     }
// }

// 这样做可以彻底的让我们的子进程执行对应的work时,再也不需要知道有什么管道的读端
// (不用管从哪里接收信息,直接认为从标准输入拿到信息即可)
// 将管道的逻辑和执行任务的逻辑进一步进行解耦
void work()
{
    while (true)
    {
        int command = 0;
        int n = read(0, &command, sizeof(command));
        if (n == sizeof(int))
        {
            std::cout << "pid is: " << getpid() << " handler task" << std::endl;
            ExcuteTask(command);
        }
        else if (n == 0)
        {
            std::cout << "sub process: " << getpid() << " quit" << std::endl;
            break;
        }
    }
}
Makefile:
processpool:processpool.cc
	g++ -o $@ $^ -std=c++11
.PHONY:clean
clean:
	rm -f processpool

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2285081.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

langchain基础(二)

一、输出解析器&#xff08;Output Parser&#xff09; 作用&#xff1a;&#xff08;1&#xff09;让模型按照指定的格式输出&#xff1b; &#xff08;2&#xff09;解析模型输出&#xff0c;提取所需的信息 1、逗号分隔列表 CommaSeparatedListOutputParser&#xff1a;…

解除阿里云盘压缩包分享限制的最新工具(2025年更新)

前言 前段时间&#xff0c;为了在阿里云盘分享一些资料&#xff0c;尝试了好多种方法&#xff1a;改文件名后缀&#xff0c;打包自解压&#xff0c;使用将压缩文件追加在图片文件后&#xff0c;还有的一些工具&#xff0c;虽然能伪装文件但并不太好用&#xff0c;最后自己写了…

2025神奇的数字—新年快乐

2025年&#xff0c;一个神奇的数字&#xff0c;承载着数学的奥秘与无限可能。它是45的平方&#xff08;45&#xff09;&#xff0c;上一个这样的年份是1936年&#xff08;44&#xff09;&#xff0c;下一个则是2116年&#xff08;46&#xff09;&#xff0c;一生仅此一次。2025…

PWM频率测量方法

测量PWM&#xff08;脉宽调制&#xff09;信号的频率是嵌入式系统中的常见需求&#xff0c;尤其是在电机控制、LED调光、传感器信号处理等场景中。 在这里介绍两种测量PWM频率的方法&#xff1a;测频法与测周法。 1、测频&#xff08;率&#xff09;法 原理&#xff1a;在闸门…

【解决方案】VMware虚拟机adb连接宿主机夜神模拟器

1、本机&#xff08;宿主机&#xff0c;系统windows10&#xff09;ip为192.168.31.108 2、运行模拟器后本机cmd查看端口为62026 3、VMware虚拟机&#xff08;系统&#xff0c;kali&#xff09;adb连接192.168.31.108:62026报错 failed to connect to 192.168.31.108:16416: Co…

DroneXtract:一款针对无人机的网络安全数字取证工具

关于DroneXtract DroneXtract是一款使用 Golang 开发的适用于DJI无人机的综合数字取证套件&#xff0c;该工具可用于分析无人机传感器值和遥测数据、可视化无人机飞行地图、审计威胁活动以及提取多种文件格式中的相关数据。 功能介绍 DroneXtract 具有四个用于无人机取证和审…

基于springboot+vue的流浪动物救助系统的设计与实现

开发语言&#xff1a;Java框架&#xff1a;springbootJDK版本&#xff1a;JDK1.8服务器&#xff1a;tomcat7数据库&#xff1a;mysql 5.7&#xff08;一定要5.7版本&#xff09;数据库工具&#xff1a;Navicat11开发软件&#xff1a;eclipse/myeclipse/ideaMaven包&#xff1a;…

利用ue5制作CG动画笔记

tips&#xff1a; 按住鼠标中键可以拖动枢轴点 在曲线编辑器中按住shift可以使曲线编辑保持在x轴 专业术语&#xff1a; CGI&#xff1a;计算机生成图象&#xff08;computer-generated imagery&#xff09;真实的不算&#xff0c;计算机生成的 Compositing&#xff1a;合…

AI 图片涌入百度图库

在这个信息爆炸的时代&#xff0c;我们习惯了通过搜索引擎来获取各种想要的信息和图片。然而&#xff0c;现在打开搜索引擎看到的却是许多真假难辨的信息——AI图片&#xff0c;这部分数据正以惊人的速度涌入百度图库&#xff0c;让小编不禁想问&#xff1a;未来打开百度图库不…

《多阶段渐进式图像修复》学习笔记

paper&#xff1a;2102.02808 GitHub&#xff1a;swz30/MPRNet: [CVPR 2021] Multi-Stage Progressive Image Restoration. SOTA results for Image deblurring, deraining, and denoising. 目录 摘要 1、介绍 2、相关工作 2.1 单阶段方法 2.2 多阶段方法 2.3 注意力机…

2025.1.26机器学习笔记:C-RNN-GAN文献阅读

2025.1.26周报 文献阅读题目信息摘要Abstract创新点网络架构实验结论缺点以及后续展望 总结 文献阅读 题目信息 题目&#xff1a; C-RNN-GAN: Continuous recurrent neural networks with adversarial training会议期刊&#xff1a; NIPS作者&#xff1a; Olof Mogren发表时间…

设置jmeter界面图标字体大小

设置jmeter界面图标字体大小 方法&#xff1a;点击“选项” -> 点击放大、缩小。&#xff08;可进行全局的菜单、左侧目录结构树、元件界面显示等字体图标的放大、缩小。&#xff09;

使用 MSYS2 qemu 尝鲜Arm64架构国产Linux系统

近期&#xff0c;我的师弟咨询我关于Arm64架构的国产CPU国产OS开发工具链问题。他们公司因为接手了一个国企的单子&#xff0c;需要在这类环境下开发程序。说实在的我也没有用过这个平台&#xff0c;但是基于常识&#xff0c;推测只要基于C和Qt&#xff0c;应该问题不大。 1. …

RocketMQ实战—1.订单系统面临的技术挑战

大纲 1.一个订单系统的整体架构、业务流程及负载情况 2.订单系统面临的技术问题一&#xff1a;下订单的同时还要发券、发红包、Push推送等导致性能太差 3.订单系统面临的技术问题二&#xff1a;订单退款时经常流程失败导致无法完成退款 4.订单系统面临的技术问题三&#xf…

Linux学习笔记——用户管理

一、用户管理命令 useradd #用户增加命令 usermod #用户修改命令 passwd #密码修改命令 userdel #用户删除命令 su #用户提权命令 1、useradd命令&#xff08;加用户&#xff09;&#xff1a; 创建并设置用户信息&#xff0c;使用us…

【AI】【本地部署】OpenWebUI的升级并移植旧有用户信息

【背景】 OpenWebUI的版本升级频率很高&#xff0c;并会修改旧版本的Bug&#xff0c;不过对于已经在使用的系统&#xff0c;升级后现有用户信息都会丢失&#xff0c;于是研究如何在升级后将现有的用户信息移植到升级后版本。 【准备工作】 OpenWebUI的升级步骤在Docker中有现…

PyCharm接入DeepSeek实现AI编程

目录 效果演示 创建API key 在PyCharm中下载CodeGPT插件 配置Continue DeepSeek 是一家专注于人工智能技术研发的公司&#xff0c;致力于开发高性能、低成本的 AI 模型。DeepSeek-V3 是 DeepSeek 公司推出的最新一代 AI 模型。其前身是 DeepSeek-V2.5&#xff0c;经过持续的…

21款炫酷烟花合集

系列专栏 《Python趣味编程》《C/C趣味编程》《HTML趣味编程》《Java趣味编程》 写在前面 Python、C/C、HTML、Java等4种语言实现18款炫酷烟花的代码。 Python Python烟花① 完整代码&#xff1a;Python动漫烟花&#xff08;完整代码&#xff09; ​ Python烟花② 完整…

zyNo.15(Web题型总结1)

web 一、工具使用 1.sqlmap使用 在目录页输入cmd就可以打开程序 使用方法查看输入python sqlmap.py --help 二、web攻防知识体系 新手村 WEB CTF入门 md5绕过、变量覆盖、随机数问题 sql注入 MySQL注入介绍与联合…

将 OneLake 数据索引到 Elasticsearch - 第 1 部分

作者&#xff1a;来自 Elastic Gustavo Llermaly 学习配置 OneLake&#xff0c;使用 Python 消费数据并在 Elasticsearch 中索引文档&#xff0c;然后运行语义搜索。 OneLake 是一款工具&#xff0c;可让你连接到不同的 Microsoft 数据源&#xff0c;例如 Power BI、Data Activ…