内核在协议栈接收处理完输入包以后,要能通知到用户进程,让用户进程能够收到并处理这些数据。进程和内核配合有很多种方案,第一种是同步阻塞的方案,第二种是多路复用方案。本文以epoll为例
部分内容来源于 《深入理解Linux网络》、《Linux内核源码分析TCP实现》
socket
在网络编程中,套接字(Socket)是用于描述计算机网络中通信端点的抽象概念。它允许应用程序在网络上进行数据传输,通过特定的 API 与底层协议(如 TCP 或 UDP)交互。套接字可以分为流式套接字(用于 TCP 连接,提供可靠的字节流通信)和数据报套接字(用于 UDP 连接,提供无连接的不可靠通信)。常见操作包括创建套接字、绑定地址、监听连接、接受和发送数据等。
int socket(int domain, int type, int protocol);
创建完socket之后,内核其实在内部创建了一系列的对象,部分对象如上所示。
创建流程
接下来解析socket的创建流程以及其大体包含哪些内容:
首先是socket系统调用,调用__sys_socket
,其中首先调用sock_create
创建socket。
SYSCALL_DEFINE3(socket, int, family, int, type, int, protocol)
{
return __sys_socket(family, type, protocol);
}
/**
family: 常用AF_INET(ipv4),AF_UNIX(本地),AF_INET6(ipv6)
type: SOCK_STREAM,SOCK_DGRAM,SOCK_RAW
*/
int __sys_socket(int family, int type, int protocol)
{
int retval;
struct socket *sock;
int flags;
// 创建socket
retval = sock_create(family, type, protocol, &sock);
// 将socket和文件描述符关联,并返回对应描述符
return sock_map_fd(sock, flags & (O_CLOEXEC | O_NONBLOCK));
}
初始化socket
sock_create
是创建socket的主要位置,其中sock_create
又调用了_sock_create
。
int __sock_create(struct net *net, int family, int type, int protocol,
struct socket **res, int kern)
{
struct socket *sock;
const struct net_proto_family *pf;
sock = sock_alloc();
// 获取协议族
pf = rcu_dereference(net_families[family]);
// 调用协议族的create函数
pf->create(net, sock, protocol, kern);
...
}
在_sock_create
里,首先调用sock_alloc
来分配一个struct sock
内核对象,接着获取协议族的操作函数表,并调用其create方法。对于AF_INET
协议族来说,执行到的是inet_create
方法。
// file: net/ipv4/af_inet.c
static struct inet_protosw inetsw_array[] =
{
// 可以看到tcp对应的type和protocol
{
.type = SOCK_STREAM,
.protocol = IPPROTO_TCP,
.prot = &tcp_prot,
.ops = &inet_stream_ops,
.flags = INET_PROTOSW_PERMANENT |
INET_PROTOSW_ICSK,
},
}
static int inet_create(struct net *net, struct socket *sock, int protocol,
int kern)
{
struct inet_protosw *answer;
/* look for the requested type/protocol pair. */
lookup_protocol:
/**
每个type都有一个链表,里面对应不同协议的对象
例如:type为SOCK_DGRAM的链表,protocol包含UDP,ICMP等
*/
list_for_each_entry_rcu(answer, &inetsw[sock->type], list) {
// 更据protocol匹配,得到anser
if (protocol == answer->protocol) {
if (protocol != IPPROTO_IP)
break;
} else
......
}
// 将 inet_stream_ops 赋到 socket->ops 上
sock->ops = answer->ops;
// 获得 tcp_prot
answer_prot = answer->prot;
// 分配 sock 对象,并把 tcp_prot 赋到 sock->sk_prot 上
sk = sk_alloc(net, PF_INET, GFP_KERNEL, answer_prot, kern);
// 初始化,且sock->sk = sk;
sock_init_data(sock, sk);
}
在inet _create
中,根据类型SOCK_STREAM
查找到对于TCP定义的操作方法实现集合inet_stream _ops
和tcp_prot
,并把它们分别设置到socket- >ops
和sock->sk_prot
上,如图下图所示。
再往下看到了sock_init_data
。在这个方法中将sock中的sk _data ready
函数指针进行了初始化,设置为默认sock_def_readable
。
// file: net/core/sock.c
void sock_init_data(struct socket *sock, struct sock *sk) {
sk->sk_data_ready = sock_def_readable;
sk->sk_write_space = sock_def_write_space;
sk->sk_error_report = sock_def_error_report;
}
当软中断上收到数据包时会通过调用sk_data_ready函数指针来唤醒在sock上等待的进程。至此,一个tcp对象就算创建完成了,这里花费了一次socket系统调用的开销。
与sockfs关联
创建完成后将socket和文件描述符关联,并返回对应描述符。关于socket和文件描述符关联,实际上Linux中存在sockfs的虚拟文件系统专门用来管理套接字,会创建并关联对应inode,将socket视为文件管理。
// file: net/socket.c
/**
创建sock时实际sockfs创建inode,和socket关联
*/
struct socket *sock_alloc(void)
{
struct inode *inode;
struct socket *sock;
inode = new_inode_pseudo(sock_mnt->mnt_sb);
sock = SOCKET_I(inode);
inode->i_ino = get_next_ino();
inode->i_mode = S_IFSOCK | S_IRWXUGO;
inode->i_uid = current_fsuid();
inode->i_gid = current_fsgid();
inode->i_op = &sockfs_inode_ops;
return sock;
}
static int sock_map_fd(struct socket *sock, int flags)
{
struct file *newfile;
// 从当前进程files表中获取未使用fd
int fd = get_unused_fd_flags(flags);
//通过sockfs创建一个于传入socket关联的file
newfile = sock_alloc_file(sock, flags, NULL);
if (!IS_ERR(newfile)) {
// 将fd与socket对应file关联
fd_install(fd, newfile);
return fd;
}
}
/**
为socket创建file,并关联
*/
struct file *sock_alloc_file(struct socket *sock, int flags, const char *dname)
{
struct file *file;
if (!dname)
dname = sock->sk ? sock->sk->sk_prot_creator->name : "";
file = alloc_file_pseudo(SOCK_INODE(sock), sock_mnt, dname,
O_RDWR | (flags & O_NONBLOCK),
&socket_file_ops);
// sock和file互相关联
sock->file = file;
file->private_data = sock;
return file;
}
小结
socket套接字创建流程如下:
-
系统调用:用户程序通过调用 socket() 系统调用请求创建一个新的socket。
-
进入内核态:系统调用被转发到内核中的处理函数。
-
分配socket结构:内核分配一个 struct socket 结构体,用于描述该socket的状态和属性。
-
分配sock结构:分配一个 struct sock 结构体,表示与网络协议相关的信息。
-
初始化socket数据:调用 sock_init_data() 函数初始化 struct sock 中的各种回调函数。
-
协议族和协议的注册:根据传入的协议族和类型选择合适的协议。在TCP/IP协议栈中,调用 inet_create() 或类似函数来创建对应的协议对象。
-
绑定sockfs:将刚创建的socket结构体与sockfs绑定。sockfs是Linux内核中用于处理套接字的虚拟文件系统。这个过程通常在 socket 创建后完成,使得该socket可以被视为文件描述符。在这个阶段,内核会设置socket的操作和协议相关的功能,以便为后续的操作(如bind()、connect()等)做好准备。
-
分配端口和地址:如果是流式socket(如TCP),内核将分配一个本地端口和地址,以便后续的连接请求。
-
返回socket描述符:一旦所有的初始化工作完成,内核会返回一个文件描述符给用户程序,表示新创建的socket。
-
后续操作:用户可以使用返回的socket描述符进行后续的操作,例如绑定地址 (bind())、监听连接 (listen()) 和接受连接 (accept()) 等。
同步阻塞I/O
从用户进程创建socket,到一个网络包抵达网卡被用户进程接收,同步阻塞IO总体上的流程如图所示。
等待接收消息
ssize_t recv(int sockfd, void buf[.len], size_t len,int flags);
ssize_t recvfrom(int sockfd, void buf[restrict .len], size_t len,
int flags,
struct sockaddr *_Nullable restrict src_addr,
socklen_t *_Nullable restrict addrlen);
recv会执行recvform系统调用。进入系统调用后,用户进程就进入了内核态,执行一系列的内核协议层函数,然后到socket对象的接收队列中查看是否有数据,没有的话就把自己添加到socket对应的等待队列里。最后让出CPU,操作系统会选择下一个就绪状态的进程来执行。整个流程如下图:
SYSCALL_DEFINE6(recvfrom, int, fd, void __user *, ubuf, size_t, size,
unsigned int, flags, struct sockaddr __user *, addr,
int __user *, addr_len)
{
return __sys_recvfrom(fd, ubuf, size, flags, addr, addr_len);
}
int __sys_recvfrom(int fd, void __user *ubuf, size_t size, unsigned int flags,
struct sockaddr __user *addr, int __user *addr_len)
{
struct socket *sock;
struct iovec iov;
struct msghdr msg;
struct sockaddr_storage address;
// 将用户空间缓冲区转换为内核空间可以使用的iovec
err = import_single_range(READ, ubuf, size, &iov, &msg.msg_iter);
// 通过fd获取socket对象
sock = sockfd_lookup_light(fd, &err, &fput_needed);
msg.msg_control = NULL;
msg.msg_controllen = 0;
...
// 接收数据并通过msg.msg_iter写入用户缓冲区
err = sock_recvmsg(sock, &msg, flags);
//addr不为NULL时获取对等方地址
if (err >= 0 && addr != NULL) {
move_addr_to_user(&address,msg.msg_namelen, addr, addr_len);
}
}
通过查找当前进程文件表可以获取fd对应的file对象,之前讲过file和socket关联,从而获取对应socket。在sock_recvmsg
函数中会从socket中获取数据并写入用户缓冲区。
static inline int sock_recvmsg_nosec(struct socket *sock, struct msghdr *msg,
int flags)
{
return INDIRECT_CALL_INET(sock->ops->recvmsg, inet6_recvmsg,
inet_recvmsg, sock, msg, msg_data_left(msg),
flags);
}
const struct proto_ops inet_stream_ops = {
.recvmsg = inet_recvmsg,
};
最终会调用到sock_recvmsg_nosec
函数,其中又会调用sock->ops->recvmsg
,在这里即调用inet_recvmsg
。而在inet_recvmsg
中又会调用sock成员的函数sk->sk_prot->recvmsg
,即tcp_recvmsg
。
int inet_recvmsg(struct socket *sock, struct msghdr *msg, size_t size,
int flags)
{
struct sock *sk = sock->sk;
err = INDIRECT_CALL_2(sk->sk_prot->recvmsg, tcp_recvmsg, udp_recvmsg,
sk, msg, size, flags & MSG_DONTWAIT,
flags & ~MSG_DONTWAIT, &addr_len);
}
struct proto tcp_prot = {
.recvmsg = tcp_recvmsg,
...
};
tcp_recvmsg
函数会从sock结构体的接收队列中获取skbuff并拷贝数据到用户缓冲区。
//file: net/ipv4/tcp.c
int tcp_recvmsg(struct sock *sk, struct msghdr *msg, size_t len, int nonblock,
int flags, int *addr_len)
{
do {
// 在循环中不断从接收队列获取数据
last = skb_peek_tail(&sk->sk_receive_queue);
skb_queue_walk(&sk->sk_receive_queue, skb) {
last = skb;
offset = *seq - TCP_SKB_CB(skb)->seq;
if (offset < skb->len)
goto found_ok_skb;
if (TCP_SKB_CB(skb)->tcp_flags & TCPHDR_FIN)
goto found_fin_ok;
...
}
...
if (copied >= target) {
/* Do not sleep, just process backlog. */
release_sock(sk);
lock_sock(sk);
} else {
// 没有拷贝足够数据,等待,阻塞当前进程
sk_wait_data(sk, &timeo, last);
}
found_ok_skb:
// 将skbuff内核态数据拷贝到用户缓冲区
used = skb->len - offset;
skb_copy_datagram_msg(skb, offset, msg, used);
} while (len > 0)
}
skb_queue_walk
函数在读取sock对象下的接收队列,如果数据不够多则调用sk_wait_data
。
sk_wait_data
函数会阻塞进程,其内部如下:
// file:net/core/sock.c
int sk_wait_data(struct sock *sk, long *timeo, const struct sk_buff *skb)
{
DEFINE_WAIT_FUNC(wait, woken_wake_function);
int rc;
// 向sock等待队列添加等待项
add_wait_queue(sk_sleep(sk), &wait);
sk_set_bit(SOCKWQ_ASYNC_WAITDATA, sk);
// 等待条件为接收队列尾部元素改变,陷入阻塞
rc = sk_wait_event(sk, timeo, skb_peek_tail(&sk->sk_receive_queue) != skb, &wait);
sk_clear_bit(SOCKWQ_ASYNC_WAITDATA, sk);
remove_wait_queue(sk_sleep(sk), &wait);
return rc;
}
其将当前进程的等待项添加到与 socket (sk)
相关联的等待队列中。sk_sleep(sk)
返回一个指向与该 socket 相关的睡眠队列的指针。之后设置 socket 的状态位,以指示当前进程正在等待数据。
之后调用 sk_wait_event
,该函数会检查条件表达式(即接收队列尾部元素是否改变)并可能导致阻塞。如果条件不满足,进程将进入休眠状态,直到有其他进程唤醒它。
timeo 参数可以指定超时值,在指定时间内如果条件仍未满足,进程将被唤醒。
当有数据到达 socket 或者其他条件发生变化时,其他进程会调用相应的唤醒函数,如 wake_up()
,从而将这个等待队列中的进程唤醒。在退出之前,函数会清除设置的状态位,并从等待队列中移除当前进程的等待项。
整个过程涉及一次进程上下文转换。
软中断模块唤醒进程
前文讲到了网络包到网卡后是怎么被网卡接收,最后再交由软中断处理的,这里直接从TCP协议的接收函数tcp _v4_rcv
看起。
软中断(也就是Linux里的ksoftirqd线程)里收到数据包以后,发现是TCP包就会执行tcp_v4_rcv
函数。
int tcp_v4_rcv
通过IP和端口获取对应的struct sock对象,进一步调用tcp_v4_do_rcv
,主要看其中对ESTABLISHED状态下的数据处理——tcp_rcv_established
,在其中会进行TCP协议的相关处理,之后将处理完成的sk_buff
加入sock对象的接收队列中,然后执行sk->sk_data_ready(sk)
。在socket创建部分我们知道该函数指针指向sock_def_readable
函数,在其中获取sock对象的等待队列,唤醒等待的进程。
// file:net/ipv/tcp_ipv4.c
int tcp_v4_rcv(struct sk_buff *skb)
{
th = (const struct tcphdr *)skb->data; // 获取tcp header
iph = ip_hdr(skb); // 获取ip header
// 根据数据包的ip,端口信息找到对应struct sock
sk = __inet_lookup_skb(&tcp_hashinfo, skb, __tcp_hdrlen(th), th->source,th->dest, sdif, &refcounted);
...
if (!sock_owned_by_user(sk)) {
// 调用tcp_v4_do_rcv进一步处理
ret = tcp_v4_do_rcv(sk, skb);
}
}
int tcp_v4_do_rcv(struct sock *sk, struct sk_buff *skb)
{
struct sock *rsk;
// ESTABLISHED状态下的数据处理
if (sk->sk_state == TCP_ESTABLISHED) { /* Fast path */
tcp_rcv_established(sk, skb);
return 0;
}
}
// file: net/ipve/tcp_input.c
void tcp_rcv_established(struct sock *sk, struct sk_buff *skb)
{
// 一系列处理
......
// 将处理好的sk_buff放入sock对象的接收队列
tcp_queue_rcv(sk, skb, &fragstolen);
// 出发就绪事件
tcp_data_ready(sk);
}
void tcp_data_ready(struct sock *sk)
{
...
sk->sk_data_ready(sk);
}
// file: net/core/sock.c
static void sock_def_readable(struct sock *sk)
{
struct socket_wq *wq;
rcu_read_lock();
wq = rcu_dereference(sk->sk_wq);
if (skwq_has_sleeper(wq))
wake_up_interruptible_sync_poll(&wq->wait, EPOLLIN | EPOLLPRI |
EPOLLRDNORM | EPOLLRDBAND);
sk_wake_async(sk, SOCK_WAKE_WAITD, POLL_IN);
rcu_read_unlock();
}
sock_def_readable
函数中wake_up_interruptible_sync_poll
宏的内容如下,其中nr_exclusive
参数传入1,代表即使多个进程阻塞在同一个sock上也只唤醒一个进程,避免“惊群”。
#define wake_up_interruptible_sync_poll(x, m) \
__wake_up_sync_key((x), TASK_INTERRUPTIBLE, 1, poll_to_key(m))
void __wake_up_sync_key(struct wait_queue_head *wq_head, unsigned int mode,
int nr_exclusive, void *key) {}
小结
同步阻塞方式接收网络包的整个过程分为两部分:
第一部分是自己的代码所在的进程,调用的socket()函数会进入内核态创建必要内核对象。recv()函数在进入内核态以后负责查看接收队列,以及在没有数据可处理的时候把当前进程阻塞掉,让出CPU。
第二部分是硬中断、软中断上下文(系统线程ksoftirqd)。在这些组件中,将包处理完后会放到socket的接收队列中。然后根据socket内核对象找到其等待队列中正在因为等待而被阻塞掉的进程,把它唤醒。
异步阻塞
在Linux上多路复用方案有select、poll、epoll。它们三个中的epoll的性能表现是最优秀的,能支持的并发量也最大。所以把epoll作为要拆解的对象,深入揭秘内核是如何实现多路的IO管理的。
epoll解析
小结
epoll的数据结构:
- rb_root rbr,这是红黑树的根节点,存储着所有添加到 epoll 中的事件,也就是这个 epoll 监控的事件。
- list_head rdllist 这是一个双向链表,保存着将要通过 epoll_wait 返回给用户的、满足条件的事件。
epoll的操作: 调用 epoll_create 建立一个 epoll 对象(在 epoll 文件系统中给这个句柄分配资源)、调用 epoll_ctl 向 epoll 对象中添加连接的套接字、调用 epoll_wait 收集发生事件的连接。
当进程调用 epoll_create 方法时,内核会创建一个 eventpoll 对象,也就是应用程序中的 epfd(epoll 文件描述符) 所代表的对象。eventpoll 对象也是文件系统中的一员,和socket一样也有一个等待队列。
创建epoll对象 eventpoll 之后,可以使用 epoll_ctl 添加或者删除所要监听的socket。内核会将eventpoll添加到需要监听的socket的等待队列中。当socket收到数据后,中断回调程序会操作eventpoll对象,而不是直接操作进程。
在 eventpoll 对象中存在就绪列表,rdlist(双向链表保存着将要通过 epoll_wait 返回给用户满足条件的事件)。中断回调程序会给eventpoll的就绪列表添加socket的引用。eventpoll对象相当于是socket和进程之间的中介,socket的数据接收并不直接影响进程,而是通过改变eventpoll的就绪列表来改变进程状态。
epoll_wait的返回条件也是根据rdlist的状态进行判断:如果rdlist已经引用了socket,那么epoll_wait直接返回(把发生的事件的集合从内核复制到 events数组中);如果rdlist为空,阻塞进程。
(对于epoll,操作系统只需要将进程放入eventpoll这一个对象的等待队列中;而对于select,操作系统则需要将进程放入到socket列表中的所有socket对象的等待队列中。)
疑问点
多路复用epoll为什么就能提高网络性能?
epoll高性能最根本的原因是极大程度地减少了无用的进程上下文切换,让进程更专注地处理网络请求。
在内核的硬、软中断上下文中,包从网卡接收过来进行处理,然后放到socket的接收队列。再找到socket关联的epitem,并把它添加到epoll对象的就绪链表中。
在用户进程中,通过调用epoll_wait来查看就绪链表中是否有事件到达,如果有,直接取走进行处理。处理完毕再次调用epoll_wait。在高并发的实践中,只要连接足够多,epoll_wait根本不会让进程阻塞。用户进程会一直处理,直到epoll_wait里实在没活儿可干的时候才主动让出CPU。这是epoll高效的核心原因所在。
红黑树仅仅是提高了epoll查找、添加、删除socket时的效率而已,不算epoll在高并发场景高性能的根本原因。
epoll也是阻塞的?
很多人以为只要一提到阻塞,就是性能差,其实这就冤枉了阻塞。阻塞说的是进程因为等待某个事件而主动让出CPU挂起的操作。
例如,一个epoll对象下添加了一万个客户端连接的socket。假设所有这些socket上都还没有数据达到,这个时候进程调用epoll_wait发现没有任何事情可干。这种情况下用户进程就会被阻塞掉,而这种情况是完全正常的,没有工作需要处理,那还占着CPU是没有道理的。
阻塞不会导致低性能,过多过频繁的阻塞才会。epoll的阻塞和它的高性能并不冲突。