我像只鱼儿在你的进程池~
只为你守候这进程间通信~
一点都不押韵
点踩吧
父进程是master,它提前创建出一堆紫禁城,如果有命令就交给子进程去执行(和shell不一样,shell是有任务才去创建子进程),提前创建子进程,父子间需要进行通信,父进程持有写端,紫禁城具有读端,每个紫禁城都从管道中进行读取,这批提前创建出的紫禁城叫进程池,减小了创建进程的成本,管道里面没有数据,worker进程就阻塞等待数据的到来,master向哪一个管道写入,就是唤醒哪一个紫禁城来处理问题
专业团队:
父进程肯定要雨露均沾,这个叫负载均衡
设计一下吧,进程池:
#include<iostream>
#include<string>
#include<vector>
#include<unistd.h>
#include<sys/types.h>
void work(int rfd)
{
while(true)
{
sleep(1);
}
}
//master
class Channel
{
public:
Channel(int wfd, pid_t id, const std::string &name)
:_wfd(wfd),_subprocessid(id),_name(name)
{}
int Getfd()
{
return _wfd;
}
pid_t GetProcessId()
{
return _subprocessid;
}
std::string GetName()
{
return _name;
}
~Channel()
{
}
private:
int _wfd;
pid_t _subprocessid;
std::string _name;
};
// ./processpool 5
int main(int argc,char* argv[])
{
if(argc!=2)
{
std::cerr<<"Usage:"<<argv[0]<<"processnum"<<std::endl;
return 1;
}
int num = std::stoi(argv[1]);
std::vector<Channel> channels;
for(int i = 0; i < num; i++)
{
//创建管道
int pipefd[2] = {0};
int n = pipe(pipefd);
if(n < 0)
{
exit(1);
}
//创建紫禁城
pid_t id = fork();
if(id == 0)
{
//child
close(pipefd[1]);
work(pipefd[0]);
close(pipefd[0]);
exit(0);
}
//构建名字
std::string channel_name = "Channel " + std::to_string(i);
//父进程
close(pipefd[0]);
channels.push_back(Channel(pipefd[1],id,channel_name));
//close(pipefd[1]);
}
for(auto &channel : channels)
{
std::cout << " ---------------------- " <<std::endl;
std::cout << channel.GetName() << std::endl;
std::cout << channel.Getfd() << std::endl;
std::cout << channel.GetProcessId() << std::endl;
}
sleep(100);
return 0;
}
打印出来之后它输出都正常,符合预期:
但是代码不太优雅,有点挫,所以改一改加点功能
在cpp中,const &是输出型参数,&是输入输出型参数,*也表示输出型参数
master向哪一个管道写入就是唤醒哪一个紫禁城来处理任务
是固定长度的四字节的数组下标,是对应的任务码,对应的是函数指针数组的下标
.hpp是C++中的一种头文件,允许声明和定义写在头一文件
声明和定义分离可以打包成库,但是如果没有需求就不用这样
比如写一个需要子进程执行的任务
#pragma once
#include<iostream>
#define TaskNum 3
typedef void(*task_t)(); //函数指针
task_t tasks[TaskNum];
void Print()
{
std::cout<<"pineapple"<<std::endl;
}
void Download()
{
std::cout<<"Download Task"<<std::endl;
}
void Flush()
{
std::cout<<"Flush Task"<<std::endl;
}
void InitTask()
{
tasks[0]= Print;
tasks[1]=Download;
tasks[2]=Flush;
}
void ExcuteTask(int number)
{
if(number<0 || number>2)
{
return;
}
tasks[number]();
}
综上写一段完整的,依次把任务交给子进程的代码
Processpool.cc:
#include<iostream>
#include<string>
#include<vector>
#include<unistd.h>
#include<sys/types.h>
#include<sys/wait.h>
#include"Task.hpp"
void work(int rfd)
{
while(true)
{
int command = 0;
int n = read(rfd, &command,sizeof(command));
if(n == sizeof(int))
{
std::cout << "pid is:" << getpid() << "handler task" << std::endl;
ExcuteTask(command);
}
else if(n == 0)
{
std::cout<<"Pipe closed"<<std::endl;
break;
}
else
{
perror("read");
break;
}
}
}
//master
class Channel
{
public:
Channel(int wfd, pid_t id, const std::string &name)
:_wfd(wfd),_subprocessid(id),_name(name)
{}
int Getfd()const
{
return _wfd;
}
pid_t GetProcessId()const
{
return _subprocessid;
}
std::string GetName()const
{
return _name;
}
~Channel()
{
}
private:
int _wfd;
pid_t _subprocessid;
std::string _name;
};
void CreateChannelAndSub(std::vector<Channel>* channels,int num1)
{
for(int i = 0; i < num1; i++)
{
//创建管道
int pipefd[2] = {0};
int n = pipe(pipefd);
if(n < 0)
{
perror("pipe");
exit(1);
}
//创建紫禁城
pid_t id = fork();
if(id < 0)
{
perror("fork");
exit(1);
}
if(id == 0)
{
//child
close(pipefd[1]);
work(pipefd[0]);
close(pipefd[0]);
exit(0);
}
//父进程
close(pipefd[0]);
//构建名字
std::string channel_name = "Channel " + std::to_string(i);
channels->push_back(Channel(pipefd[1],id,channel_name));
//close(pipefd[1]);
}
}
int NextChannel(int channelnum)
{
static int next = 0;
int channel = next;
next++;
next %= channelnum;
return channel;
}
void SendTaskCommand(const Channel &channel,int taskcommand)
{
size_t n = write(channel.Getfd(),&taskcommand,sizeof(taskcommand));
if(n != sizeof(taskcommand))
{
perror("write");
}
}
// ./processpool 5
int main(int argc,char* argv[])
{
if(argc!=2)
{
std::cerr<<"Usage:"<<argv[0]<<"processnum"<<std::endl;
return 1;
}
int num = std::stoi(argv[1]);
LoadTask();
std::vector<Channel> channels;
//创建信道和子进程
CreateChannelAndSub(&channels,num);
//通过channel来控制紫禁城
while(true)
{
sleep(1);
//选择任务
int taskcommand = Select();
//选择信道和进程
int channel_index = NextChannel(channels.size());
//发送任务
SendTaskCommand(channels[channel_index],taskcommand);
std::cout << "taskcommand:" << taskcommand << " channel:"\
<<channels[channel_index].GetName() << " sub process:"\
<< channels[channel_index].GetProcessId() << std::endl;
}
// for(auto &channel : channels)
// {
// std::cout << " ---------------------- " <<std::endl;
// std::cout << channel.GetName() << std::endl;
// std::cout << channel.Getfd() << std::endl;
// std::cout << channel.GetProcessId() << std::endl;
// }
// sleep(100);
return 0;
}
Task.hpp:
#pragma once
#include<iostream>
#include<ctime>
#include<cstdlib>
#include<sys/types.h>
#include<unistd.h>
#define TaskNum 3
typedef void(*task_t)(); //函数指针
task_t tasks[TaskNum];
void Print()
{
std::cout<<"pineapple"<<std::endl;
}
void Download()
{
std::cout<<"Download Task"<<std::endl;
}
void Flush()
{
std::cout<<"Flush Task"<<std::endl;
}
void LoadTask()
{
srand(time(nullptr) ^ getpid() ^ 114514); //没别的意思我随便写的
tasks[0]= Print;
tasks[1]=Download;
tasks[2]=Flush;
}
void ExcuteTask(int number)
{
if(number<0 || number>2)
{
return;
}
tasks[number]();
}
int Select()
{
return rand() % TaskNum;
}
之前觉得chat很好用来着
但是这次进程池很明显它没理解我的意思,把我的类加了析构,还对这几个子进程进行了等待回收(回收是没问题的,但是它回收的时机不对,石矶的活!):
但是等会它好像也行?好像对但又好像不对
#include <iostream>
#include <string>
#include <vector>
#include <unistd.h>
#include <sys/types.h>
#include <sys/wait.h>
#include "Task.hpp"
// Ensure proper function declarations
void LoadTask();
int Select();
void ExcuteTask(int command);
void work(int rfd)
{
while (true)
{
int command = 0;
ssize_t n = read(rfd, &command, sizeof(command));
if (n == sizeof(int))
{
std::cout << "pid is: " << getpid() << " handling task" << std::endl;
ExcuteTask(command);
}
else if (n == 0)
{
// Pipe closed; exit the loop if there's no more data
break;
}
else
{
perror("read");
break;
}
}
}
class Channel
{
public:
Channel(int wfd, pid_t id, const std::string &name)
: _wfd(wfd), _subprocessid(id), _name(name)
{}
int Getfd() const
{
return _wfd;
}
pid_t GetProcessId() const
{
return _subprocessid;
}
std::string GetName() const
{
return _name;
}
~Channel()
{
close(_wfd); // Close file descriptor in the destructor
}
private:
int _wfd;
pid_t _subprocessid;
std::string _name;
};
void CreateChannelAndSub(std::vector<Channel>* channels, int num1)
{
for (int i = 0; i < num1; i++)
{
int pipefd[2] = {0};
int n = pipe(pipefd);
if (n < 0)
{
perror("pipe");
exit(1);
}
pid_t id = fork();
if (id < 0)
{
perror("fork");
exit(1);
}
if (id == 0)
{
close(pipefd[1]);
work(pipefd[0]);
close(pipefd[0]);
exit(0);
}
close(pipefd[0]);
std::string channel_name = "Channel " + std::to_string(i);
channels->emplace_back(pipefd[1], id, channel_name);
}
}
int NextChannel(int channelnum)
{
static int next = 0;
int channel = next;
next++;
next %= channelnum;
return channel;
}
void SendTaskCommand(const Channel &channel, int taskcommand)
{
ssize_t n = write(channel.Getfd(), &taskcommand, sizeof(taskcommand));
if (n != sizeof(taskcommand))
{
perror("write");
}
}
int main(int argc, char* argv[])
{
if (argc != 2)
{
std::cerr << "Usage: " << argv[0] << " processnum" << std::endl;
return 1;
}
int num = std::stoi(argv[1]);
LoadTask();
std::vector<Channel> channels;
CreateChannelAndSub(&channels, num);
while (true)
{
int taskcommand = Select();
int channel_index = NextChannel(channels.size());
SendTaskCommand(channels[channel_index], taskcommand);
std::cout << "taskcommand: " << taskcommand
<< " channel: " << channels[channel_index].GetName()
<< " sub process: " << channels[channel_index].GetProcessId()
<< std::endl;
}
// Wait for all child processes to terminate
for (auto &channel : channels)
{
int status;
waitpid(channel.GetProcessId(), &status, 0);
}
return 0;
}
应该不对貌似是
不是好像是对的
是在析构之后关闭
关闭然后等待吗?蒙蒙的
子进程结束之后进行析构然后关闭对应的写文件描述符(不采用循环,一个结束关一个)
进行循环等待回收子进程
析构是关闭写的文件描述符
但是没有封装之后的代码优越
错大抵是没错的
我给他的本来也是一个半成品代码
都没通过channel对子进程实行控制
所以它写的照完备的应该还算正确,还是很好用的,只是需要自己注意下别踩坑才是
不对应该还是不对
没法正常关闭子进程好像是,等我试验下
它对的,我没看懂罢了啊啊啊啊
代码还是有点挫,需要加个函数控制下:
//通过channel来控制紫禁城
void CtrlProcess(std::vector<Channel> &channels)
{
while(true)
{
sleep(1);
//选择任务
int taskcommand = Select();
//选择信道和进程
int channel_index = NextChannel(channels.size());
//发送任务
SendTaskCommand(channels[channel_index],taskcommand);
std::cout << "taskcommand:" << taskcommand << " channel:"\
<<channels[channel_index].GetName() << " sub process:"\
<< channels[channel_index].GetProcessId() << std::endl;
}
}
曝光每一个抽象群U:
想要回收管道和子进程怎么办捏?
首先需要关闭所有的写端,然后对子进程等待回收
完事了(有办法控制,回收正常):
#include<iostream>
#include<string>
#include<vector>
#include<unistd.h>
#include<sys/types.h>
#include<sys/wait.h>
#include"Task.hpp"
void work(int rfd)
{
while(true)
{
int command = 0;
int n = read(rfd, &command,sizeof(command));
if(n == sizeof(int))
{
std::cout << "pid is:" << getpid() << "handler task" << std::endl;
ExcuteTask(command);
}
else if(n == 0)
{
std::cout<<"Pipe closed"<<std::endl;
break;
}
else
{
perror("read");
break;
}
}
}
//master
class Channel
{
public:
Channel(int wfd, pid_t id, const std::string &name)
:_wfd(wfd),_subprocessid(id),_name(name)
{}
int Getfd()const
{
return _wfd;
}
pid_t GetProcessId()const
{
return _subprocessid;
}
std::string GetName()const
{
return _name;
}
void CloseChannel()
{
close(_wfd);
}
void Wait()
{
pid_t rid = waitpid(_subprocessid,nullptr,0);
if(rid > 0)
{
std::cout << "wait " <<rid << "success" << std::endl;
}
}
~Channel()
{
}
private:
int _wfd;
pid_t _subprocessid;
std::string _name;
};
void CreateChannelAndSub(std::vector<Channel>* channels,int num1)
{
for(int i = 0; i < num1; i++)
{
//创建管道
int pipefd[2] = {0};
int n = pipe(pipefd);
if(n < 0)
{
perror("pipe");
exit(1);
}
//创建紫禁城
pid_t id = fork();
if(id < 0)
{
perror("fork");
exit(1);
}
if(id == 0)
{
//child
close(pipefd[1]);
work(pipefd[0]);
close(pipefd[0]);
exit(0);
}
//父进程
close(pipefd[0]);
//构建名字
std::string channel_name = "Channel " + std::to_string(i);
channels->push_back(Channel(pipefd[1],id,channel_name));
//close(pipefd[1]);
}
}
int NextChannel(int channelnum)
{
static int next = 0;
int channel = next;
next++;
next %= channelnum;
return channel;
}
void SendTaskCommand(const Channel &channel,int taskcommand)
{
size_t n = write(channel.Getfd(),&taskcommand,sizeof(taskcommand));
if(n != sizeof(taskcommand))
{
perror("write");
}
}
void CtrlProcessOnce(std::vector<Channel> &channels)
{
sleep(1);
//选择任务
int taskcommand = Select();
//选择信道和进程
int channel_index = NextChannel(channels.size());
//发送任务
SendTaskCommand(channels[channel_index],taskcommand);
std::cout << "taskcommand:" << taskcommand << " channel:"\
<<channels[channel_index].GetName() << " sub process:"\
<< channels[channel_index].GetProcessId() << std::endl;
}
//通过channel来控制紫禁城
void CtrlProcess(std::vector<Channel> &channels,int times = -1)
{
if(times > 0)
{
while(times--)
{
CtrlProcessOnce(channels);
}
}
else
{
while(true)
{
CtrlProcessOnce(channels);
}
}
}
//回收管道和子进程
void CleanUpChannels(std::vector<Channel> &channels)
{
for(auto &channel : channels)
{
channel.CloseChannel();
}
for(auto &channel : channels)
{
channel.Wait();
}
}
// ./processpool 5
int main(int argc,char* argv[])
{
if(argc!=2)
{
std::cerr<<"Usage:"<<argv[0]<<"processnum"<<std::endl;
return 1;
}
int num = std::stoi(argv[1]);
LoadTask();
std::vector<Channel> channels;
//创建信道和子进程
CreateChannelAndSub(&channels,num);
//通过channel控制子进程
CtrlProcess(channels,10);
CleanUpChannels(channels);
// for(auto &channel : channels)
// {
// std::cout << " ---------------------- " <<std::endl;
// std::cout << channel.GetName() << std::endl;
// std::cout << channel.Getfd() << std::endl;
// std::cout << channel.GetProcessId() << std::endl;
// }
// sleep(100);
return 0;
}
可以重定向一下让每次的读从标准输入读,而不是从管道中,这样做的好处是紫禁城执行这些再也不用操心勒!(不关心是否从管道里读)
创建出的紫禁城要帮助我们执行任务,而work本身也是一项任务,task_t task被称为回调函数,回调work函数,而work函数被存放在Task.hpp中,这样可以进行解耦(将耦合度降低)
改完的大概是这样:
ProcessPool.cc:
#include<iostream>
#include<string>
#include<vector>
#include<unistd.h>
#include<sys/types.h>
#include<sys/wait.h>
#include"Task.hpp"
// void work(int rfd)
// {
// while(true)
// {
// int command = 0;
// int n = read(rfd, &command,sizeof(command));
// if(n == sizeof(int))
// {
// std::cout << "pid is:" << getpid() << "handler task" << std::endl;
// ExcuteTask(command);
// }
// else if(n == 0)
// {
// std::cout<<"Pipe closed"<<std::endl;
// break;
// }
// else
// {
// perror("read");
// break;
// }
// }
// }
//master
class Channel
{
public:
Channel(int wfd, pid_t id, const std::string &name)
:_wfd(wfd),_subprocessid(id),_name(name)
{}
int Getfd()const
{
return _wfd;
}
pid_t GetProcessId()const
{
return _subprocessid;
}
std::string GetName()const
{
return _name;
}
void CloseChannel()
{
close(_wfd);
}
void Wait()
{
pid_t rid = waitpid(_subprocessid,nullptr,0);
if(rid > 0)
{
std::cout << "wait " <<rid << "success" << std::endl;
}
}
~Channel()
{
}
private:
int _wfd;
pid_t _subprocessid;
std::string _name;
};
void CreateChannelAndSub(std::vector<Channel>* channels,int num1,task_t task)
{
for(int i = 0; i < num1; i++)
{
//创建管道
int pipefd[2] = {0};
int n = pipe(pipefd);
if(n < 0)
{
perror("pipe");
exit(1);
}
//创建紫禁城
pid_t id = fork();
if(id < 0)
{
perror("fork");
exit(1);
}
if(id == 0)
{
//child
close(pipefd[1]);
dup2(pipefd[0],0);
task();
close(pipefd[0]);
exit(0);
}
//父进程
close(pipefd[0]);
//构建名字
std::string channel_name = "Channel " + std::to_string(i);
channels->push_back(Channel(pipefd[1],id,channel_name));
//close(pipefd[1]);
}
}
int NextChannel(int channelnum)
{
static int next = 0;
int channel = next;
next++;
next %= channelnum;
return channel;
}
void SendTaskCommand(const Channel &channel,int taskcommand)
{
size_t n = write(channel.Getfd(),&taskcommand,sizeof(taskcommand));
if(n != sizeof(taskcommand))
{
perror("write");
}
}
void CtrlProcessOnce(std::vector<Channel> &channels)
{
sleep(1);
//选择任务
int taskcommand = Select();
//选择信道和进程
int channel_index = NextChannel(channels.size());
//发送任务
SendTaskCommand(channels[channel_index],taskcommand);
std::cout << "taskcommand:" << taskcommand << " channel:"\
<<channels[channel_index].GetName() << " sub process:"\
<< channels[channel_index].GetProcessId() << std::endl;
}
//通过channel来控制紫禁城
void CtrlProcess(std::vector<Channel> &channels,int times = -1)
{
if(times > 0)
{
while(times--)
{
CtrlProcessOnce(channels);
}
}
else
{
while(true)
{
CtrlProcessOnce(channels);
}
}
}
//回收管道和子进程
void CleanUpChannels(std::vector<Channel> &channels)
{
for(auto &channel : channels)
{
channel.CloseChannel();
}
for(auto &channel : channels)
{
channel.Wait();
}
}
// ./processpool 5
int main(int argc,char* argv[])
{
if(argc!=2)
{
std::cerr<<"Usage:"<<argv[0]<<"processnum"<<std::endl;
return 1;
}
int num = std::stoi(argv[1]);
LoadTask();
std::vector<Channel> channels;
//创建信道和子进程
CreateChannelAndSub(&channels,num,work);
//通过channel控制子进程
CtrlProcess(channels,10);
CleanUpChannels(channels);
// for(auto &channel : channels)
// {
// std::cout << " ---------------------- " <<std::endl;
// std::cout << channel.GetName() << std::endl;
// std::cout << channel.Getfd() << std::endl;
// std::cout << channel.GetProcessId() << std::endl;
// }
// sleep(100);
return 0;
}
Task.hpp:
#pragma once
#include<iostream>
#include<ctime>
#include<cstdlib>
#include<sys/types.h>
#include<unistd.h>
#define TaskNum 3
typedef void(*task_t)(); //函数指针
task_t tasks[TaskNum];
void work()
{
while(true)
{
int command = 0;
int n = read(0, &command,sizeof(command));
if(n == sizeof(int))
{
std::cout << "pid is:" << getpid() << "handler task" << std::endl;
ExcuteTask(command);
}
else if(n == 0)
{
std::cout<<"Pipe closed"<<std::endl;
break;
}
else
{
perror("read");
break;
}
}
}
void Print()
{
std::cout<<"pineapple"<<std::endl;
}
void Download()
{
std::cout<<"Download Task"<<std::endl;
}
void Flush()
{
std::cout<<"Flush Task"<<std::endl;
}
void LoadTask()
{
srand(time(nullptr) ^ getpid() ^ 114514); //没别的意思我随便写的
tasks[0]= Print;
tasks[1]=Download;
tasks[2]=Flush;
}
void ExcuteTask(int number)
{
if(number<0 || number>2)
{
return;
}
tasks[number]();
}
int Select()
{
return rand() % TaskNum;
}
但是有个问题:回收的代码:
void CleanUpChannels(std::vector<Channel> &channels)
{
for(auto &channel : channels)
{
channel.CloseChannel();
}
for(auto &channel : channels)
{
channel.Wait();
}
}
能不能这样改:
void CleanUpChannels(std::vector<Channel> &channels)
{
for(auto &channel : channels)
{
channel.CloseChannel();
channel.Wait();
}
}
试试不就知道了
结果就是阻塞在那
也不退出
为什么捏?
因为我们的代码默认有bug
父进程有的文件描述符表是3456...
一旦创建了管道,紫禁城会继承父进程的文件描述符表
所以一个非常大的问题是,管道的写端可能不止一个文件描述符指向
但是我们关闭的时候只关闭了父进程的一个
紫禁城还有很多货接着指向呢
引用计数不为0,那文件就不会被销毁,所以接着阻塞,即读端什么都读不到
但是为什么上面那样两个循环就可以了呢?
因为那样是从上向下关闭,递归式
也就是说这样也是可以的:
void CleanUpChannel(std::vector<Channel> &channels)
{
int num = channels.size() - 1;
while(num >= 0)
{
channels[num].CloseChannel();
channels[num--].Wait();
}
}
但是做人不能这样
你明知道有bug还不修改么?
但是改bug的事下次再说罢
我忙着看案子呢拜拜