目录
一、池化技术
二、简易进程池的实现:
Makefile
task.h
task.cpp
Initchannel函数:
创建任务:
控制子进程:
子进程执行任务:
清理收尾:
三、全部代码:
前言:
对于管道,我们已经学习了匿名管道,那么这个匿名管道有什么用呢?接下来,我们以池化技术来实现一个简易的管道进程池
一、池化技术
1、内存池:减少内存分配的系统调用开销
问题:
当使用new或 malloc 动态分配内存时,每次都会触发系统调用
例如:每次申请5字节,或者申请10字节,再次申请20字节,会触发三次系统调用
系统调用成本高,因为操作系统可能正在处理其他任务,导致等待,降低效率
解决方案:
一次性向操作系统申请更大的内存空间(如100字节或200字节)
后续需要内存时,直接从这块预先申请好的空间中分配,避免频繁触发系统调用
优点:
减少系统调用次数,分摊开销,提高内存分配效率
2、进程池:减少进程创建的开销
问题:
传统方式是父进程创建子进程,子进程完成任务后退出,父进程等待
每次创建子进程时,操作系统需要复制task_struct,页表等数据结构,开销较大
频繁创建和销毁进程效率低下
解决方案:
预先创建一批子进程,作为资源储备(进程池)
当有任务到来时,父进程直接将任务分配给池中已有的子进程,而不是每次都创建新进程
优点:
减少进程创建的开销,提高任务分配和执行的效率
3. 核心思想
资源预分配
无论是内存池还是进程池,核心思想都是预先分配资源,避免重复的系统调用或资源创建
提高效率:
通过减少高频操作的开销,显著提升程序性能,尤其适用于高并发或高性能场景
总结
内存池:一次性申请大块内存,后续直接从池中分配,减少系统调用
进程池:预先创建一批进程,任务到来时直接分配,减少进程创建开销
共同目标:通过资源预分配,优化性能,降低系统开销
二、简易进程池的实现:
在了解上述的池化技术后,我们以这种思想来设计一个简易的进程池
如上,这是一个简易的进程池,我们让父进程向子进程发送数据也就是父进程向管道中写入数据,然后子进程从管道中读数据,这样父进程创建多个子进程,每次选择某个子进程和管道进行数据的传输,这样就是一个简易的进程池了
Makefile
task:tesk.cpp
g++ -o $@ $^ -g -std=c++11
.PHONY:clean
clean:
rm -f task
task.h
#pragma once
#include<iostream>
#include<vector>
#include<string>
task.cpp
先描述:
首先通过class类来描述一个管道
#include"task.h"
//先描述,描述管道
class channel
{
public:
//构造函数通过列表进行初始化
channel(int fd, int childpid, const std::string &processname)
:_fd(fd)
,_childpid(childpid)
,_processname(processname)
{}
public:
int _fd; //管道写端的文件描述符
pid_t _childpid; //管道所连接表示子进程的pid
std::string _processname; //管道所连接的子进程的名字
};
int main()
{
//加载好任务
//通过vector将一个个管道组织起来
std::vector<channel> channels;
//初始化,创建管道
Initchannel(&channels);
//控制子进程
ctrlchild(channels);
//清理收尾
quitprocess(channels);
return 0;
}
在组织:
在通过vector这个容器来将一个个管道组织起来,然后进行初始化,控制子进程,清理等操作
Initchannel函数:
创建管道,这个管道是连接父进程和子进程的,所以我们要创建几个管道就需要创建几个子进程
通过pipe创建管道文件
初始化:
void Initchannel(std::vector<channel>* channels)
{
for(int i = 0;i<processnum;i++)
{
//创建管道
int pipefd[2];
int n = pipe(pipefd);//创建管道文件
if(n != 0) exit(0);//创建失败就退出程序
//创建完管道文件然后创建管道文件对应的子进程
pid_t id = fork();
if(id == 0)
{
//子进程模块,这里子进程去读,所以关闭写通道
close(pipefd[1]);
//重定向
dup2(pipefd[0],0);
work();//子进程进行工作的函数模块
exit(0);//工作完就结束子进程
}
//父进程去写所以这里关闭读端
close(pipefd[0]);
std::string name = "process"+std::to_string(i);
channels->push_back(channel(pipefd[1],id,name));
}
}
创建任务:
//重定义函数指针
typedef void(* task)();
void task1()
{
std::cout<<"刷新野区"<<std::endl;
}
void task2()
{
std::cout<<"刷新技能"<<std::endl;
}
void task3()
{
std::cout<<"泉水回血"<<std::endl;
}
void task4()
{
std::cout<<"技能耗蓝"<<std::endl;
}
void Loadtask(std::vector<task>* tasks)
{
tasks->push_back(task1);
tasks->push_back(task2);
tasks->push_back(task3);
tasks->push_back(task4);
}
其中Loadtask函数的作用就是保存所执行的任务
接着在程序最开始(也就是在main函数的最开始),将我们的任务都进行加载好
还需要全局变量
processnum:
控制进程池的大小,决定创建多少个子进程。
方便扩展,例如通过配置文件动态设置子进程数量。
tasks:
集中管理所有任务,便于父进程和子进程共享任务列表。
通过任务码(索引)快速定位和执行任务。
const int processnum = 5;//进程中子进程的数量
std::vector<task> tasks;//存储任务的容器
int main()
{
//加载好任务
Loadtask(&tasks);
//通过vector将一个个管道组织起来
std::vector<channel> channels;
//初始化,创建管道
Initchannel(&channels);
//控制子进程
ctrlchild();
//清理收尾
quitprocess();
return 0;
}
手动控制子进程
这里首先搞一个菜单出来
void menu()
{
std::cout << "#########################################" << std::endl;
std::cout << "# 1、刷新野区 2、刷新技能 #" << std::endl;
std::cout << "# 3、泉水回血 4、技能耗蓝 #" << std::endl;
std::cout << "# 0、退出 #" << std::endl;
std::cout << "#########################################" << std::endl;
}
在控制子进程的过程中,我们首先选好我们要完成的任务,然后向管道里写一个任务码,然后被选中的子进程就会从管道中找到任务码,就可以根据vector<task>里面的任务知道需要执行哪一个任务了
控制子进程:
根据菜单,每次输入任务码的时候,就通过write系统调用来确定子进程,发送任务
void ctrlchild(std::vector<channel>& channels)
{
int which = 0;
while(1)
{
menu();
int selet = 0;
std::cout<<"请输入所选择的任务";
std::cin>>selet;
//判断所选的任务码是否合法
if(selet<=0 || selet>=5) break;
//任务选择
int taskcode = selet - 1;
//子进程选择,轮询法
std::cout<<"father say taskcode :" << taskcode <<" already send to " << channels[which]._childpid
<< "process name " << channels[which]._processname << std::endl;
//发送任务
write(channels[which]._fd,&taskcode,sizeof(taskcode));//确定子进程,子进程所接收的任务码,和所接受的大小
//保证所选的进程合法
which++;
which %= channels.size();
}
}
子进程执行任务:
这是子进程的核心代码,通过read系统调用接口从管道中读到任务码,在通过这个任务码和函数指针来调用任务
//子进程工作代码,通过父进程向管道发送的任务码找到对应的任务,然后执行
void work()
{
while(1)
{
int teskcode = 0;
int n = read(0, &teskcode, sizeof(teskcode)); //read返回的是读取到的字节的个数。
//然后第二个参数是要读到哪里去,第三个参数是读取的大小,这一行代码执行完后,teskcode里面保存的就是要执行的任务码
if (n == sizeof(teskcode))
{
std::cout << "work get a command : " << getpid() << " : " << "teskcode" << teskcode << std::endl;
if (teskcode >= 0 && teskcode < tasks.size()) tasks[teskcode]();
}
if (n == 0) break; //如果读到0, 说明写端关闭, 读端读到文件末尾, 就需要停止读取了
}
}
清理收尾:
在清理收尾的时候不能够直接关闭然后父进程等待子进程,这样是有问题的,因为当父进程创建子进程的时候,子进程的读端也会指向对应的管道,这样的话一个管道就会有很多个读端了,如下:
每一个子进程的写端都会指向之前的所有管道,比如,如果上述有3个进程的话,那么第一个管道就有3个写端(父进程一个,父进程创建的两个子进程的写端都会指向第一个管道),那么如果直接close(c._fd)关闭管道的写端的话,那么是不能够关闭完全的,需要全部关闭,这里有两种解决方式:
第一种方式:
从后往前进行关闭,下面代码已给出,我们知道最后一个管道依然是只有一个写端的,那么当最后一个管道关闭后,前面的管道的写端就都会少一个,这样的话从后往前关闭就不会出现这样的问题了
void quitprocess(const std::vector<channel> &channels)
{
//出现问题
for(const auto &c : channels)
{
close(c._fd);
waitpid(c._childpid,nullptr,0);
}
// //解决方案1
// int last = channels.size()-1;
// for(int i = last;i>=0;i--)
// {
// close(channels[i]._fd);
// waitpid(channels[i]._childpid,nullptr,0);
// }
// for(const auto &c : channels)
// {
// close(c._fd);
// }
// for(const auto &c : channels)
// {
// waitpid(c._childpid,nullptr,0);
// }
}
第二种方式:
在创建管道的时候进行记录父进程所占用的文件描述符
三、全部代码:
makefile
task:task.cpp
g++ -o $@ $^ -g -std=c++11
.PHONY:clean
clean:
rm -f task
task.h
#pragma once
#include<iostream>
#include<vector>
#include<string>
#include<unistd.h>
#include<sys/types.h>
#include<sys/wait.h>
//重定义函数指针
typedef void(* task)();
void task1()
{
std::cout<<"刷新野区"<<std::endl;
}
void task2()
{
std::cout<<"刷新技能"<<std::endl;
}
void task3()
{
std::cout<<"泉水回血"<<std::endl;
}
void task4()
{
std::cout<<"技能耗蓝"<<std::endl;
}
//保存所执行的任务
void Loadtask(std::vector<task>* tasks)
{
tasks->push_back(task1);
tasks->push_back(task2);
tasks->push_back(task3);
tasks->push_back(task4);
}
task.cpp
#include"task.h"
const int processnum = 5;
std::vector<task> tasks;
//先描述,描述管道
class channel
{
public:
//构造函数通过列表进行初始化
channel(int fd, int childpid, const std::string &processname)
:_fd(fd)
,_childpid(childpid)
,_processname(processname)
{}
public:
int _fd; //表示父进程链接某个管道的fd
pid_t _childpid; //管道所连接表示子进程的pid
std::string _processname; //管道所连接的子进程的名字
};
//子进程工作代码,通过父进程向管道发送的任务码找到对应的任务,然后执行
void work()
{
while(1)
{
int teskcode = 0;
int n = read(0, &teskcode, sizeof(teskcode)); //read返回的是读取到的字节的个数。
//然后第二个参数是要读到哪里去,第三个参数是读取的大小,这一行代码执行完后,teskcode里面保存的就是要执行的任务码
if (n == sizeof(teskcode))
{
std::cout << "work get a command : " << getpid() << " : " << "teskcode" << teskcode << std::endl;
if (teskcode >= 0 && teskcode < tasks.size()) tasks[teskcode]();
}
if (n == 0) break; //如果读到0, 说明写端关闭, 读端读到文件末尾, 就需要停止读取了
}
}
void Initchannel(std::vector<channel>* channels)
{
//解决方案2
std::vector<int> oldfd;//创建一个数组记录父进程所在的文件描述符的写端
for(int i = 0;i<processnum;i++)
{
//创建管道
int pipefd[2];
int n = pipe(pipefd);//创建管道文件
if(n != 0) exit(0);//创建失败就退出程序
//创建完管道文件然后创建管道文件对应的子进程
pid_t id = fork();
if(id == 0)
{
//将父进程所在的文件描述符写端,子进程将其关闭
for(auto fd : oldfd) close(fd);
//子进程模块,这里子进程去读,所以关闭写通道
close(pipefd[1]);
//重定向
dup2(pipefd[0],0);
work();//子进程进行工作的函数模块
std::cout << "Process PID: " << getpid() << " quit" << std::endl;
exit(0);//工作完就结束子进程
}
//父进程去写所以这里关闭读端
close(pipefd[0]);
std::string name = "process"+std::to_string(i);
channels->push_back(channel(pipefd[1],id,name));
//每次记录父进程写端所在的文件描述符
oldfd.push_back(pipefd[1]);
}
}
void menu()
{
std::cout << "#########################################" << std::endl;
std::cout << "# 1、刷新野区 2、刷新技能 #" << std::endl;
std::cout << "# 3、泉水回血 4、技能耗蓝 #" << std::endl;
std::cout << "# 0、退出 #" << std::endl;
std::cout << "#########################################" << std::endl;
}
void ctrlchild(std::vector<channel>& channels)
{
int which = 0;
while(1)
{
menu();
int selet = 0;
std::cout<<"请输入所选择的任务";
std::cin>>selet;
//判断所选的任务码是否合法
if(selet<=0 || selet>=5) break;
//任务选择
int taskcode = selet - 1;
//子进程选择,轮询法
std::cout<<"father say taskcode :" << taskcode <<" already send to " << channels[which]._childpid
<< "process name " << channels[which]._processname << std::endl;
//发送任务
write(channels[which]._fd,&taskcode,sizeof(taskcode));//确定子进程,子进程所接收的任务码,和所接受的大小
//保证所选的进程合法
which++;
which %= channels.size();
}
}
void quitprocess(const std::vector<channel> &channels)
{
//出现问题
for(const auto &c : channels)
{
close(c._fd);
waitpid(c._childpid,nullptr,0);
}
// //解决方案1
// int last = channels.size()-1;
// for(int i = last;i>=0;i--)
// {
// close(channels[i]._fd);
// waitpid(channels[i]._childpid,nullptr,0);
// }
// for(const auto &c : channels)
// {
// close(c._fd);
// }
// for(const auto &c : channels)
// {
// waitpid(c._childpid,nullptr,0);
// }
}
int main()
{
//加载好任务
Loadtask(&tasks);
//通过vector将一个个管道组织起来
std::vector<channel> channels;
//初始化,创建管道
Initchannel(&channels);
//控制子进程
ctrlchild(channels);
//清理收尾
quitprocess(channels);
return 0;
}