目录
1.master进程监听socket
2.master和worker进程通信机制
2.1通信渠道
2.2通信方法
2.3通信内容
2.4子进程事件处理
3.epoll封装
4.linux系统下信号查看
1.master进程监听socket
nginx在master进程socket bind listen,accept在通过epoll在子进程中控制,代码如下:
ngx_int_t
ngx_open_listening_sockets(ngx_cycle_t *cycle)
{
...
s = ngx_socket(ls[i].sockaddr->sa_family, ls[i].type, 0);
...
if (setsockopt(s, SOL_SOCKET, SO_REUSEADDR,
(const void *) &reuseaddr, sizeof(int))
== -1)
...
if (bind(s, ls[i].sockaddr, ls[i].socklen) == -1) {
...
if (listen(s, ls[i].backlog) == -1) {
...
}
调用堆栈如下:
ngx_open_listening_sockets(ngx_cycle_t * cycle) (nginx-1.22.1/src/core/ngx_connection.c:604)
ngx_init_cycle(ngx_cycle_t * old_cycle) (nginx-1.22.1/src/core/ngx_cycle.c:618)
main(int argc, char * const * argv) (nginx-1.22.1/src/core/nginx.c:292)
2.master和worker进程通信机制
2.1通信渠道
创建套接字组用于读和写,master用fd[0]写,worker用fd[1]进行读
socketpair
2.2通信方法
master用sendmsg发送,work用recvmsg接收
#include <sys/types.h>
#include <sys/socket.h>
ssize_t sendmsg(int sockfd, const struct msghdr *msg, int flags);
ssize_t recvmsg(int sockfd, struct msghdr *msg, int flags);
2.3通信内容
struct msghdr
2.4子进程事件处理
子进程注册fd[1] -> ngx_channel读事件到epoll上
static void
ngx_worker_process_init(ngx_cycle_t *cycle, ngx_int_t worker)
{
...
if (ngx_add_channel_event(cycle, ngx_channel, NGX_READ_EVENT,
ngx_channel_handler)
== NGX_ERROR)
{
/* fatal */
exit(2);
}
}ngx_channel在如下函数进行赋值,为fd[1]
ngx_pid_t
ngx_spawn_process(ngx_cycle_t *cycle, ngx_spawn_proc_pt proc, void *data,
char *name, ngx_int_t respawn)
{
...
ngx_channel = ngx_processes[s].channel[1];
...
}
3.epoll封装
文件:
nginx-1.22.1/src/event/modules/ngx_epoll_module.c
结构:
static ngx_event_module_t ngx_epoll_module_ctx = {
&epoll_name,
ngx_epoll_create_conf, /* create configuration */
ngx_epoll_init_conf, /* init configuration */
{
ngx_epoll_add_event, /* add an event */
ngx_epoll_del_event, /* delete an event */
ngx_epoll_add_event, /* enable an event */
ngx_epoll_del_event, /* disable an event */
ngx_epoll_add_connection, /* add an connection */
ngx_epoll_del_connection, /* delete an connection */
#if (NGX_HAVE_EVENTFD)
ngx_epoll_notify, /* trigger a notify */
#else
NULL, /* trigger a notify */
#endif
ngx_epoll_process_events, /* process the events */
ngx_epoll_init, /* init the events */
ngx_epoll_done, /* done the events */
}
};
4.linux系统下信号查看
kill -l
5.信号处理模拟
#include <sys/types.h>
#include <sys/socket.h>
#include <stdio.h>
#include <sys/types.h>
#include <unistd.h>
#include <string.h>
#include <sys/ioctl.h>
#include <unistd.h>
#include <fcntl.h>
#include <stdlib.h>
#include <signal.h>
#include <errno.h>
#include <stdint.h>
int parentProcess(int* fds, int num) {
if (num < 2) {
return -1;
}
close(fds[0]); // 关闭fds[0] 使用fds[1]读写,子进程中关闭fds[1] 使用fds[0]读写
char buf[128] {0};
char *pStr = (char*)"hello child, I am parent";
int inum = 0;
while (inum++ < 3) {
memset(buf, 0x00, sizeof(buf));
//写
write(fds[1], pStr, strlen(pStr));
//读
read(fds[1], buf, 128);
printf("parent [%d] %d :: %s\n", getpid(), inum, buf);
sleep(1);
}
close(fds[1]);
return 0;
}
int childProcess(int* fds, int num) {
if (num < 2) {
return -1;
}
close(fds[1]);
char buf[128] {0};
char *pStr = (char*)"hello parent, I am child";
char sendBuf[128] = {0};
sprintf(sendBuf, "hello parent, I am child %d", getpid());
int inum = 0;
while (inum++ < 3) {
memset(buf, 0x00, sizeof(buf));
//读
read(fds[0], buf, 128);
printf("child [%d] %d :: %s\n", getpid(), inum, buf);
//写
write(fds[0], sendBuf, strlen(sendBuf));
sleep(1);
}
close(fds[0]);
return 0;
}
//设置文件描述符非阻塞
int fd_nonblocking(int s)
{
int nb;
nb = 1;
//方法一
/*
int flag = fcntl(s, F_GETFL);
flag |= O_NONBLOCK;
return fcntl(s, F_SETFL, flag);
*/
//方法二
return ioctl(s, FIONBIO, &nb);
}
int close_channel(int* fds) {
if (close(fds[0]) == -1) {
printf("close() channel fds[0] failed\n");
}
if (close(fds[1]) == -1) {
printf("close() channel fds[1] failed\n");
}
return 0;
}
void testNonblockingSocketFd(int* fds) {
if (-1 == fd_nonblocking(fds[0])) {
printf("fd_nonblocking fds[0] failed\n");
close_channel(fds);
}
if (-1 == fd_nonblocking(fds[1])) {
printf("fd_nonblocking fds[1] failed\n");
close_channel(fds);
}
}
struct TDataExchangeChannel {
int fds[2];
};
#define GROUP_NUM 2
typedef struct {
int signo;
char *signame;
char *name;
void (*handler)(int signo, siginfo_t *siginfo, void *ucontext);
} ngx_signal_t;
#define ngx_signal_helper(n) SIG##n
#define ngx_signal_value(n) ngx_signal_helper(n)
#define ngx_value_helper(n) #n
#define ngx_value(n) ngx_value_helper(n)
#define NGX_SHUTDOWN_SIGNAL QUIT
#define NGX_TERMINATE_SIGNAL TERM
#define NGX_NOACCEPT_SIGNAL WINCH
#define NGX_RECONFIGURE_SIGNAL HUP
#if (NGX_LINUXTHREADS)
#define NGX_REOPEN_SIGNAL INFO
#define NGX_CHANGEBIN_SIGNAL XCPU
#else
#define NGX_REOPEN_SIGNAL USR1
#define NGX_CHANGEBIN_SIGNAL USR2
#endif
#define ngx_errno errno
typedef intptr_t ngx_int_t;
typedef int ngx_err_t;
typedef uintptr_t ngx_uint_t;
#define NGX_PROCESS_SINGLE 0
#define NGX_PROCESS_MASTER 1
#define NGX_PROCESS_SIGNALLER 2
#define NGX_PROCESS_WORKER 3
#define NGX_PROCESS_HELPER 4
#define NGX_OK 0
#define NGX_ERROR -1
#define NGX_AGAIN -2
#define NGX_BUSY -3
#define NGX_DONE -4
#define NGX_DECLINED -5
#define NGX_ABORT -6
ngx_uint_t ngx_process;
void
ngx_signal_handler(int signo, siginfo_t *siginfo, void *ucontext);
ngx_signal_t signals[] = {
{ ngx_signal_value(NGX_RECONFIGURE_SIGNAL),
"SIG" ngx_value(NGX_RECONFIGURE_SIGNAL),
"reload",
ngx_signal_handler },
{ ngx_signal_value(NGX_REOPEN_SIGNAL),
"SIG" ngx_value(NGX_REOPEN_SIGNAL),
"reopen",
ngx_signal_handler },
{ ngx_signal_value(NGX_NOACCEPT_SIGNAL),
"SIG" ngx_value(NGX_NOACCEPT_SIGNAL),
"",
ngx_signal_handler },
{ ngx_signal_value(NGX_TERMINATE_SIGNAL),
"SIG" ngx_value(NGX_TERMINATE_SIGNAL),
"stop",
ngx_signal_handler },
{ ngx_signal_value(NGX_SHUTDOWN_SIGNAL),
"SIG" ngx_value(NGX_SHUTDOWN_SIGNAL),
"quit",
ngx_signal_handler },
{ ngx_signal_value(NGX_CHANGEBIN_SIGNAL),
"SIG" ngx_value(NGX_CHANGEBIN_SIGNAL),
"",
ngx_signal_handler },
{ SIGALRM, "SIGALRM", "", ngx_signal_handler },
{ SIGINT, "SIGINT", "", ngx_signal_handler },
{ SIGIO, "SIGIO", "", ngx_signal_handler },
{ SIGCHLD, "SIGCHLD", "", ngx_signal_handler },
{ SIGSYS, "SIGSYS, SIG_IGN", "", NULL },
{ SIGPIPE, "SIGPIPE, SIG_IGN", "", NULL },
{ 0, NULL, "", NULL }
};
void
ngx_signal_handler(int signo, siginfo_t *siginfo, void *ucontext)
{
char *action;
ngx_int_t ignore;
ngx_err_t err;
ngx_signal_t *sig;
ignore = 0;
err = ngx_errno;
printf("signo = [%d]\n", signo);
for (sig = signals; sig->signo != 0; sig++) {
if (sig->signo == signo) {
break;
}
}
action = "";
switch (ngx_process) {
case NGX_PROCESS_MASTER:
case NGX_PROCESS_SINGLE:
switch (signo) {
case ngx_signal_value(NGX_SHUTDOWN_SIGNAL):
//ngx_quit = 1;
action = ", shutting down";
break;
case ngx_signal_value(NGX_TERMINATE_SIGNAL):
case SIGINT:
//ngx_terminate = 1;
action = ", exiting";
break;
case ngx_signal_value(NGX_NOACCEPT_SIGNAL):
/*
if (ngx_daemonized) {
ngx_noaccept = 1;
action = ", stop accepting connections";
}
*/
break;
case ngx_signal_value(NGX_RECONFIGURE_SIGNAL):
//ngx_reconfigure = 1;
action = ", reconfiguring";
break;
case ngx_signal_value(NGX_REOPEN_SIGNAL):
//ngx_reopen = 1;
action = ", reopening logs";
break;
case ngx_signal_value(NGX_CHANGEBIN_SIGNAL):
/*
if (ngx_getppid() == ngx_parent || ngx_new_binary > 0) {
action = ", ignoring";
ignore = 1;
break;
}
ngx_change_binary = 1;
*/
action = ", changing binary";
break;
case SIGALRM:
//ngx_sigalrm = 1;
break;
case SIGIO:
//ngx_sigio = 1;
break;
case SIGCHLD:
//ngx_reap = 1;
break;
}
break;
case NGX_PROCESS_WORKER:
case NGX_PROCESS_HELPER:
switch (signo) {
case ngx_signal_value(NGX_NOACCEPT_SIGNAL):
/*
if (!ngx_daemonized) {
break;
}
ngx_debug_quit = 1;
*/
/* fall through */
case ngx_signal_value(NGX_SHUTDOWN_SIGNAL):
//ngx_quit = 1;
action = ", shutting down";
break;
case ngx_signal_value(NGX_TERMINATE_SIGNAL):
case SIGINT:
//ngx_terminate = 1;
action = ", exiting";
break;
case ngx_signal_value(NGX_REOPEN_SIGNAL):
//ngx_reopen = 1;
action = ", reopening logs";
break;
case ngx_signal_value(NGX_RECONFIGURE_SIGNAL):
case ngx_signal_value(NGX_CHANGEBIN_SIGNAL):
case SIGIO:
action = ", ignoring";
break;
}
break;
}
/*
if (siginfo && siginfo->si_pid) {
printf("signal %d (%s) received from %P%s",
signo, sig->signame, siginfo->si_pid, action);
} else {
printf("signal %d (%s) received%s",
signo, sig->signame, action);
}
*/
if (ignore) {
printf("the changing binary signal is ignored: "
"you should shutdown or terminate "
"before either old or new binary's process");
}
printf("action = [%s]\n", action);
}
ngx_int_t
ngx_init_signals()
{
ngx_signal_t *sig;
struct sigaction sa;
for (sig = signals; sig->signo != 0; sig++) {
memset(&sa, 0x00, sizeof(struct sigaction));
if (sig->handler) {
sa.sa_sigaction = sig->handler;
sa.sa_flags = SA_SIGINFO;
} else {
sa.sa_handler = SIG_IGN;
}
sigemptyset(&sa.sa_mask);
if (sigaction(sig->signo, &sa, NULL) == -1) {
#if (NGX_VALGRIND)
printf("sigaction(%s) failed, ignored", sig->signame);
#else
printf("sigaction(%s) failed", sig->signame);
return NGX_ERROR;
#endif
}
}
return NGX_OK;
}
int main()
{
//信号初始化
ngx_init_signals();
#ifdef NOBLOCKFD
testNonblockingSocketFd(fds);
#endif
TDataExchangeChannel channels[GROUP_NUM];
for (int i = 0; i < GROUP_NUM; i++) {
socketpair(PF_UNIX, SOCK_STREAM, 0, channels[i].fds);
int pid = fork();
switch (pid) {
case -1: // error
return -1;
case 0: // child
childProcess(channels[i].fds, 2);
printf("child exit \n");
exit(0);
default: // parent
break;
}
}
//父进程给子进程发送消息,并接收子进程发来的消息
for (int i = 0; i < GROUP_NUM; i++) {
parentProcess(channels[i].fds, 2);
}
while(1) {
sleep(20);
}
return 0;
}
运行结果: