1.管道
2.信号量
2.1 概念
信号量 是一个计数器,用于实现进程间互斥和同步
。
信号量的取值可以是任何自然数。
最简单的信号量是只能取 0 和 1 的变量,这也是信号量最常见的一种形式,叫做二进制信号量
(Binary Semaphore)。
而可以取多个正整数的信号量被称为通用信号量。
2.2 特点
1. 信号量用于进程间同步,若要在进程间传递数据需要结合共享内存。
2. 信号量基于操作系统的 PV 操作,程序对信号量的操作都是 原子操作。
3. 每次对信号量的 PV 操作不仅限于对信号量值加 1 或减 1,而且可以加减任意正整数。
4. 支持信号量组。
2.3 原型
2.3.1 semget 创建或获取一个信号量集函数
#include <sys/sem.h>
// 创建或获取一个信号量集 若成功返回信号量集ID,失败返回-1
int semget(key_t key, int num_sems, int sem_flags);
key:
一个键值,用来标识一个全局唯一的信号量集。
要通过信号量通信的进程使用相同的 key 来创建或获取该信号量
num_sems:
指定要 创建/获取信号量集中信号量的数目。
创建:
该值必须被指定。
获取:
可以设置为 0.
sem_flags:
指定一组标志。
它低端 9 个比特是信号量的权限,其格式和含义都与系统调用的 open、model参数相同。
此外,它还可以和 IPC_CREAT 标志做按位 “或”运算以创建新的信号量集。
确保创建一组新的、唯一的信号量集:
联合使用 IPC_CREAT 和 IPC_EXCL.
此时,若创建的信号量集已存在,则 semget 返回错误 并设置 errno 为 EEXIST.
返回:
成功:
返回正整数值,是信号量级的标识符
失败:
-1,并设置 errno
扩展:
若 semget 用于创建信号量集,则与之关联的内核数据结构体 semid_ds 将被创建并初始化。
semid_ds结构体定义如下:
#include <sys/sem,h>
//此结构体用于描述 IPC 对象(信号量、共享内存和消息队列)的权限
struct ipc_perm
{
key_t key; //键值
uid_t uid; //所有者的有效用户 ID
gid_t gid; //所有者的有效组 ID
uid_t cuid; //创建者的有效用户 ID
gid_t cgid; //创建者的有效组 ID
mode_t mode; //访问权限
... //省略其他字段
}
struct semid_ds
{
struct ipc_perm sem_perm; //信号量的操作权限
unsigned long int sem_nsems; //该信号量集中的信号量数目
time_t sem_otime; //最后一次调用 semop 的时间
time_t sem_ctime; //最后一次调用 semctl 的时间
... //省略其他填充字段
}
semget 对 semid_ds 结构体初始化包括:
sem_perm.cuid 和 sem_perm.uid 设置为调用进程的有效用户 ID
sem_perm.cgid 和 sem_perm.gid 设置为调用进程的有效组 ID
sem_perm.mode 的最低 9 位设置为 sem_flags 参数的最低 9 位
sem_nsems 设置为 num_sems
sem_otime 设置为 0
sem_ctime 设置为当前系统时间
2.3.1.1 semget 特殊参数 IPC_PRIVATE
可以给 semget 函数传一个特殊的键值 IPC_PRIVATE(其值为 0),
这样无论信号量是否已存在, semget 都将创建一个新的信号量。
注意,不要因为这个名字就认为这个创建的信号量就是私有的,
其他进程,尤其是子进程,也有方法来访问这个信号量。
如下例,父、子进程间使用一个 IPC_PRIVATE 信号量来同步:
#include <sys/sem.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/wait.h>
#include <time.h>
union semun
{
int val;
struct semid_ds* buf;
unsigned short int* array;
struct seminfo* __buf;
};
// op=-1 执行 P,op=1 执行 V
void pv( int sem_id, int op )
{
struct sembuf sem_b;
sem_b.sem_num = 0;
sem_b.sem_op = op;
sem_b.sem_flg = SEM_UNDO;
semop( sem_id, &sem_b, 1 );
}
int main( int argc, char* argv[] )
{
int sem_id = semget( IPC_PRIVATE, 1, 0666 ); //创建一个信号量集 ,权限设置为 0666
union semun sem_un;
sem_un.val = 1;
semctl( sem_id, 0, SETVAL, sem_un ); //设置创建的信号量集 信号量 的值 为 1
pid_t id = fork();
if( id < 0 )
{
return 1;
}
else if( id == 0 )
{
time_t cur = time( NULL );
printf( "child try to get binary sem: %s\n" ,ctime(&cur));
//在父子进程间共享 IPC_PRIVATE 信号量的关键在于二者都可以操作该信号量的标识符 sem_id
pv( sem_id, -1 ); //V
printf( "child get the sem and would release it after 5 seconds\n" );
sleep( 5 );
pv( sem_id, 1 ); //P
time_t cur1 = time( NULL );
printf( "child try to get binary sem: %s\n" ,ctime(&cur1));
exit( 0 );
}
else
{
time_t cur = time( NULL );
printf( "parent try to get binary sem:%s\n" ,ctime(&cur));
pv( sem_id, -1 ); //V
printf( "parent get the sem and would release it after 5 seconds\n" );
sleep( 5 );
time_t cur1 = time( NULL );
printf( "child try to get binary sem: %s\n" ,ctime(&cur1));
pv( sem_id, 1 ); //P
}
waitpid( id, NULL, 0 );
semctl( sem_id, 0, IPC_RMID, sem_un ); //立即移除信号量集
return 0;
}
2.3.2 semop 对信号量集操作函数
与信号量关联的一些重要内核变量:
unsigned short semval; //信号量的值
unsigned short semzcnt; //等待信号量值变为 0 的进程数量
unsigned short semncnt; //等待信号量值增加的进程数量
pid_t sempid; //最后一次执行 semop 操作的进程 ID
// 对信号量集进行操作(就是改变上面的内核变量),改变信号量的值,即执行 P、V 操作,
#include <sys/sem.h>
int semop(int sem_id, struct sembuf * sem_ops, size_t num_sem_ops);
sem_id:
由 semget 调用返回的信号量集标识符,用以指定被操作的目标信号量集。
sem_ops:
指向一个 sembuf 结构体类型的数组,
struct sembug
{
unsigned short int sem_num;
short int sem_op;
short int sem_flg;
}
sem_num:
信号量集中信号量的编号,
0表示信号量中第一个信号量。
sem_op:
指定操作类型, 可选值为 正整数、0 和 负整数。
每种类型的操作行为受 参数 sem_flg 影响。
sem_flg:
可选值: IPC_NOWAIT SEM_UNDO.
含义:
IPC_NOWAIT :
无论信号量操作是否成功,semop 调用都将立即返回,类似非阻塞 I/O。
SEM_UNDO:
当进程退出时取消正在进行的 semop 操作。
sembuf 中参数 sem_op 和 sem_flg 按如下方式影响 函数 semop 的行为:
1. sem_op > 0:
将内核信号量的值在增加 sem_op,此操作要求调用进程对被操作信号量集拥有写权限。
若此时设置了 SEM_UNDO 标志,
则系统将更新进程的 semadj 变量(用以跟踪进程对信号量的修改情况)
2. sem_op = 0:
表示这是一个“等待0(wait-for-zero)” 操作,此操作要求调用进程对被操作信号量集有 读 权限。
1.若此时 信号量值是 0:
则调用立即成功返回。
2.若此时 信号量值不是 0,
则 semop 函数 失败返回 或阻塞进程以等待信号量变为 0.
在这种情况下:
若 sem_flg 设置了 IPC_NOWAIT , semop 函数立即返回一个错误,并设置 errno 为 EAGAIN.
若未设置 IPC_NOWAIT ,则 内核信号量值 +1,进程被投入睡眠直到下列 3 个条件之一 发生:
1.信号量的值 semval 变为 0 ,此时系统将该信号量的 semzcnt 值 -1
2.被操作信号量所在信号量集被进程移除, 此时 semop 调用失败返回,errno 设置为 EIDRM
3.调用被信号中断,此时 semop 调用失败返回, errno 被设置为 EINTR,
同时系统将 内核信号量 semzcnt 值 -1
3. sem_op < 0
表示对信号量值进行减操作。此操作要求调用进程对被操作信号量集拥有写权限。
1.若 内核信号量的值 semval 大于或等于 sem_op 的绝对值,则 将 semval - |sem_op| 操作成功。
此时若设置了 SEM_UNDO 标志,则系统将更新进程的 semadj 变量。
2.若 内核信号量的值 semval 小于 sem_op 的绝对值,则 semop 失败返回或阻塞进程以等待信号量可用。
在这种情况下,
若 IPC_NOWAIT 标志被指定时, sempop 立即返回一个错误,并设置 errno 为 EAGAIN.
若 IPC_NOWAIT 标志未指定 , 则 内核信号量 semncnt 值 +1,进程被投入睡眠直到下列 3 个条件之一 发生:
1. 信号量 semval 的值变得 >= sem_op的绝对值,此时系统将该信号量的 semncnt 值 -1,并将 semval 减去 sem_op 的绝对值。
同时,如果 SEM_UNDO 标志被设置,则系统更新 semadj 变量。
2.被操作信号量所在的信号量集被移除,此时 semop 调用失败返回, errno 被设置为 EIDRM
3.调用被信号中断,此时 semop 调用失败返回,errno 被设置为 EINTR,同时系统将该信号量的 semncnt 值 -1.
num_sem_ops:
指定要执行的操作个数,即 sem_ops 数组中元素个数。
返回:
成功:
0
失败:
-1及 errno。
失败时,sem_ops 数组中指定所有操作都不执行。
扩展:
semop 函数对数组 sem_op 中每个成员按照数组顺序依次执行,且该过程是
原子操作(不可中断的一个或者一系列操作, 也就是不会被线程调度机制打断的操作, 运行期间不会有任何的上下文切换(context switch))。
2.3.3 semctl 对信号量集操作函数
#include <sys/sem.h>
// 控制信号量集的相关信息
int semctl(int semid, int sem_num, int command, ...);
semid:
由 semget 调用返回的信号量集标识符,以指定被操作的信号量集。
sem_num:
指定被操作的信号量在信号量集中的编号。
command:
指定要执行的命令,详见下表 13-2
有的命令需要第 4 个参数
...:
这个参数配合 command 使用,由用户自己定义,但 sys/sem.h 头文件给出推荐格式如下:
union semun
{
int val; //用于 SETVAL 命令
struct semid_ds * buf; //用于 IPC_STAT 和 IPC_SET 命令
unsigned short * array; //用于 GETALL 和 SETALL 命令
struct seminfo * __buf; //用于 IPC_INFO 命令
};
struct seminfo
{
int semmap; //Linux 内核没有使用
int semmni; //系统最多可以拥有的信号量集数目
int semmns; //系统最多可以拥有的信号量数目
int semmnu; //Linux 内核没有使用
int semmsl; //一个信号量集 最多允许包含的信号量数目
int semopm; //semop 一次最多能执行的 sem_op 操作数目
int semume; //Linux 内核没有使用
int semusz; //sem_undo 结构体的大小
int semvmx; //最大允许的信号量值
//最多允许的 UNDO 次数(带 SEM_UNDO 标志的 semop 操作的次数)
int semaem;
}
返回:
成功;
返回值取决于 command 参数。
失败:
-1 及 errno。
2.共享内存
2.1 概念
共享内存 是最高效的 IPC 机制,因为 不涉及进程之间任何数据传输。
但需要辅助手段来同步进程对共享内存的访问,否则会产生竟态条件。
Linux 共享内存包含 4 个系统调用:
shmget、shmat、shmdt、shmctl。
2.2 shmget 系统调用
//创建一段新的共享内存,或者获取一段已经存在的共享内存
#include <sys/shm.h>
int shmget(key_t key, size_t size, int shmflg);
key:
key 是一个键值,用来标识一段全局唯一的共享内存。
size:
指定共享内存大小。
若是创建,则 size 必须被指定。
若是获取,则 size 可被设为 0.
shmflg:
与信号量的 semget 系统调用的 sem_flags参数相同,但 这个再多两个标志:
1. SHM_HUGETLB
系统将使用 “大页面” 来为共享内存分配空间
2. SHM_NORESERVE
不为共享内存保留交换分区(swap空间)。
这样,当物理内存不足时,对该共享内存执行写操作将触发 SIGSEGV 信号。
返回:
成功:
正整数值,共享内存标识符。
失败:
-1,errno
扩展:
shmget 创建共享内存时,所有字节都被初始化为 0,
与之关联的内核数据结构 shmid_ds 将被创建并初始化:
struct shmid_ds
{
struct ipc_perm shm_perm; //共享内存的操作权限
size_t shm_segsz; //共享内存大小,单位是字节
__time_t shm_atime; //对这段内存最后一次调用 shmat 的时间
__time_t shm_dtime; //对这段内存最后一次调用 shmdt 的时间
__time_t shm_ctime; //对这段内存最后一次调用 shmct 的时间
__pid_t shm_cpid; //创建者 pid
__pid_t shm_lpid; //最后一次执行 shmat 或 shmdt 操作的 进程 pid
shmatt_t shm_nattach; //目前关联到此共享内存的进程数量
... //省略一些字段
}
#include <sys/sem,h>
//此结构体用于描述 IPC 对象(信号量、共享内存和消息队列)的权限
struct ipc_perm
{
key_t key; //键值
uid_t uid; //所有者的有效用户 ID
gid_t gid; //所有者的有效组 ID
uid_t cuid; //创建者的有效用户 ID
gid_t cgid; //创建者的有效组 ID
mode_t mode; //访问权限
... //省略其他字段
}
shmget 对 shmid_ds 初始化包括:
shm_perm.cuid 和 shm_perm.uid 设置为调用进程的有效用户 ID
shm_perm.cgid 和 shm_perm.gid 设置为调用进程的有效组 ID
shm_perm.mode 的最低 9 位设置为 sem_flags 参数的最低 9 位
shm_segzs 设置为 size
shm_lpid、shm_nattach、shm_atime、shm_dtime 设置为 0
sem_ctime 设置为当前系统时间
2.3 shmat 和 shmdt 系统调用
共享内存创建完后,
1.不能立即访问,而是首先将它关联到进程地址空间(shmat 实现)
2.使用完后,必须将它从进程地址空间中分离(shmdt 实现)
#include <sys/shm.h>
void shmat(int shm_id, const void * shm_addr, int shmflg);
shm_id:
创建的共享内存标识。
shm_addr:
指定将共享内存关联到进程的哪块地址空间。
最终效果受 参数 shmflg 的可选标志 SHM_RND 的影响。
可能的情况:
1. 若 shm_addr 为 NULL,
则被关联的地址由操作系统选择。推荐这样做。
2. 若 shm_addr 非空 且 SHM_RND 未设置,
则共享内存被关联到 addr 指定地址处。
3. 若 shm_addr 非空 且 设置了 SHM_RND ,
则被关联的地址是 [shm_addr -(shm_addr % SHMLBA)]。
SHMLBA 含义是“段低端边界地址倍数”(Segment Low Boundary Address Multiple),
它必须是内存页面大小(PAGE_SIZE)的整数倍。
SHM_RND 的含义是圆整(round),即将共享内存被关联的地址向下圆整到离 shm_addr 最近的 SHMLBA 的整数倍地址处。
shmflg:
可选标志。如下:
SHM_RND :
SHM_RDONLY:
进程仅能读取共享内存的内容。
若没有指定此标志,则进程可以对共享内存进行读写操作(这取需要创建共享内存时指定其读写权限)。
SHM_REMAP:
如果地址 shmaddr 已经被关联到一段共享内存上,则重新关联。
SHM_EXEC:
指定对共享内存执行权限。
返回:
成功:
返回共享内存被关联到的地址。
同时将修改内核数据结构 shmid_ds 部分字段,如下:
shm_nattach 加 1.
shm_lpid 设置为调用进程的 PID。
shm_atime 设置为当前的时间。
失败:
(void*)-1,errno
//将 shm_addr 处的共享内存从进程分离。
int shmdt (const void * shm_addr);
返回:
成功:
0。
成功将修改内核 shmid_ds 部分字段:
shm_attach 减 1。
shm_lpid 设置为 调用进程ID。
shm_dtime 设置为当前时间。
失败:
-1,errno
2.4 shmctl 系统调用
//控制共享内存某些属性
#include <sys/shm.h>
int shmctl(int shm_id, int command ,struct shmid_ds * buf);
shm_id:
共享内存标识符。
command:
指定要执行的命令。
所有支持命令如下表 13-3。
bufL:
配合 command 使用,详见表 13-3.
返回:
成功:
返回值取决于 command 命令,详见 表 13-3.
失败:
-1,errno。
2.5 POSIX 的共享内存方法
注意:
使用如下 POSIX 共享内存函数,编译时需要指定链接选项 -lrt
//创建或打开一个 POSIX 共享内存对象
#include <sys/mman.h>
#include <sys/stat.h>
#include <fcntl.h>
int shm_open(const char * name, int oflag, mode_t mode);
shm_open 的使用方法与 open 系统调用完全相同。
name:
指定要创建/打开的共享内存对象。
从可移植性考虑:
此参数应使用 “/somename”的格式:
以 “/” 开始,后接多个字符,且这些字符都不是 “/”;
以 “\0” 结尾,长度不超过 NAME_MAX (通常是 255)。
oflag:
指定创建方式。可以是下列标志中 一个 或 多个的 按位或。
O_RDONLY
只读方式打开共享内存对象。
O_RDWR
以可读、可写方式打开共享内存对象
O_CREAT
若共享内存不存在,则创建之。
此时 mode 参数低 9 位指定该共享内存对象的访问权限。
共享内存被创建时,初始长度为 0.
O_EXCL
和 O_CREAT 一起使用。
若由 name 指定的共享内存对象已存在,则 shm_open 返回错误,否则就创建一个新的共享内存对象。
O_TRUNC
若共享内存已存在,则把它截断,使其长度为 0。
//删除创建的共享内存
#include <sys/mman.h>
#include <sys/stat.h>
#include <fcntl.h>
int shm_unlink(const char * name);
name:
将该参数指定的共享内存对象标记为等待删除。
当使用该共享内存对象的进程都使用 ummap 将它从进程分离后,系统将销毁这个共享内存对象所占据的资源。
2.6 共享内存实例
/*注意点:
1.每个子进程只会往自己所处理连接所对应的那一部分读缓存中写入数据,所以使用共享内存的目的是“共享读”。
因此,每个子进程在使用共享内存时都无需加锁,这样符合“聊天室服务器”的应用场景,同时提高性能。
2.程序启动时给 users 分配了足够的空间,使他可以存储所有可能的客户连接的相关数据。
同时,sub_process 也分配了足够的空间。
这是牺牲空间换时间的例子。
*/
//一个子进程处理一个客户连接
//所有客户 socket 连接的读缓冲设计为一块共享内存
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <assert.h>
#include <stdio.h>
#include <unistd.h>
#include <errno.h>
#include <string.h>
#include <fcntl.h>
#include <stdlib.h>
#include <sys/epoll.h>
#include <signal.h>
#include <sys/wait.h>
#include <sys/mman.h>
#include <sys/stat.h>
#include <fcntl.h>
#define USER_LIMIT 5
#define BUFFER_SIZE 1024
#define FD_LIMIT 65535
#define MAX_EVENT_NUMBER 1024
#define PROCESS_LIMIT 65536
//处理客户连接必要的数据
struct client_data
{
sockaddr_in address;
int connfd;
pid_t pid; //处理这个连接的子进程 ID
int pipefd[2]; //和父进程通信用的管道
};
static const char* shm_name = "/my_shm";
int sig_pipefd[2];
int epollfd;
int listenfd;
int shmfd;
char* share_mem = 0;
//客户连接数组
client_data* users = 0;
//子进程和客户连接的映射关系表,用进程PID来索引这个数组即可获得该进程所处理的客户连接的编号
int* sub_process = 0;
//当前客户数量
int user_count = 0;
bool stop_child = false;
int setnonblocking( int fd )
{
int old_option = fcntl( fd, F_GETFL );
int new_option = old_option | O_NONBLOCK;
fcntl( fd, F_SETFL, new_option );
return old_option;
}
void addfd( int epollfd, int fd )
{
epoll_event event;
event.data.fd = fd;
event.events = EPOLLIN | EPOLLET;
epoll_ctl( epollfd, EPOLL_CTL_ADD, fd, &event );
setnonblocking( fd );
}
void sig_handler( int sig )
{
int save_errno = errno;
int msg = sig;
send( sig_pipefd[1], ( char* )&msg, 1, 0 );
errno = save_errno;
}
void addsig( int sig, void(*handler)(int), bool restart = true )
{
struct sigaction sa;
memset( &sa, '\0', sizeof( sa ) );
sa.sa_handler = handler;
if( restart )
{
sa.sa_flags |= SA_RESTART;
}
sigfillset( &sa.sa_mask );
assert( sigaction( sig, &sa, NULL ) != -1 );
}
void del_resource()
{
close( sig_pipefd[0] );
close( sig_pipefd[1] );
close( listenfd );
close( epollfd );
shm_unlink( shm_name );
delete [] users;
delete [] sub_process;
}
//停止一个子进程
void child_term_handler( int sig )
{
stop_child = true;
}
//子进程运行函数,
//idx:子进程处理客户连接的编号
//users:保存所有客户连接数据的数组
//share_mem:住处共享内存地址
int run_child( int idx, client_data* users, char* share_mem )
{
//子进程用 epoll 监听客户连接 和 与父进程通信的管道文件描述符
epoll_event events[ MAX_EVENT_NUMBER ];
int child_epollfd = epoll_create( 5 );
assert( child_epollfd != -1 );
int connfd = users[idx].connfd;
addfd( child_epollfd, connfd );
int pipefd = users[idx].pipefd[1];
addfd( child_epollfd, pipefd );
int ret;
//子进程设置自己的信号处理函数
addsig( SIGTERM, child_term_handler, false );
while( !stop_child )
{
int number = epoll_wait( child_epollfd, events, MAX_EVENT_NUMBER, -1 );
if ( ( number < 0 ) && ( errno != EINTR ) )
{
printf( "epoll failure\n" );
break;
}
for ( int i = 0; i < number; i++ )
{
int sockfd = events[i].data.fd;
//客户连接有数据到达
if( ( sockfd == connfd ) && ( events[i].events & EPOLLIN ) )
{
memset( share_mem + idx*BUFFER_SIZE, '\0', BUFFER_SIZE );
//读取到对应的读缓存中,读缓存是共享内存的一段,它开始于 idx*BUFFER_SIZE 处,长度为 BUFFER_SIZE 字节。故每个客户连接的读缓存是共享的。
ret = recv( connfd, share_mem + idx*BUFFER_SIZE, BUFFER_SIZE-1, 0 );
if( ret < 0 )
{
if( errno != EAGAIN )
{
stop_child = true;
}
}
else if( ret == 0 )
{
stop_child = true;
}
else
{
//成功读取客户数据后通知主进程(通过管道)来处理
send( pipefd, ( char* )&idx, sizeof( idx ), 0 );
}
}
//主进程通知本进程(通过管道)将第client个客户额数据发送到本进程负责的客户端
else if( ( sockfd == pipefd ) && ( events[i].events & EPOLLIN ) )
{
int client = 0;
//接收主进程发送来的数据,即有客户数据到达的连接编号
ret = recv( sockfd, ( char* )&client, sizeof( client ), 0 );
if( ret < 0 )
{
if( errno != EAGAIN )
{
stop_child = true;
}
}
else if( ret == 0 )
{
stop_child = true;
}
else
{
send( connfd, share_mem + client * BUFFER_SIZE, BUFFER_SIZE, 0 );
}
}
else
{
continue;
}
}
}
close( connfd );
close( pipefd );
close( child_epollfd );
return 0;
}
int main( int argc, char* argv[] )
{
if( argc <= 2 )
{
printf( "usage: %s ip_address port_number\n", basename( argv[0] ) );
return 1;
}
const char* ip = argv[1];
int port = atoi( argv[2] );
int ret = 0;
struct sockaddr_in address;
bzero( &address, sizeof( address ) );
address.sin_family = AF_INET;
inet_pton( AF_INET, ip, &address.sin_addr );
address.sin_port = htons( port );
listenfd = socket( PF_INET, SOCK_STREAM, 0 );
assert( listenfd >= 0 );
ret = bind( listenfd, ( struct sockaddr* )&address, sizeof( address ) );
assert( ret != -1 );
ret = listen( listenfd, 5 );
assert( ret != -1 );
user_count = 0;
users = new client_data [ USER_LIMIT+1 ];
sub_process = new int [ PROCESS_LIMIT ];
for( int i = 0; i < PROCESS_LIMIT; ++i )
{
sub_process[i] = -1;
}
epoll_event events[ MAX_EVENT_NUMBER ];
epollfd = epoll_create( 5 );
assert( epollfd != -1 );
addfd( epollfd, listenfd );
ret = socketpair( PF_UNIX, SOCK_STREAM, 0, sig_pipefd );
assert( ret != -1 );
setnonblocking( sig_pipefd[1] );
addfd( epollfd, sig_pipefd[0] );
addsig( SIGCHLD, sig_handler );
addsig( SIGTERM, sig_handler );
addsig( SIGINT, sig_handler );
addsig( SIGPIPE, SIG_IGN );
bool stop_server = false;
bool terminate = false;
//创建共享内存,作为所有客户 socket 连接的读缓存
shmfd = shm_open( shm_name, O_CREAT | O_RDWR, 0666 );
assert( shmfd != -1 );
ret = ftruncate( shmfd, USER_LIMIT * BUFFER_SIZE );
assert( ret != -1 );
share_mem = (char*)mmap( NULL, USER_LIMIT * BUFFER_SIZE, PROT_READ | PROT_WRITE, MAP_SHARED, shmfd, 0 );
assert( share_mem != MAP_FAILED );
close( shmfd );
while( !stop_server )
{
int number = epoll_wait( epollfd, events, MAX_EVENT_NUMBER, -1 );
if ( ( number < 0 ) && ( errno != EINTR ) )
{
printf( "epoll failure\n" );
break;
}
for ( int i = 0; i < number; i++ )
{
int sockfd = events[i].data.fd;
if( sockfd == listenfd ) //新连接到来
{
struct sockaddr_in client_address;
socklen_t client_addrlength = sizeof( client_address );
int connfd = accept( listenfd, ( struct sockaddr* )&client_address, &client_addrlength );
if ( connfd < 0 )
{
printf( "errno is: %d\n", errno );
continue;
}
if( user_count >= USER_LIMIT )
{
const char* info = "too many users\n";
printf( "%s", info );
send( connfd, info, strlen( info ), 0 );
close( connfd );
continue;
}
//保存新连接相关数据
users[user_count].address = client_address;
users[user_count].connfd = connfd;
//创建与子进程管道
ret = socketpair( PF_UNIX, SOCK_STREAM, 0, users[user_count].pipefd );
assert( ret != -1 );
pid_t pid = fork();
if( pid < 0 )
{
close( connfd );
continue;
}
else if( pid == 0 )
{
close( epollfd );
close( listenfd );
close( users[user_count].pipefd[0] );
close( sig_pipefd[0] );
close( sig_pipefd[1] );
run_child( user_count, users, share_mem );
munmap( (void*)share_mem, USER_LIMIT * BUFFER_SIZE );
exit( 0 );
}
else
{
close( connfd );
close( users[user_count].pipefd[1] );
addfd( epollfd, users[user_count].pipefd[0] );
users[user_count].pid = pid;
//记录新客户连接在数组 users中的索引值,建立进程pid 和该索引值间的映射
sub_process[pid] = user_count;
user_count++;
}
}
//处理信号
else if( ( sockfd == sig_pipefd[0] ) && ( events[i].events & EPOLLIN ) )
{
int sig;
char signals[1024];
ret = recv( sig_pipefd[0], signals, sizeof( signals ), 0 );
if( ret == -1 )
{
continue;
}
else if( ret == 0 )
{
continue;
}
else
{
for( int i = 0; i < ret; ++i )
{
switch( signals[i] )
{
case SIGCHLD: //子进程退出,表示某个客户端关闭了连接
{
pid_t pid;
int stat;
while ( ( pid = waitpid( -1, &stat, WNOHANG ) ) > 0 )
{
//子进程pid取得被关闭连接编号
int del_user = sub_process[pid];
sub_process[pid] = -1;
if( ( del_user < 0 ) || ( del_user > USER_LIMIT ) )
{
printf( "the deleted user was not change\n" );
continue;
}
//清除关闭连接的相关数据
epoll_ctl( epollfd, EPOLL_CTL_DEL, users[del_user].pipefd[0], 0 );
close( users[del_user].pipefd[0] );
users[del_user] = users[--user_count];
sub_process[users[del_user].pid] = del_user;
printf( "child %d exit, now we have %d users\n", del_user, user_count );
}
if( terminate && user_count == 0 )
{
stop_server = true;
}
break;
}
case SIGTERM:
case SIGINT:
{
//结束服务器程序
printf( "kill all the clild now\n" );
//addsig( SIGTERM, SIG_IGN );
//addsig( SIGINT, SIG_IGN );
if( user_count == 0 )
{
stop_server = true;
break;
}
for( int i = 0; i < user_count; ++i )
{
int pid = users[i].pid;
kill( pid, SIGTERM );
}
terminate = true;
break;
}
default:
{
break;
}
}
}
}
}
else if( events[i].events & EPOLLIN ) //某个子进程向父进程写了数据
{
int child = 0;
ret = recv( sockfd, ( char* )&child, sizeof( child ), 0 ); //读取管道数据,child 变量记录哪个客户连接有数据到达
printf( "read data from child accross pipe\n" );
if( ret == -1 )
{
continue;
}
else if( ret == 0 )
{
continue;
}
else
{
//向除负责处理第 chold 个客户连接的子进程外的 其他子进程发送消息,通知它们有客户要写数据
for( int j = 0; j < user_count; ++j )
{
if( users[j].pipefd[0] != sockfd )
{
printf( "send data to child accross pipe\n" );
send( users[j].pipefd[0], ( char* )&child, sizeof( child ), 0 );
}
}
}
}
}
}
del_resource();
return 0;
}
3. 消息队列
消息队列在两个进程间传递 二进制块数据
的方式。
每个数据块都有一个类型,接收方可以根据类型来有选择的接收数据
。
而不像管道 和 命名管道那样必须先进先出的接收数据。
Linux消息队列 API 定义在 sys/msg.h 头文件中,
包括 4 个系统调用:
msgget、msgsnd、msgrcv、msgctl。
3.1 msgget 系统调用
//创建一个消息队列或者获取一个消息队列
#include <sys/msg.h>
int msgget (key_t key, int msgflg);
key:
表用来标识一个全局唯一的消息队列。
msgflg:
使用和含义与 semget 的参数 sem_flags参数相同。
指定一组标志。
它低端 9 个比特是信号量的权限,其格式和含义都与系统调用的 open、model参数相同。
此外,它还可以和 IPC_CREAT 标志做按位 “或”运算以创建新的信号量集。
确保创建一组新的、唯一的信号量集:
联合使用 IPC_CREAT 和 IPC_EXCL.
此时,若创建的信号量集已存在,则 semget 返回错误 并设置 errno 为 EEXIST.
返回:
成功:
返回正整数值,是消息队列的标识符
失败:
-1,并设置 errno
扩展:
若msgget用于创建消息队列,则与之关联的内核数据结构 msqid_ds 将被创建并初始化:
struct msqid_ds
{
struct ipc_perm msg_perm; //消息队列的操作权限
time_t msg_stime; //最后一次调用 msgsnd 的时间
time_t msg_rtime; //最后一次调用 msgrcv 的时间
time_t msg_ctime; //最后一次被修改的时间
unsigned long __msg_cbytes; //消息队列中已有的字节数
msqqnum_t msg_qnum; //消息队列中已有的消息数
msglen_t msg_qbytes; //消息队列允许最大字节数
pid_t msg_lspid; //最后执行 msgsnd 的进程 PID
pid_t msg_lrpid; //最后执行 msgrcv 的进程 PID
}
3.2 msgsnd 系统调用
//把一条消息添加到消息队列
#include <sys/msg.h>
int msgsnd(int msqid, const void * msg_ptr, size_t msg_sz, int msgflg);
msqid:
消息队列标识符。即 msgget 调用返回值。
msg_ptr:
指向一个准备发送的消息,必须被定义为如下类型:
struct msgbuf
{
long mtype; //消息类型,必须是一个正整数。
char mtext[512]; //消息数据,
}
msg_sz:
消息的数据部分(mtext)的长度。
可以为 0,表示没有消息数据。
msgflg:
控制 msgsnd 的行为。
支持 IPC_NOWAIT 标志,即以非阻塞方式发送消息。此时 msgsnd 将立即返回并设置 errno 为 EAGAIN.
默认情况下,发送消息时若消息队列满了,则 msgsnd 将阻塞。
处于阻塞状态的 msgsnd 调用可能被如下两种情况所中断:
1.消息队列被移除。此时 msgsnd 将立即返回并设置 errno 为 EIDRM。
2.程序接收到信号。此时 msgsnd 将立即返回并设置 errno 为 EINTR。
返回:
成功:
0.
成功将修改内核数据 msqid_ds 的部分字段,如下:
msg_qnum 加1
msg_lspid 设置为调用进程的 PID
msg_stime 设置为当前时间
失败:
-1,errno
3.3 msgrcv 系统调用
//从消息队列中获取消息
#include <sys/msg.h>
int msgrcv(int msqid, void * msg_ptr,size_t msg_sz, long int msgtype, int msgflg);
msqid:
消息队列标识符
msg_ptr:
用于存储接收的消息
msg_sz:
消息数据部分长度。
msgtype:
指定消息类型。
msgtype = 0: 读取消息队列中第一个消息。
msgtype > 0: 读取消息队列中第一个类型为 msgtype 的消息(除非指定了标志 MSG_EXCEPT)
msgtype < 0: 读取消息队列中第一个类型值比 msgtype 的绝对值小的消息。
msgflg:
控制 msgrcv 的行为,可以是如下标志的按位或:
IPC_NOWAIT: 如果消息队列中没有消息,则 msgrcv 立即返回并设置 errno 为 ENOMSG.
MSG_EXCEPT: 若 msgtype > 0 ,则接收消息队列中第一个非 msgtype 类型的消息。
MSG_NOERROR:如果消息数据部分长度超过 msg_sz ,就将它截断。
返回:
成功:
0.
成功将修改 内核数据结构 msqid_ds 中部分字段:
msg_qnum 减 1.
msg_lrqid 设置为调用进程 pid。
msg_rtime 设置为当前时间。
失败 :
-1,errno
扩展:
处于阻塞状态的 msgrcv 可能被如下两种异常中断:
1.消息队列被移除。 此时 msgrcv 将立即返回,并设置 errno 为 EIDRM.
2.程序接收到信号。 此时 msgrcv 将立即返回,并设置 errno 为 EINTR.
3.4 msgctl 系统调用
//控制消息队列某些属性
#include <sys/msg.h>
int msgctl(int msqid, int command, struct msqid_ds * buf);
msqid:
共享内存标识符。
command:
指定要执行的命令。详见下表 13-4。
返回:
成功:
取决于 command 参数,详见下表 13-4。
失败:
-1,errno。
4. IPC 命令
5.进程间传递文件描述符
fork 后,父进程中打开的文件描述符在子进程中仍然保持打开
。
传递文件描述符是要在接收进程中创建一个新的文件描述符
,并且该文件描述符和发送进程中被传递的文件描述符指向内核中相同文件表项
。
示例:
#include <sys/socket.h>
#include <fcntl.h>
#include <stdio.h>
#include <unistd.h>
#include <stdlib.h>
#include <assert.h>
#include <string.h>
static const int CONTROL_LEN = CMSG_LEN( sizeof(int) );
/// @brief 发送文件描述符
/// @param fd 传递信息的 unix 域 socket
/// @param fd_to_send 待发送额文件描述符
void send_fd( int fd, int fd_to_send )
{
struct iovec iov[1];
struct msghdr msg;
char buf[0];
iov[0].iov_base = buf;
iov[0].iov_len = 1;
msg.msg_name = NULL; //消息的协议地址 协议地址和套接口信息
//在非连接的UDP中,发送者要指定对方地址端口,接受方用于的到数据来源,如果不需要的话可以设置为NULL
//(在TCP或者连接的UDP中,一般设置为NULL)
msg.msg_namelen = 0; ///* 地址的长度 */
msg.msg_iov = iov; /* 多io缓冲区的地址 */
msg.msg_iovlen = 1; /* 缓冲区的个数 */
cmsghdr cm;
cm.cmsg_len = CONTROL_LEN; /* 包含该头部的数据长度 */
cm.cmsg_level = SOL_SOCKET; /* 具体的协议标识 */
cm.cmsg_type = SCM_RIGHTS; /* 协议中的类型 */
*(int *)CMSG_DATA( &cm ) = fd_to_send;
msg.msg_control = &cm; //设置辅助数据
msg.msg_controllen = CONTROL_LEN; /* 辅助数据的长度 */
sendmsg( fd, &msg, 0 );//只用于套接口,不能用于普通的I/O读写,参数sockfd则是指明要读写的套接口
}
/// @brief 接收文件描述符
/// @param fd
/// @return
int recv_fd( int fd )
{
struct iovec iov[1];
struct msghdr msg;
char buf[0];
iov[0].iov_base = buf; //指定用户空间缓存区地址
iov[0].iov_len = 1; //指定 缓冲区长度为 1
msg.msg_name = NULL;
msg.msg_namelen = 0;
msg.msg_iov = iov;
msg.msg_iovlen = 1;
cmsghdr cm;
msg.msg_control = &cm;
msg.msg_controllen = CONTROL_LEN;
recvmsg( fd, &msg, 0 );
int fd_to_read = *(int *)CMSG_DATA( &cm );
return fd_to_read;
}
int main()
{
int pipefd[2];
int fd_to_pass = 0;
int ret = socketpair( PF_UNIX, SOCK_DGRAM, 0, pipefd ); //创建进程通信文件描述符
assert( ret != -1 );
pid_t pid = fork();
assert( pid >= 0 );
if ( pid == 0 )
{
close( pipefd[0] );//子进程关闭一端
fd_to_pass = open( "test.txt", O_RDWR, 0666 ); //子进程打开文件描述符
send_fd( pipefd[1], ( fd_to_pass > 0 ) ? fd_to_pass : 0 );//子进程向本地socket一端写数据,并携带待传递的文件描述符
close( fd_to_pass );//子进程关闭打开的文件描述符
exit( 0 );
}
close( pipefd[1] );//主进程关闭写端(因为子进程往这个方向写)
fd_to_pass = recv_fd( pipefd[0] ); //主进程接收文件描述符
char buf[1024];
memset( buf, '\0', 1024 );
read( fd_to_pass, buf, 1024 ); //主进程读取接收的文件描述符中的数据
printf( "I got fd %d and data %s\n", fd_to_pass, buf );
close( fd_to_pass ); //主进程关闭 接收的文件描述符。
}