tcp_recvmsg
是 Linux 内核中用于处理 TCP 套接字接收数据的核心函数。它的主要任务是从接收队列中读取数据并将其复制到用户空间。
函数原型
int tcp_recvmsg(struct kiocb *iocb,
struct sock *sk,
struct msghdr *msg,
size_t len,
int nonblock,
int flags,
int *addr_len
);
参数说明
iocb
: IO 控制块,包含与 IO 操作相关的信息。sk
: 指向struct sock
的指针,表示当前套接字。msg
: 指向struct msghdr
的指针,包含接收数据的缓冲区信息。len
: 要接收的最大字节数。nonblock
: 非阻塞标志。flags
: 控制接收行为的标志。addr_len
: 传出参数,用于返回地址长度。
处理流程
锁定套接字
通过调用 lock_sock(sk) 来确保对套接字的独占访问,防止其他线程同时访问。
/** 通过 tcp_sk 宏将传入的 struct sock *sk 转换为 struct tcp_sock *tp 类型,获取与该套接字相关的 TCP 结构体。
** 这使得后续的操作能够直接访问 TCP 特有的字段
**/
struct tcp_sock *tp = tcp_sk(sk);
int copied = 0; //用于记录已经成功复制到用户缓冲区的字节数
u32 peek_seq; //用于存储在使用 MSG_PEEK 标志时的序列号
u32 *seq; //指向当前序列号的指针,用于跟踪已复制的数据
unsigned long used; //用于记录从接收缓冲区中提取的数据长度
int err;
int target; /* Read at least this many bytes(期望读取的最小字节数) */
long timeo; //用于存储接收超时时间。
struct task_struct *user_recv = NULL; //指向正在接收数据的用户任务结构体,初始化为 NULL
int copied_early = 0; //标记是否在早期阶段就复制了数据
struct sk_buff *skb; //指向当前处理的套接字缓冲区(socket buffer)
u32 urg_hole = 0; //用于处理紧急数据时可能出现的数据缺口
lock_sock(sk);
状态检查
err = -ENOTCONN;
if (sk->sk_state == TCP_LISTEN)
goto out;
// sk_rcvtimeo 通常表示套接字在接收数据时的超时时间,单位为毫秒。
static inline long sock_rcvtimeo(const struct sock *sk, int noblock)
{
return noblock ? 0 : sk->sk_rcvtimeo;
}
检查套接字状态,如果处于监听状态(TCP_LISTEN
),则返回错误,因为在此状态下不能接收数据。
这一设计的目的主要是为了处理不同的网络通信场景,确保套接字在接收数据时的灵活性和效率。
区分阻塞与非阻塞模式:
- 阻塞模式: 在这种模式下,如果没有数据可读,调用
tcp_recvmsg
的进程会被挂起,直到有数据到达或发生错误。在这种情况下,超时设置用于防止进程无限期等待。 - 非阻塞模式: 如果套接字处于非阻塞模式,函数会立即返回,如果没有数据可读,则返回
-EAGAIN
或-EWOULDBLOCK
。在这种情况下,超时设置用于控制函数在没有数据时的返回行为。
阻塞模式下默认超时设置为sk_rcvtimeo
,如何设置这个参数呢?
在默认情况下,TCP 套接字的接收超时通常设定为几秒钟(例如 180 秒),但具体值可以通过系统配置进行调整。
默认值: 在许多 Linux 系统中,TCP 套接字的默认接收超时为 180 秒(即 180000 毫秒),但可以根据具体需求进行修改。
sk_rcvtimeo
的值通常在创建套接字时被初始化,具体可以在以下几个地方进行设置:
- 套接字创建: 在调用 socket() 函数创建套接字时,内核会为该套接字分配默认的超时时间。
- 系统配置文件: 超时时间也可以通过系统配置文件(如
/proc/sys/net/ipv4/tcp_rmem
和/proc/sys/net/ipv4/tcp_wmem)
进行调整。这些文件定义了 TCP 套接字的缓冲区大小和超时行为。
应用程序可以通过调用 setsockopt()
函数显式地设置套接字的接收超时。例如,可以使用 SO_RCVTIMEO
选项来修改 sk_rcvtimeo
的值。
超时设置
timeo = sock_rcvtimeo(sk, nonblock);
设置接收超时时间,根据传入的非阻塞标志来确定。
紧急数据处理
检查传入的标志 flags 是否包含 MSG_OOB
。如果包含,函数将跳转到专门处理紧急数据的部分 (recv_urg
)
if (flags & MSG_OOB) goto recv_urg;
...
recv_urg:
err = tcp_recv_urg(sk, msg, len, flags);
goto out;
为什么需要这个处理流程呢?
- 紧急数据的特性
紧急数据: 在 TCP 协议中,紧急数据(也称为“带外数据”)是一种特殊的数据流,它允许应用程序在正常数据流中插入优先级更高的数据。这通常用于需要立即处理的信息,如中断信号或重要通知。
独立处理: 紧急数据的处理与常规数据流不同,因此需要单独的逻辑来确保它们能够被及时和正确地接收。- 应用场景
实时通信: 在某些实时应用(如 VoIP 或在线游戏)中,可能会使用紧急数据来传递关键控制信息或状态更新,这些信息需要优先处理。
终端控制: 在终端应用程序中,紧急数据常用于发送控制字符(如 Ctrl+C),这些字符需要立即响应,以便用户能够及时中断或控制正在运行的程序。
网络协议: 一些网络协议可能会利用紧急数据来指示特定事件或状态变化,例如在 SSH 或 Telnet 等协议中。
/*
* Handle reading urgent data. BSD has very simple semantics for
* this, no blocking and very strange errors 8)
*/
static int tcp_recv_urg(struct sock *sk, struct msghdr *msg, int len, int flags)
{
struct tcp_sock *tp = tcp_sk(sk);
/* No URG data to read. */
if (sock_flag(sk, SOCK_URGINLINE) || !tp->urg_data ||
tp->urg_data == TCP_URG_READ)
return -EINVAL; /* Yes this is right ! */
if (sk->sk_state == TCP_CLOSE && !sock_flag(sk, SOCK_DONE))
return -ENOTCONN;
if (tp->urg_data & TCP_URG_VALID) {
int err = 0;
char c = tp->urg_data;
if (!(flags & MSG_PEEK))
tp->urg_data = TCP_URG_READ;
/* Read urgent data. */
msg->msg_flags |= MSG_OOB;
if (len > 0) {
if (!(flags & MSG_TRUNC))
err = memcpy_toiovec(msg->msg_iov, &c, 1);
len = 1;
} else
msg->msg_flags |= MSG_TRUNC;
return err ? -EFAULT : len;
}
if (sk->sk_state == TCP_CLOSE || (sk->sk_shutdown & RCV_SHUTDOWN))
return 0;
/* Fixed the recv(..., MSG_OOB) behaviour. BSD docs and
* the available implementations agree in this case:
* this call should never block, independent of the
* blocking state of the socket.
* Mike <pall@rz.uni-karlsruhe.de>
*/
return -EAGAIN;
}
上面的代码做了什么👆?
在 recv_urg 部分,内核会执行以下操作:
- 读取紧急数据: 从接收缓冲区中提取紧急数据,并将其复制到用户提供的缓冲区。
- 更新状态: 更新 TCP 套接字状态,以反映已处理的紧急数据,并确保后续的数据接收逻辑能够正确识别当前的数据流状态。
- 信号处理: 如果在处理过程中有信号(如 SIGURG)到达,函数会相应地进行处理,以确保应用程序可以及时响应外部事件。
这一设计确保了关键控制信息能够被优先处理,提高了实时应用和网络协议的响应能力。对于需要快速响应用户输入或特定事件的应用场景,这种机制是至关重要的。
主要职责
- 数据接收:
tcp_recvmsg
从 TCP 接收队列中读取数据,并将其复制到用户提供的缓冲区。它确保数据的顺序性和完整性,处理 TCP 协议的特性,如流量控制和拥塞控制。 - 队列管理: 函数会检查多个接收队列,包括:
- 接收队列 (receive queue): 存放已接收但未被读取的数据。
- 预处理队列 (prequeue): 暂存于用户进程上下文中等待处理的数据。
- 后备队列 (backlog): 用于存放在套接字被锁定时到达的数据包。
- 阻塞与非阻塞模式: 根据传入的参数,函数可以在没有数据可读时选择阻塞等待或立即返回。
- 错误处理: 在数据接收过程中,tcp_recvmsg 还会处理各种错误情况,并返回相应的错误码。
工作流程
-
函数首先锁定与套接字相关的控制块,以防止并发访问。
-
检查接收队列是否有数据可用。如果没有且处于阻塞模式,则函数会挂起等待数据到达。
-
一旦有数据可用,函数会遍历接收队列,找到合适的序列号进行读取,并将数据复制到用户缓冲区。
处理特殊标志(如MSG_PEEK
)以决定如何处理数据。
如何处理预备队列和后备队列中的数据
- 预备队列 (prequeue): 当接收的数据包到达时,如果套接字未被用户进程锁定,数据包会被放入预备队列。这些数据包随后会在用户进程上下文中进行处理。
- 后备队列 (backlog): 如果套接字被用户进程锁定,新的数据包将被放入后备队列。此时,任何试图锁定该套接字的线程将被排入等待队列,直到套接字被解锁。
设计预备队列(prequeue)和后备队列(backlog)在 Linux 内核 TCP 实现中是为了优化数据接收的效率和可靠性。
当多个数据包同时到达时,预备队列允许内核在用户进程未锁定套接字的情况下临时存储这些数据包,从而避免数据丢失。这样可以在用户进程准备好读取数据时,快速将数据从预备队列转移到用户空间。
/*
* This routine copies from a sock struct into the user buffer.
*
* Technical note: in 2.3 we work on _locked_ socket, so that
* tricks with *seq access order and skb->users are not required.
* Probably, code can be easily improved even more.
*/
int tcp_recvmsg(struct kiocb *iocb, struct sock *sk, struct msghdr *msg,
size_t len, int nonblock, int flags, int *addr_len)
{
struct tcp_sock *tp = tcp_sk(sk);
int copied = 0;
u32 peek_seq;
u32 *seq;
unsigned long used;
int err;
int target; /* Read at least this many bytes */
long timeo;
struct task_struct *user_recv = NULL;
int copied_early = 0;
struct sk_buff *skb;
u32 urg_hole = 0;
lock_sock(sk);
err = -ENOTCONN;
if (sk->sk_state == TCP_LISTEN)
goto out;
timeo = sock_rcvtimeo(sk, nonblock);
/* Urgent data needs to be handled specially. */
if (flags & MSG_OOB)
goto recv_urg;
seq = &tp->copied_seq;
if (flags & MSG_PEEK) {
peek_seq = tp->copied_seq;
seq = &peek_seq;
}
target = sock_rcvlowat(sk, flags & MSG_WAITALL, len);
#ifdef CONFIG_NET_DMA
tp->ucopy.dma_chan = NULL;
preempt_disable();
skb = skb_peek_tail(&sk->sk_receive_queue);
{
int available = 0;
if (skb)
available = TCP_SKB_CB(skb)->seq + skb->len - (*seq);
if ((available < target) &&
(len > sysctl_tcp_dma_copybreak) && !(flags & MSG_PEEK) &&
!sysctl_tcp_low_latency &&
dma_find_channel(DMA_MEMCPY)) {
preempt_enable_no_resched();
tp->ucopy.pinned_list =
dma_pin_iovec_pages(msg->msg_iov, len);
} else {
preempt_enable_no_resched();
}
}
#endif
do {
u32 offset;
/* Are we at urgent data? Stop if we have read anything or have SIGURG pending. */
if (tp->urg_data && tp->urg_seq == *seq) {
if (copied)
break;
if (signal_pending(current)) {
copied = timeo ? sock_intr_errno(timeo) : -EAGAIN;
break;
}
}
/* Next get a buffer. */
skb_queue_walk(&sk->sk_receive_queue, skb) {
/* Now that we have two receive queues this
* shouldn't happen.
*/
if (WARN(before(*seq, TCP_SKB_CB(skb)->seq),
"recvmsg bug: copied %X seq %X rcvnxt %X fl %X\n",
*seq, TCP_SKB_CB(skb)->seq, tp->rcv_nxt,
flags))
break;
offset = *seq - TCP_SKB_CB(skb)->seq;
if (tcp_hdr(skb)->syn)
offset--;
if (offset < skb->len)
goto found_ok_skb;
if (tcp_hdr(skb)->fin)
goto found_fin_ok;
WARN(!(flags & MSG_PEEK),
"recvmsg bug 2: copied %X seq %X rcvnxt %X fl %X\n",
*seq, TCP_SKB_CB(skb)->seq, tp->rcv_nxt, flags);
}
/* Well, if we have backlog, try to process it now yet. */
if (copied >= target && !sk->sk_backlog.tail)
break;
if (copied) {
if (sk->sk_err ||
sk->sk_state == TCP_CLOSE ||
(sk->sk_shutdown & RCV_SHUTDOWN) ||
!timeo ||
signal_pending(current))
break;
} else {
if (sock_flag(sk, SOCK_DONE))
break;
if (sk->sk_err) {
copied = sock_error(sk);
break;
}
if (sk->sk_shutdown & RCV_SHUTDOWN)
break;
if (sk->sk_state == TCP_CLOSE) {
if (!sock_flag(sk, SOCK_DONE)) {
/* This occurs when user tries to read
* from never connected socket.
*/
copied = -ENOTCONN;
break;
}
break;
}
if (!timeo) {
copied = -EAGAIN;
break;
}
if (signal_pending(current)) {
copied = sock_intr_errno(timeo);
break;
}
}
tcp_cleanup_rbuf(sk, copied);
if (!sysctl_tcp_low_latency && tp->ucopy.task == user_recv) {
/* Install new reader */
if (!user_recv && !(flags & (MSG_TRUNC | MSG_PEEK))) {
user_recv = current;
tp->ucopy.task = user_recv;
tp->ucopy.iov = msg->msg_iov;
}
tp->ucopy.len = len;
WARN_ON(tp->copied_seq != tp->rcv_nxt &&
!(flags & (MSG_PEEK | MSG_TRUNC)));
/* Ugly... If prequeue is not empty, we have to
* process it before releasing socket, otherwise
* order will be broken at second iteration.
* More elegant solution is required!!!
*
* Look: we have the following (pseudo)queues:
*
* 1. packets in flight
* 2. backlog
* 3. prequeue
* 4. receive_queue
*
* Each queue can be processed only if the next ones
* are empty. At this point we have empty receive_queue.
* But prequeue _can_ be not empty after 2nd iteration,
* when we jumped to start of loop because backlog
* processing added something to receive_queue.
* We cannot release_sock(), because backlog contains
* packets arrived _after_ prequeued ones.
*
* Shortly, algorithm is clear --- to process all
* the queues in order. We could make it more directly,
* requeueing packets from backlog to prequeue, if
* is not empty. It is more elegant, but eats cycles,
* unfortunately.
*/
if (!skb_queue_empty(&tp->ucopy.prequeue))
goto do_prequeue;
/* __ Set realtime policy in scheduler __ */
}
#ifdef CONFIG_NET_DMA
if (tp->ucopy.dma_chan)
dma_async_memcpy_issue_pending(tp->ucopy.dma_chan);
#endif
if (copied >= target) {
/* Do not sleep, just process backlog. */
release_sock(sk);
lock_sock(sk);
} else
sk_wait_data(sk, &timeo);
#ifdef CONFIG_NET_DMA
tcp_service_net_dma(sk, false); /* Don't block */
tp->ucopy.wakeup = 0;
#endif
if (user_recv) {
int chunk;
/* __ Restore normal policy in scheduler __ */
if ((chunk = len - tp->ucopy.len) != 0) {
NET_ADD_STATS_USER(sock_net(sk), LINUX_MIB_TCPDIRECTCOPYFROMBACKLOG, chunk);
len -= chunk;
copied += chunk;
}
if (tp->rcv_nxt == tp->copied_seq &&
!skb_queue_empty(&tp->ucopy.prequeue)) {
do_prequeue:
tcp_prequeue_process(sk);
if ((chunk = len - tp->ucopy.len) != 0) {
NET_ADD_STATS_USER(sock_net(sk), LINUX_MIB_TCPDIRECTCOPYFROMPREQUEUE, chunk);
len -= chunk;
copied += chunk;
}
}
}
if ((flags & MSG_PEEK) &&
(peek_seq - copied - urg_hole != tp->copied_seq)) {
if (net_ratelimit())
printk(KERN_DEBUG "TCP(%s:%d): Application bug, race in MSG_PEEK.\n",
current->comm, task_pid_nr(current));
peek_seq = tp->copied_seq;
}
continue;
found_ok_skb:
/* Ok so how much can we use? */
used = skb->len - offset;
if (len < used)
used = len;
/* Do we have urgent data here? */
if (tp->urg_data) {
u32 urg_offset = tp->urg_seq - *seq;
if (urg_offset < used) {
if (!urg_offset) {
if (!sock_flag(sk, SOCK_URGINLINE)) {
++*seq;
urg_hole++;
offset++;
used--;
if (!used)
goto skip_copy;
}
} else
used = urg_offset;
}
}
if (!(flags & MSG_TRUNC)) {
#ifdef CONFIG_NET_DMA
if (!tp->ucopy.dma_chan && tp->ucopy.pinned_list)
tp->ucopy.dma_chan = dma_find_channel(DMA_MEMCPY);
if (tp->ucopy.dma_chan) {
tp->ucopy.dma_cookie = dma_skb_copy_datagram_iovec(
tp->ucopy.dma_chan, skb, offset,
msg->msg_iov, used,
tp->ucopy.pinned_list);
if (tp->ucopy.dma_cookie < 0) {
printk(KERN_ALERT "dma_cookie < 0\n");
/* Exception. Bailout! */
if (!copied)
copied = -EFAULT;
break;
}
dma_async_memcpy_issue_pending(tp->ucopy.dma_chan);
if ((offset + used) == skb->len)
copied_early = 1;
} else
#endif
{
err = skb_copy_datagram_iovec(skb, offset,
msg->msg_iov, used);
if (err) {
/* Exception. Bailout! */
if (!copied)
copied = -EFAULT;
break;
}
}
}
*seq += used;
copied += used;
len -= used;
tcp_rcv_space_adjust(sk);
skip_copy:
if (tp->urg_data && after(tp->copied_seq, tp->urg_seq)) {
tp->urg_data = 0;
tcp_fast_path_check(sk);
}
if (used + offset < skb->len)
continue;
if (tcp_hdr(skb)->fin)
goto found_fin_ok;
if (!(flags & MSG_PEEK)) {
sk_eat_skb(sk, skb, copied_early);
copied_early = 0;
}
continue;
found_fin_ok:
/* Process the FIN. */
++*seq;
if (!(flags & MSG_PEEK)) {
sk_eat_skb(sk, skb, copied_early);
copied_early = 0;
}
break;
} while (len > 0);
if (user_recv) {
if (!skb_queue_empty(&tp->ucopy.prequeue)) {
int chunk;
tp->ucopy.len = copied > 0 ? len : 0;
tcp_prequeue_process(sk);
if (copied > 0 && (chunk = len - tp->ucopy.len) != 0) {
NET_ADD_STATS_USER(sock_net(sk), LINUX_MIB_TCPDIRECTCOPYFROMPREQUEUE, chunk);
len -= chunk;
copied += chunk;
}
}
tp->ucopy.task = NULL;
tp->ucopy.len = 0;
}
#ifdef CONFIG_NET_DMA
tcp_service_net_dma(sk, true); /* Wait for queue to drain */
tp->ucopy.dma_chan = NULL;
if (tp->ucopy.pinned_list) {
dma_unpin_iovec_pages(tp->ucopy.pinned_list);
tp->ucopy.pinned_list = NULL;
}
#endif
/* According to UNIX98, msg_name/msg_namelen are ignored
* on connected socket. I was just happy when found this 8) --ANK
*/
/* Clean up data we have read: This will do ACK frames. */
tcp_cleanup_rbuf(sk, copied);
release_sock(sk);
return copied;
out:
release_sock(sk);
return err;
recv_urg:
err = tcp_recv_urg(sk, msg, len, flags);
goto out;
}
EXPORT_SYMBOL(tcp_recvmsg);