一、GSO/TSO
GSO 在内核 5.10.* 版本中已合入主线,对 TCP/UDP 都支持,并且在网络协议栈中 GSO 功能是默认打开的。虽然可以通过 ethtool -K 网卡名 gso off 关闭,但在 L3/L4 仍会走 GSO 逻辑,关不掉。我目前没有在内核源码中找到通过哪个控制变量或数据结构可以彻底关闭 GSO 功能。
TSO(TCP Segmentation Offload,TCP 分段卸载)是针对tcp而言的,是指协议栈可以将tcp 分段的操作offload到硬件的能力,本身需要硬件的支持。当网卡具有TSO能力时,上层协议栈可以直接下发一个超过MTU数据包,而把数据包拆分的动作交给硬件去做,节省cpu资源。除了TSO,内核还有一个GSO,GSO不区分协议类型,默认是开启的;GSO是在软件上实现的一种延迟分段技术,相比TSO,GSO最终还是需要协议栈自己完成分段的处理。
即使网卡没有TSO能力,传输层依然可以封装一个超过MTU的数据包,等数据包发送给驱动之前,检查网卡是否有TSO能力,如果没有,再调用ip层和传输层的分段处理函数完成数据包的分段处理,通过这样,内核将数据包的分段延迟到了dev链路层,提升数据包处理效率。当支持GSO/TSO时,skb的数据存放格式如下所示,在skb->end后,存在一个skb_share区域,skb的非线性区数据就存放在这里,GSO/TSO分段的处理就是要把skb数据(包括线性区、非线性区)按gso_size的大小进行分割处理;
二、以虚拟网卡驱动为例分析
2.1 TSO 硬件能力
源码路径:kernel/drivers/net/virtio_net.c
virtio驱动加载时,会根据qemu/vhost前后端feature协商的结果判断虚拟网卡是否有TSO能力,如果有,则在dev->hw_feature或上NETIF_F_TSO标志,然后赋给dev->features。
/*
 * virtnet_probe (excerpt, drivers/net/virtio_net.c).
 * Translates the virtio feature bits negotiated with the host (qemu/vhost)
 * into netdev offload flags: the TSO variants land in dev->hw_features,
 * and — module parameters permitting — in dev->features, which is what
 * the protocol stack consults.  Lines elided from the quote are marked
 * with "....".
 */
static int virtnet_probe(struct virtio_device *vdev)
{
    int i, err = -ENOMEM;
    struct net_device *dev;
    struct virtnet_info *vi;
    u16 max_queue_pairs;
    int mtu;
    /* Find if host supports multiqueue virtio_net device */
    err = virtio_cread_feature(vdev, VIRTIO_NET_F_MQ,
                               struct virtio_net_config,
                               max_virtqueue_pairs, &max_queue_pairs);
    ....
    /* Do we support "hardware" checksums? */
    if (virtio_has_feature(vdev, VIRTIO_NET_F_CSUM)) {
        /* This opens up the world of extra features. */
        dev->hw_features |= NETIF_F_HW_CSUM | NETIF_F_SG;
        if (csum)
            dev->features |= NETIF_F_HW_CSUM | NETIF_F_SG;
        /* Legacy combined GSO bit: host can handle every TSO variant. */
        if (virtio_has_feature(vdev, VIRTIO_NET_F_GSO)) {
            dev->hw_features |= NETIF_F_TSO
                | NETIF_F_TSO_ECN | NETIF_F_TSO6;
        }
        /* Individual feature bits: what can host handle? */
        if (virtio_has_feature(vdev, VIRTIO_NET_F_HOST_TSO4))
            dev->hw_features |= NETIF_F_TSO;
        if (virtio_has_feature(vdev, VIRTIO_NET_F_HOST_TSO6))
            dev->hw_features |= NETIF_F_TSO6;
        if (virtio_has_feature(vdev, VIRTIO_NET_F_HOST_ECN))
            dev->hw_features |= NETIF_F_TSO_ECN;
        dev->features |= NETIF_F_GSO_ROBUST;
        /* 'gso' is a module parameter that defaults to true. */
        if (gso)
            dev->features |= dev->hw_features & NETIF_F_ALL_TSO;
        /* (!csum && gso) case will be fixed by register_netdev() */
    }
    if (virtio_has_feature(vdev, VIRTIO_NET_F_GUEST_CSUM))
        dev->features |= NETIF_F_RXCSUM;
    if (virtio_has_feature(vdev, VIRTIO_NET_F_GUEST_TSO4) ||
        virtio_has_feature(vdev, VIRTIO_NET_F_GUEST_TSO6))
        dev->features |= NETIF_F_GRO_HW;
    if (virtio_has_feature(vdev, VIRTIO_NET_F_CTRL_GUEST_OFFLOADS))
        dev->hw_features |= NETIF_F_GRO_HW;
    /* VLAN sub-devices inherit the same offload capabilities. */
    dev->vlan_features = dev->features;
    ....
}
注册虚拟网卡设备时,设置GSO能力。
/*
 * register_netdevice (excerpt, net/core/dev.c).
 * Unconditionally enables the software offload features on every device
 * at registration time — NETIF_F_SOFT_FEATURES includes NETIF_F_GSO,
 * which is why GSO is always available regardless of hardware TSO.
 */
int register_netdevice(struct net_device *dev)
{
    ...
    dev->hw_features |= NETIF_F_SOFT_FEATURES;
    /* dev->features is what the protocol stack consults */
    dev->features |= NETIF_F_SOFT_FEATURES;
    ...
}
以上是判断网卡是否支持TSO 的硬件能力。
2.2 GSO 软件逻辑是如何实现的?
在网络子系统初始化时,会注册TCP和UDP offload 的函数,也说明GSO 支持TCP 和UDP 。下面是注册流程:
inet_init()
->ipv4_offload_init()
ipv4_offload_init :
/*
 * ipv4_offload_init (net/ipv4/af_inet.c): registers the per-protocol
 * GSO/GRO callbacks for UDP, TCP and IPIP, then registers the IPv4
 * packet_offload at the link layer.  Registration failures are logged
 * but deliberately non-fatal.
 */
static int __init ipv4_offload_init(void)
{
    /*
     * Add offloads
     */
    if (udpv4_offload_init() < 0)
        pr_crit("%s: Cannot add UDP protocol offload\n", __func__);
    if (tcpv4_offload_init() < 0)
        pr_crit("%s: Cannot add TCP protocol offload\n", __func__);
    if (ipip_offload_init() < 0)
        pr_crit("%s: Cannot add IPIP protocol offload\n", __func__);
    dev_add_offload(&ip_packet_offload);
    return 0;
}
tcpv4_offload_init:
/* TCP/IPv4 offload callbacks: tcp4_gso_segment performs the software
 * GSO split; the gro_* callbacks handle receive-side aggregation. */
static const struct net_offload tcpv4_offload = {
    .callbacks = {
        .gso_segment = tcp4_gso_segment,
        .gro_receive = tcp4_gro_receive,
        .gro_complete = tcp4_gro_complete,
    },
};

/* Install the TCP offload ops under IPPROTO_TCP in inet_offloads[]. */
int __init tcpv4_offload_init(void)
{
    return inet_add_offload(&tcpv4_offload, IPPROTO_TCP);
}
udpv4_offload_init:
/* UDP/IPv4 offload callbacks: udp4_ufo_fragment performs the software
 * UFO split; the gro_* callbacks handle receive-side aggregation. */
static const struct net_offload udpv4_offload = {
    .callbacks = {
        .gso_segment = udp4_ufo_fragment,
        .gro_receive = udp4_gro_receive,
        .gro_complete = udp4_gro_complete,
    },
};

/* Install the UDP offload ops under IPPROTO_UDP in inet_offloads[]. */
int __init udpv4_offload_init(void)
{
    return inet_add_offload(&udpv4_offload, IPPROTO_UDP);
}
同时通过dev_add_offload 注册了 packet_offload , 是先调用 packet_offload 匹配,再 调用net_offload 具体执行。
dev_add_offload :register offload handlers
/* Global list of L3 packet_offload handlers (e.g. ip_packet_offload). */
static struct list_head offload_base __read_mostly;

/*
 * dev_add_offload - register offload handlers.
 * Inserts @po into offload_base, kept sorted by ascending priority,
 * under offload_lock; lookups traverse the list under RCU.
 */
void dev_add_offload(struct packet_offload *po)
{
    struct packet_offload *elem;
    spin_lock(&offload_lock);
    list_for_each_entry(elem, &offload_base, list) {
        if (po->priority < elem->priority)
            break;
    }
    list_add_rcu(&po->list, elem->list.prev);
    spin_unlock(&offload_lock);
}
EXPORT_SYMBOL(dev_add_offload);
上面注册了packet_offload 。
inet_add_offload:
/* Per-L4-protocol offload table, indexed by the IP protocol number. */
const struct net_offload __rcu *inet_offloads[MAX_INET_PROTOS] __read_mostly;
EXPORT_SYMBOL(inet_offloads);

/*
 * Atomically install @prot into inet_offloads[protocol]; returns 0 on
 * success, -1 if a handler was already registered for that protocol.
 */
int inet_add_offload(const struct net_offload *prot, unsigned char protocol)
{
    return !cmpxchg((const struct net_offload **)&inet_offloads[protocol],
                    NULL, prot) ? 0 : -1;
}
EXPORT_SYMBOL(inet_add_offload);
inet_add_offload 把tcpv4_offload 和udpv4_offload 保存在inet_offloads 数组中。
上面这部分就是实现了网络协议栈的GSO 功能。
上面已经把gso功能定义好,什么时候调用?
通过调用函数接口 skb_gso_segment:只要调用这个接口,就会根据数据包的协议类型,执行tcpv4_offload 或udpv4_offload 中注册的分段回调;目前所知,内核会在ip 层和调用网卡驱动之前执行skb_gso_segment 函数。
2.3 使用TCP 进行通信
在发送端发起connect连接或三次握手建立完成(tcp_v4_syn_recv_sock),会开启GSO。
/* This will initiate an outgoing connection. */
/* Excerpt: the GSO-relevant part of tcp_v4_connect. */
int tcp_v4_connect(struct sock *sk, struct sockaddr *uaddr, int addr_len)
{
    ......
    /* OK, now commit destination to socket. */
    /* Mark the socket's GSO type as TCPv4 so later segmentation knows
     * how to split, then let sk_setup_caps() derive the route/device
     * capabilities (GSO, SG, checksum). */
    sk->sk_gso_type = SKB_GSO_TCPV4;
    sk_setup_caps(sk, &rt->dst);
    ......
}
tcp_v4_connect将sock的gso_type设置为tcpv4类型,然后调用sk_setup_caps,根据net_gso_ok返回值,判断是否支持GSO能力,正常这里是返回True。
/*
 * net_gso_ok - do @features cover segmentation for @gso_type?
 * Effectively a "do we have (T)SO for this packet type" check.  The two
 * main call sites behave differently:
 * 1. TCP connect / 3-way-handshake completion (via sk_setup_caps): the
 *    features there include the always-on software GSO bits, so with GSO
 *    enabled this returns true.
 * 2. The dev layer just before handing the skb to the driver: features
 *    reflect dev->features, so if the NIC has no TSO the matching bit is
 *    absent and this returns false, forcing software segmentation.
 */
static inline bool net_gso_ok(netdev_features_t features, int gso_type)
{
    netdev_features_t feature = (netdev_features_t)gso_type << NETIF_F_GSO_SHIFT;
    /* check flags correspondence */
    BUILD_BUG_ON(SKB_GSO_TCPV4 != (NETIF_F_TSO >> NETIF_F_GSO_SHIFT));
    BUILD_BUG_ON(SKB_GSO_DODGY != (NETIF_F_GSO_ROBUST >> NETIF_F_GSO_SHIFT));
    BUILD_BUG_ON(SKB_GSO_TCP_ECN != (NETIF_F_TSO_ECN >> NETIF_F_GSO_SHIFT));
    BUILD_BUG_ON(SKB_GSO_TCP_FIXEDID != (NETIF_F_TSO_MANGLEID >> NETIF_F_GSO_SHIFT));
    BUILD_BUG_ON(SKB_GSO_TCPV6 != (NETIF_F_TSO6 >> NETIF_F_GSO_SHIFT));
    BUILD_BUG_ON(SKB_GSO_FCOE != (NETIF_F_FSO >> NETIF_F_GSO_SHIFT));
    BUILD_BUG_ON(SKB_GSO_GRE != (NETIF_F_GSO_GRE >> NETIF_F_GSO_SHIFT));
    BUILD_BUG_ON(SKB_GSO_GRE_CSUM != (NETIF_F_GSO_GRE_CSUM >> NETIF_F_GSO_SHIFT));
    BUILD_BUG_ON(SKB_GSO_IPXIP4 != (NETIF_F_GSO_IPXIP4 >> NETIF_F_GSO_SHIFT));
    BUILD_BUG_ON(SKB_GSO_IPXIP6 != (NETIF_F_GSO_IPXIP6 >> NETIF_F_GSO_SHIFT));
    BUILD_BUG_ON(SKB_GSO_UDP_TUNNEL != (NETIF_F_GSO_UDP_TUNNEL >> NETIF_F_GSO_SHIFT));
    BUILD_BUG_ON(SKB_GSO_UDP_TUNNEL_CSUM != (NETIF_F_GSO_UDP_TUNNEL_CSUM >> NETIF_F_GSO_SHIFT));
    BUILD_BUG_ON(SKB_GSO_PARTIAL != (NETIF_F_GSO_PARTIAL >> NETIF_F_GSO_SHIFT));
    BUILD_BUG_ON(SKB_GSO_TUNNEL_REMCSUM != (NETIF_F_GSO_TUNNEL_REMCSUM >> NETIF_F_GSO_SHIFT));
    BUILD_BUG_ON(SKB_GSO_SCTP != (NETIF_F_GSO_SCTP >> NETIF_F_GSO_SHIFT));
    BUILD_BUG_ON(SKB_GSO_ESP != (NETIF_F_GSO_ESP >> NETIF_F_GSO_SHIFT));
    BUILD_BUG_ON(SKB_GSO_UDP != (NETIF_F_GSO_UDP >> NETIF_F_GSO_SHIFT));
    BUILD_BUG_ON(SKB_GSO_UDP_L4 != (NETIF_F_GSO_UDP_L4 >> NETIF_F_GSO_SHIFT));
    BUILD_BUG_ON(SKB_GSO_FRAGLIST != (NETIF_F_GSO_FRAGLIST >> NETIF_F_GSO_SHIFT));
    return (features & feature) == feature;
}
协议层校验支持gso后,会同时开启分散、聚合及csum校验能力。
/*
 * sk_setup_caps: cache the route/device capabilities on the socket.
 * When GSO is usable, also turn on scatter-gather and hardware checksum
 * (TSO requires both) and record the device's GSO size/segment limits.
 */
void sk_setup_caps(struct sock *sk, struct dst_entry *dst)
{
    u32 max_segs = 1;
    sk_dst_set(sk, dst);
    sk->sk_route_caps = dst->dev->features | sk->sk_route_forced_caps;
    if (sk->sk_route_caps & NETIF_F_GSO)
        sk->sk_route_caps |= NETIF_F_GSO_SOFTWARE;
    sk->sk_route_caps &= ~sk->sk_route_nocaps;
    if (sk_can_gso(sk)) {
        /* skb headers need extra (xfrm) space: disable GSO */
        if (dst->header_len && !xfrm_dst_offload_ok(dst)) {
            sk->sk_route_caps &= ~NETIF_F_GSO_MASK;
        } else {
            /* Enable scatter-gather and checksum offload: a NIC doing
             * TSO must also gather fragments and recompute checksums. */
            sk->sk_route_caps |= NETIF_F_SG | NETIF_F_HW_CSUM;
            sk->sk_gso_max_size = dst->dev->gso_max_size;
            max_segs = max_t(u32, dst->dev->gso_max_segs, 1);
        }
    }
    sk->sk_gso_max_segs = max_segs;
}
EXPORT_SYMBOL_GPL(sk_setup_caps);
2.4 应用程序调用send发送数据包
send系统调用最终调用tcp_sendmsg,在tcp_sendmsg里判断是否支持GSO,支持的话将用户数据信息封装到skb的线性区或非线性区,封装完后的skb数据包就是一个大包了,然后调用tcp_push_one发送给IP层,当然发送之前还会调用check函数,根据csum的类型计算tcp层的csum,支持GSO、TSO情况下,tcp层只会计算伪头部的csum。
/*
 * tcp_sendmsg (excerpt from an older kernel): copies user data into
 * large GSO-sized skbs on the write queue and pushes them to the IP
 * layer.  With GSO, tcp_send_mss() returns a size_goal that is a
 * multiple of the MSS, so one skb may exceed the MTU; the actual split
 * is deferred (TSO in hardware, or skb_gso_segment() at the dev layer).
 */
int tcp_sendmsg(struct kiocb *iocb, struct sock *sk, struct msghdr *msg,
        size_t size)
{
    struct iovec *iov;
    struct tcp_sock *tp = tcp_sk(sk);
    struct sk_buff *skb;
    int iovlen, flags, err, copied = 0;
    int mss_now = 0, size_goal, copied_syn = 0, offset = 0;
    bool sg;
    long timeo;
    lock_sock(sk);
    flags = msg->msg_flags;
    if (flags & MSG_FASTOPEN) {
        err = tcp_sendmsg_fastopen(sk, msg, &copied_syn, size);
        if (err == -EINPROGRESS && copied_syn > 0)
            goto out;
        else if (err)
            goto out_err;
        offset = copied_syn;
    }
    timeo = sock_sndtimeo(sk, flags & MSG_DONTWAIT);
    /* Wait for a connection to finish. One exception is TCP Fast Open
     * (passive side) where data is allowed to be sent before a connection
     * is fully established.
     */
    if (((1 << sk->sk_state) & ~(TCPF_ESTABLISHED | TCPF_CLOSE_WAIT)) &&
        !tcp_passive_fastopen(sk)) {
        if ((err = sk_stream_wait_connect(sk, &timeo)) != 0)
            goto do_error;
    }
    if (unlikely(tp->repair)) {
        if (tp->repair_queue == TCP_RECV_QUEUE) {
            copied = tcp_send_rcvq(sk, msg, size);
            goto out_nopush;
        }
        err = -EINVAL;
        if (tp->repair_queue == TCP_NO_QUEUE)
            goto out_err;
        /* 'common' sending to sendq */
    }
    /* This should be in poll */
    clear_bit(SOCK_ASYNC_NOSPACE, &sk->sk_socket->flags);
    /* Get the MSS; with GSO enabled, size_goal comes back as an integer
     * multiple of the negotiated MSS. */
    mss_now = tcp_send_mss(sk, &size_goal, flags);
    /* Ok commence sending. */
    iovlen = msg->msg_iovlen;
    iov = msg->msg_iov;
    copied = 0;
    err = -EPIPE;
    if (sk->sk_err || (sk->sk_shutdown & SEND_SHUTDOWN))
        goto out_err;
    /* Does the route support scatter-gather? */
    sg = !!(sk->sk_route_caps & NETIF_F_SG);
    while (--iovlen >= 0) {
        size_t seglen = iov->iov_len;
        unsigned char __user *from = iov->iov_base;
        iov++;
        if (unlikely(offset > 0)) { /* Skip bytes copied in SYN */
            if (offset >= seglen) {
                offset -= seglen;
                continue;
            }
            seglen -= offset;
            from += offset;
            offset = 0;
        }
        while (seglen > 0) {
            int copy = 0;
            int max = size_goal;
            /* Last skb on the write queue. */
            skb = tcp_write_queue_tail(sk);
            if (tcp_send_head(sk)) {
                if (skb->ip_summed == CHECKSUM_NONE)
                    max = mss_now;
                /* copy = room still available in this skb */
                copy = max - skb->len;
            }
            /* skb->len >= max means this skb is full: allocate a new one. */
            if (copy <= 0) {
new_segment:
                /* Allocate new segment. If the interface is SG,
                 * allocate skb fitting to single page.
                 */
                if (!sk_stream_memory_free(sk))
                    goto wait_for_sndbuf;
                skb = sk_stream_alloc_skb(sk,
                                          /* linear-area size for the new skb */
                                          select_size(sk, sg),
                                          sk->sk_allocation);
                if (!skb)
                    goto wait_for_memory;
                /*
                 * Check whether we can use HW checksum.
                 */
                /* Csum capability was set in sk_setup_caps().  With
                 * CHECKSUM_PARTIAL the stack only computes the IP and
                 * pseudo-header checksums; the payload checksum is left
                 * to the hardware. */
                if (sk->sk_route_caps & NETIF_F_CSUM_MASK)
                    skb->ip_summed = CHECKSUM_PARTIAL;
                /* Queue the freshly allocated skb on sk->sk_write_queue. */
                skb_entail(sk, skb);
                copy = size_goal;
                max = size_goal;
                /* All packets are restored as if they have
                 * already been sent. skb_mstamp isn't set to
                 * avoid wrong rtt estimation.
                 */
                if (tp->repair)
                    TCP_SKB_CB(skb)->sacked |= TCPCB_REPAIRED;
            }
            /* Try to append data to the end of skb. */
            /* Never copy more than the user actually sent. */
            if (copy > seglen)
                copy = seglen;
            /* Where to copy to? */
            /* Is there still room in the linear area? */
            if (skb_availroom(skb) > 0) {
                /* We have some space in skb head. Superb! */
                copy = min_t(int, copy, skb_availroom(skb));
                /* Copy user data into the linear area; updates skb->len. */
                err = skb_add_data_nocache(sk, skb, from, copy);
                if (err)
                    goto do_fault;
            } else {
                /* Linear area exhausted: store data in shinfo frags[]. */
                bool merge = true;
                int i = skb_shinfo(skb)->nr_frags;
                struct page_frag *pfrag = sk_page_frag(sk);
                /* sk_page_frag is the socket's current page fragment;
                 * ensure it still has room (at least 32 bytes), or
                 * allocate a new page into pfrag->page. */
                if (!sk_page_frag_refill(sk, pfrag))
                    goto wait_for_memory;
                /* If pfrag->page is still the last fragment's page we
                 * can merge into it; otherwise a new page was allocated
                 * above, so set merge=false and install the new page
                 * into skb_shinfo below. */
                if (!skb_can_coalesce(skb, i, pfrag->page,
                                      pfrag->offset)) {
                    if (i == MAX_SKB_FRAGS || !sg) {
                        tcp_mark_push(tp, skb);
                        goto new_segment;
                    }
                    merge = false;
                }
                /* Clamp to the space left in the page. */
                copy = min_t(int, copy, pfrag->size - pfrag->offset);
                if (!sk_wmem_schedule(sk, copy))
                    goto wait_for_memory;
                /* Copy user data into pfrag->page; updates skb->len and
                 * skb->data_len (data_len counts non-linear bytes only). */
                err = skb_copy_to_page_nocache(sk, from, skb,
                                               pfrag->page,
                                               pfrag->offset,
                                               copy);
                if (err)
                    goto do_error;
                /* Update the skb. */
                if (merge) {
                    /* Merged: just grow the last fragment. */
                    skb_frag_size_add(&skb_shinfo(skb)->frags[i - 1], copy);
                } else {
                    /* New page: install it at frags[i] and bump
                     * skb_shinfo(skb)->nr_frags. */
                    skb_fill_page_desc(skb, i, pfrag->page,
                                       pfrag->offset, copy);
                    /* Take an extra reference on the page. */
                    get_page(pfrag->page);
                }
                pfrag->offset += copy;
            }
            if (!copied)
                TCP_SKB_CB(skb)->tcp_flags &= ~TCPHDR_PSH;
            tp->write_seq += copy;
            TCP_SKB_CB(skb)->end_seq += copy;
            skb_shinfo(skb)->gso_segs = 0;
            from += copy;
            copied += copy;
            if ((seglen -= copy) == 0 && iovlen == 0)
                goto out;
            if (skb->len < max || (flags & MSG_OOB) || unlikely(tp->repair))
                continue;
            if (forced_push(tp)) {
                tcp_mark_push(tp, skb);
                __tcp_push_pending_frames(sk, mss_now, TCP_NAGLE_PUSH);
            } else if (skb == tcp_send_head(sk))
                tcp_push_one(sk, mss_now);
            continue;
wait_for_sndbuf:
            set_bit(SOCK_NOSPACE, &sk->sk_socket->flags);
wait_for_memory:
            if (copied)
                tcp_push(sk, flags & ~MSG_MORE, mss_now,
                         TCP_NAGLE_PUSH, size_goal);
            if ((err = sk_stream_wait_memory(sk, &timeo)) != 0)
                goto do_error;
            mss_now = tcp_send_mss(sk, &size_goal, flags);
        }
    }
out:
    if (copied)
        tcp_push(sk, flags, mss_now, tp->nonagle, size_goal);
out_nopush:
    release_sock(sk);
    return copied + copied_syn;
do_fault:
    if (!skb->len) {
        tcp_unlink_write_queue(skb, sk);
        /* It is the one place in all of TCP, except connection
         * reset, where we can be unlinking the send_head.
         */
        tcp_check_send_head(sk, skb);
        sk_wmem_free_skb(sk, skb);
    }
do_error:
    if (copied + copied_syn)
        goto out;
out_err:
    err = sk_stream_error(sk, flags, err);
    release_sock(sk);
    return err;
}
2.5 tcp_write_xmit
在tcp_sendmsg 函数中,会调用__tcp_push_pending_frames 函数和tcp_push_one 函数,进行发送数据包,这两个函数,都会调用tcp_write_xmit 函数。
在tcp_write_xmit流程里,通过tcp_init_tso_segs设置gso_size及分段个数gso_segs,其中gso_size为mss值,这两个参数用于告诉硬件做tso拆分时,需要拆分成的数据包个数及长度。
/*
 * tcp_set_skb_tso_segs: fill in skb_shinfo gso_size/gso_segs/gso_type so
 * that the NIC (TSO) or software GSO knows how to split this skb.
 */
static void tcp_set_skb_tso_segs(const struct sock *sk, struct sk_buff *skb,
        unsigned int mss_now)
{
    struct skb_shared_info *shinfo = skb_shinfo(skb);
    /* Make sure we own this skb before messing gso_size/gso_segs */
    WARN_ON_ONCE(skb_cloned(skb));
    if (skb->len <= mss_now || !sk_can_gso(sk) ||
        skb->ip_summed == CHECKSUM_NONE) {
        /* Avoid the costly divide in the normal
         * non-TSO case.
         */
        shinfo->gso_segs = 1;
        shinfo->gso_size = 0;
        shinfo->gso_type = 0;
    } else {
        /* gso_segs = total length divided by the MSS, rounded up */
        shinfo->gso_segs = DIV_ROUND_UP(skb->len, mss_now);
        /* gso_size = MSS: the splitter cuts chunks of this size */
        shinfo->gso_size = mss_now;
        shinfo->gso_type = sk->sk_gso_type;
    }
}
2.6 dev层进一步检验网卡是否具有TSO 能力
dev层发送给驱动之前,进一步校验网卡是否具有TSO能力,如果没有,则回调tcp的分段函数完成skb的分段处理,如果支持,则直接发送给驱动;
发送数据包 流程图:
__dev_queue_xmit->
validate_xmit_skb
/*
 * validate_xmit_skb (net/core/dev.c): final check before the skb reaches
 * the driver.  If the device cannot segment this skb in hardware,
 * skb_gso_segment() performs the split in software; otherwise the large
 * skb is passed through (after possible linearization / checksum help).
 */
static struct sk_buff *validate_xmit_skb(struct sk_buff *skb, struct net_device *dev)
{
    netdev_features_t features;
    if (skb->next)
        return skb;
    features = netif_skb_features(skb);
    skb = validate_xmit_vlan(skb, features);
    if (unlikely(!skb))
        goto out_null;
    /* features is derived from dev->features: without hardware TSO the
     * TSO bit is absent, so netif_needs_gso() returns true and the skb
     * must be segmented in software here. */
    if (netif_needs_gso(skb, features)) {
        struct sk_buff *segs;
        segs = skb_gso_segment(skb, features);
        if (IS_ERR(segs)) {
            goto out_kfree_skb;
        } else if (segs) {
            consume_skb(skb);
            skb = segs;
        }
    } else {
        if (skb_needs_linearize(skb, features) &&
            __skb_linearize(skb))
            goto out_kfree_skb;
        /* If packet is not checksummed and device does not
         * support checksumming for this protocol, complete
         * checksumming here.
         */
        if (skb->ip_summed == CHECKSUM_PARTIAL) {
            if (skb->encapsulation)
                skb_set_inner_transport_header(skb,
                        skb_checksum_start_offset(skb));
            else
                skb_set_transport_header(skb,
                        skb_checksum_start_offset(skb));
            if (skb_csum_hwoffload_help(skb, features))
                goto out_kfree_skb;
        }
    }
    return skb;
out_kfree_skb:
    kfree_skb(skb);
out_null:
    return NULL;
}
2.7 skb_gso_segment
如果需要做gso分段,则先进入ip层的分段处理,在ip层分段处理函数里,主要工作是调用tcp层的分段处理函数,等tcp层分段完成后,重新对分段的skb的ip头做checksum;
skb_gso_segment->
__skb_gso_segment->
skb_mac_gso_segment ->#net/core/dev.c 路径
inet_gso_segment->
tcp4_gso_segment
这里会有一个问题: 在IP层,已经进行gso 的操作了,在这里为什么又一次进行判断,然后再进行gso 分段?有必要执行两次gso 分段?
下面看一下ip 层gso 的功能:
__ip_finish_output->
ip_finish_output_gso->
skb_gso_segment->
__skb_gso_segment->
skb_mac_gso_segment ->#net/core/dev.c 路径
inet_gso_segment->
tcp4_gso_segment
上面这个问题,还没有找到答案,只能认为是进一步检验,防止出错。
2.8 inet_gso_segment
/*
 * inet_gso_segment (net/ipv4/af_inet.c): the IPv4 GSO entry point.
 * Delegates the actual split to the L4 gso_segment callback looked up in
 * inet_offloads[], then rebuilds each resulting segment's IP header
 * (id / frag_off / tot_len) and recomputes its IP checksum.
 */
static struct sk_buff *inet_gso_segment(struct sk_buff *skb,
        netdev_features_t features)
{
    bool udpfrag = false, fixedid = false, gso_partial, encap;
    struct sk_buff *segs = ERR_PTR(-EINVAL);
    const struct net_offload *ops;
    unsigned int offset = 0;
    struct iphdr *iph;
    int proto, tot_len;
    int nhoff;
    int ihl;
    int id;
    /* Record the IP header offset relative to skb->head. */
    skb_reset_network_header(skb);
    /* Offset of the IP header from the MAC header == MAC header length. */
    nhoff = skb_network_header(skb) - skb_mac_header(skb);
    if (unlikely(!pskb_may_pull(skb, sizeof(*iph))))
        goto out;
    /* Validate the IP header-length field. */
    iph = ip_hdr(skb);
    ihl = iph->ihl * 4;
    if (ihl < sizeof(*iph))
        goto out;
    id = ntohs(iph->id);
    proto = iph->protocol;
    /* Re-check via the header-length field that skb holds the full header. */
    /* Warning: after this point, iph might be no longer valid */
    if (unlikely(!pskb_may_pull(skb, ihl)))
        goto out;
    /* NOTE: data now points at the transport header (IP header pulled). */
    __skb_pull(skb, ihl);
    encap = SKB_GSO_CB(skb)->encap_level > 0;
    if (encap)
        features &= skb->dev->hw_enc_features;
    SKB_GSO_CB(skb)->encap_level += ihl;
    /* Record the transport header offset relative to skb->head. */
    skb_reset_transport_header(skb);
    segs = ERR_PTR(-EPROTONOSUPPORT);
    if (!skb->encapsulation || encap) {
        udpfrag = !!(skb_shinfo(skb)->gso_type & SKB_GSO_UDP);
        fixedid = !!(skb_shinfo(skb)->gso_type & SKB_GSO_TCP_FIXEDID);
        /* fixed ID is invalid if DF bit is not set */
        if (fixedid && !(ip_hdr(skb)->frag_off & htons(IP_DF)))
            goto out;
    }
    ops = rcu_dereference(inet_offloads[proto]); /* L4 offload ops by protocol number */
    if (likely(ops && ops->callbacks.gso_segment))
        segs = ops->callbacks.gso_segment(skb, features); /* e.g. tcp4_gso_segment */
    if (IS_ERR_OR_NULL(segs))
        goto out;
    gso_partial = !!(skb_shinfo(segs)->gso_type & SKB_GSO_PARTIAL);
    /* Fix up the IP header of every resulting segment. */
    skb = segs;
    do {
        iph = (struct iphdr *)(skb_mac_header(skb) + nhoff);
        if (udpfrag) { /* UDP: classic IP fragmentation */
            /* All fragments keep the same IP id; frag_off advances
             * in 8-byte units. */
            iph->frag_off = htons(offset >> 3);
            if (skb->next != NULL)
                iph->frag_off |= htons(IP_MF); /* more fragments follow */
            offset += skb->len - nhoff - ihl;
            tot_len = skb->len - nhoff;
        } else if (skb_is_gso(skb)) { /* TCP: IP id increments per segment */
            if (!fixedid) {
                iph->id = htons(id);
                id += skb_shinfo(skb)->gso_segs;
            }
            if (gso_partial)
                tot_len = skb_shinfo(skb)->gso_size +
                    SKB_GSO_CB(skb)->data_offset +
                    skb->head - (unsigned char *)iph;
            else
                tot_len = skb->len - nhoff;
        } else {
            if (!fixedid)
                iph->id = htons(id++);
            tot_len = skb->len - nhoff;
        }
        iph->tot_len = htons(tot_len);
        /* Recompute the IP checksum for this segment. */
        ip_send_check(iph);
        if (encap)
            skb_reset_inner_headers(skb);
        skb->network_header = (u8 *)iph - skb->head;
    } while ((skb = skb->next));
out:
    return segs;
}
这里有一个问题:UDP经过GSO分片后 每个分片的IP头部id都是一样的,这符合IP分片的逻辑,但是为什么TCP的GSO分片,IP头部的id 会依次加1 ?
原因是:tcp 建立三次握手的过程中产生合适的mss,这个mss 肯定是<=网络层的最大MTU,然后tcp数据封装成ip数据包通过网络层发送,当服务器端传输层 接收到tcp数据之后进行tcp重组。所以正常情况下tcp产生的ip数据包在传输过程中 是不会发生分片的!由于GSO应该保证对外透明,所以其效果应该也和在TCP层直接分片的效果是一样的,所以这里对UDP的处理是IP分片逻辑,但是对TCP的处理是构造新的skb逻辑。
小结:对于GSO
UDP:所有分片ip 头部id 都是相同,设置IP_MF分片标志(除最后一片)(等同于IP分片)
TCP:分片后,每个分片IP头部中id加1,(等同于TCP分段)
2.9 tcp_gso_segment
进入tcp层分段处理函数后,会调用tcp_gso_segment完成skb分段,分段完成后,重新为每个分段skb做tcp层的checksum,以及为每个分段skb重新分配seq序列号等;
/*
 * tcp_gso_segment: software GSO for TCP.  Splits the big skb into
 * MSS-sized segments via skb_segment(), then fixes up each segment's
 * sequence number, FIN/PSH/CWR flags and TCP checksum incrementally.
 */
struct sk_buff *tcp_gso_segment(struct sk_buff *skb,
        netdev_features_t features)
{
    struct sk_buff *segs = ERR_PTR(-EINVAL);
    unsigned int sum_truesize = 0;
    struct tcphdr *th;
    unsigned int thlen;
    unsigned int seq;
    __be32 delta;
    unsigned int oldlen;
    unsigned int mss;
    struct sk_buff *gso_skb = skb;
    __sum16 newcheck;
    bool ooo_okay, copy_destructor;
    th = tcp_hdr(skb);
    thlen = th->doff * 4; /* TCP doff counts 32-bit words */
    if (thlen < sizeof(*th))
        goto out;
    /* Verify the skb really holds the whole TCP header. */
    if (!pskb_may_pull(skb, thlen))
        goto out;
    /* oldlen = one's complement of the old skb->len, used below for
     * incremental checksum adjustment; then pull the TCP header so
     * data points at the TCP payload. */
    oldlen = (u16)~skb->len;
    __skb_pull(skb, thlen);
    mss = skb_shinfo(skb)->gso_size; /* gso_size is the MSS */
    if (unlikely(skb->len <= mss)) /* nothing to split */
        goto out;
    if (skb_gso_ok(skb, features | NETIF_F_GSO_ROBUST)) {
        /* Packet is from an untrusted source, reset gso_segs. */
        /* Recompute how many MSS-sized pieces this skb needs. */
        skb_shinfo(skb)->gso_segs = DIV_ROUND_UP(skb->len, mss);
        segs = NULL;
        goto out;
    }
    copy_destructor = gso_skb->destructor == tcp_wfree;
    ooo_okay = gso_skb->ooo_okay;
    /* All segments but the first should have ooo_okay cleared */
    skb->ooo_okay = 0;
    /* The actual split happens here. */
    segs = skb_segment(skb, features);
    if (IS_ERR(segs))
        goto out;
    /* Only first segment might have ooo_okay set */
    segs->ooo_okay = ooo_okay;
    /* GSO partial and frag_list segmentation only requires splitting
     * the frame into an MSS multiple and possibly a remainder, both
     * cases return a GSO skb. So update the mss now.
     */
    if (skb_is_gso(segs))
        mss *= skb_shinfo(segs)->gso_segs;
    delta = htonl(oldlen + (thlen + mss));
    skb = segs;
    th = tcp_hdr(skb);
    seq = ntohl(th->seq);
    newcheck = ~csum_fold((__force __wsum)((__force u32)th->check +
            (__force u32)delta));
    while (skb->next) {
        th->fin = th->psh = 0;
        th->check = newcheck;
        /* Finalize this segment's TCP checksum. */
        if (skb->ip_summed == CHECKSUM_PARTIAL)
            gso_reset_checksum(skb, ~th->check);
        else
            th->check = gso_make_checksum(skb, ~th->check);
        /* Advance the sequence number: every segment except the last
         * carries exactly mss bytes. */
        seq += mss;
        if (copy_destructor) {
            skb->destructor = gso_skb->destructor;
            skb->sk = gso_skb->sk;
            sum_truesize += skb->truesize;
        }
        skb = skb->next;
        th = tcp_hdr(skb);
        th->seq = htonl(seq);
        th->cwr = 0;
    }
    /* Following permits TCP Small Queues to work well with GSO :
     * The callback to TCP stack will be called at the time last frag
     * is freed at TX completion, and not right now when gso_skb
     * is freed by GSO engine
     */
    if (copy_destructor) {
        swap(gso_skb->sk, skb->sk);
        swap(gso_skb->destructor, skb->destructor);
        sum_truesize += skb->truesize;
        atomic_add(sum_truesize - gso_skb->truesize,
                &skb->sk->sk_wmem_alloc);
    }
    delta = htonl(oldlen + (skb_tail_pointer(skb) -
            skb_transport_header(skb)) +
            skb->data_len);
    th->check = ~csum_fold((__force __wsum)((__force u32)th->check +
            (__force u32)delta));
    if (skb->ip_summed == CHECKSUM_PARTIAL)
        gso_reset_checksum(skb, ~th->check);
    else
        th->check = gso_make_checksum(skb, ~th->check);
out:
    return segs;
}
可以看到tcp_gso_segment里真正去做skb分段处理的是在skb_segment,skb_segment里将tso的skb按mss长度进行分段处理,对线性区域的数据,直接拷贝到分段skb的线性区域,对于非线性区域数据,直接将frags指针指向分段skb的frags;
从上面可以看出,每个TCP 的GSO分片包含了TCP 头部信息的,这也符合TCP 层的分段逻辑。另外注意 这里传递给skb_segment做分段时是不带tcp 首部的。对于UDP,其GSO处理函数是udp4_ufo_fragment.
3.0 udp4_ufo_fragment
/*
 * udp4_ufo_fragment: software UFO for UDP over IPv4.  For plain UDP GSO
 * the UDP checksum is computed in software over the whole datagram, then
 * skb_segment() splits the skb WITHOUT pulling the UDP header — the
 * header is treated as payload, so only the first fragment carries it
 * (classic IP-fragmentation semantics).
 */
static struct sk_buff *udp4_ufo_fragment(struct sk_buff *skb,
        netdev_features_t features)
{
    struct sk_buff *segs = ERR_PTR(-EINVAL);
    unsigned int mss;
    __wsum csum;
    struct udphdr *uh;
    struct iphdr *iph;
    if (skb->encapsulation &&
        (skb_shinfo(skb)->gso_type &
         (SKB_GSO_UDP_TUNNEL|SKB_GSO_UDP_TUNNEL_CSUM))) {
        segs = skb_udp_tunnel_segment(skb, features, false);
        goto out;
    }
    if (!(skb_shinfo(skb)->gso_type & (SKB_GSO_UDP | SKB_GSO_UDP_L4)))
        goto out;
    if (!pskb_may_pull(skb, sizeof(struct udphdr)))
        goto out;
    if (skb_shinfo(skb)->gso_type & SKB_GSO_UDP_L4)
        return __udp_gso_segment(skb, features, false);
    mss = skb_shinfo(skb)->gso_size;
    if (unlikely(skb->len <= mss))
        goto out;
    /* Do software UFO. Complete and fill in the UDP checksum as
     * HW cannot do checksum of UDP packets sent as multiple
     * IP fragments.
     */
    /* Compute the UDP checksum over the full datagram in software. */
    uh = udp_hdr(skb);
    iph = ip_hdr(skb);
    uh->check = 0;
    csum = skb_checksum(skb, 0, skb->len, 0);
    uh->check = udp_v4_check(skb->len, iph->saddr, iph->daddr, csum);
    if (uh->check == 0)
        uh->check = CSUM_MANGLED_0;
    skb->ip_summed = CHECKSUM_UNNECESSARY;
    /* If there is no outer header we can fake a checksum offload
     * due to the fact that we have already done the checksum in
     * software prior to segmenting the frame.
     */
    if (!skb->encap_hdr_csum)
        features |= NETIF_F_HW_CSUM;
    /* Fragment the skb. IP headers of the fragments are updated in
     * inet_gso_segment()
     */
    /* Note: the UDP header was NOT pulled before calling skb_segment(). */
    segs = skb_segment(skb, features);
out:
    return segs;
}
注意这里传递给skb_segment 做分片是带有udp首部的,分片将udp 首部作为普通数据切分,这也意味着对于UDP的gso 分片,只有第一片有UDP首部。udp 的分段其实和ip 的分片没什么区别,只是多一个计算checksum 的步骤,下面看完成分片的关键函数 skb_segment。
3.1 skb_segment
/*
 * skb_segment: the generic segmentation engine.  Splits @head_skb into a
 * chain of mss-sized skbs.  Linear data is copied into each new segment;
 * paged (frags[]) data is shared by reference with per-segment offset and
 * size adjustments; frag_list skbs (IP fragments) are reused via clone.
 */
struct sk_buff *skb_segment(struct sk_buff *head_skb,
        netdev_features_t features)
{
    struct sk_buff *segs = NULL;
    struct sk_buff *tail = NULL;
    /* frag_list holds already-fragmented skbs (set up in ip_do_fragment) */
    struct sk_buff *list_skb = skb_shinfo(head_skb)->frag_list;
    /* frags[] holds the scatter-gather (non-linear) page data */
    skb_frag_t *frag = skb_shinfo(head_skb)->frags;
    unsigned int mss = skb_shinfo(head_skb)->gso_size;
    /* doffset = mac+ip+tcp headers (or mac+ip for UDP, whose transport
     * header was not pulled by the caller) */
    unsigned int doffset = head_skb->data - skb_mac_header(head_skb);
    struct sk_buff *frag_skb = head_skb;
    unsigned int offset = doffset;
    unsigned int tnl_hlen = skb_tnl_header_len(head_skb);
    unsigned int partial_segs = 0;
    unsigned int headroom;
    unsigned int len = head_skb->len;
    __be16 proto;
    bool csum, sg;
    int nfrags = skb_shinfo(head_skb)->nr_frags;
    int err = -ENOMEM;
    int i = 0;
    int pos;
    int dummy;
    /* Restore the MAC/IP(/TCP) headers into the data area so they can be
     * copied into each new segment. */
    __skb_push(head_skb, doffset);
    proto = skb_network_protocol(head_skb, &dummy);
    if (unlikely(!proto))
        return ERR_PTR(-EINVAL);
    sg = !!(features & NETIF_F_SG);
    csum = !!can_checksum_protocol(features, proto);
    if (sg && csum && (mss != GSO_BY_FRAGS)) {
        if (!(features & NETIF_F_GSO_PARTIAL)) {
            struct sk_buff *iter;
            if (!list_skb ||
                !net_gso_ok(features, skb_shinfo(head_skb)->gso_type))
                goto normal;
            /* Split the buffer at the frag_list pointer.
             * This is based on the assumption that all
             * buffers in the chain excluding the last
             * containing the same amount of data.
             */
            skb_walk_frags(head_skb, iter) {
                if (skb_headlen(iter))
                    goto normal;
                len -= iter->len;
            }
        }
        /* GSO partial only requires that we trim off any excess that
         * doesn't fit into an MSS sized block, so take care of that
         * now.
         */
        partial_segs = len / mss;
        if (partial_segs > 1)
            mss *= partial_segs;
        else
            partial_segs = 0;
    }
normal:
    headroom = skb_headroom(head_skb);
    /* pos = length of the linear area (skb->len - skb->data_len) */
    pos = skb_headlen(head_skb);
    do {
        struct sk_buff *nskb;
        skb_frag_t *nskb_frag;
        int hsize;
        int size;
        if (unlikely(mss == GSO_BY_FRAGS)) {
            len = list_skb->len;
        } else {
            /* len = payload length of the next segment, capped at
             * mss.  offset starts at doffset (the header length)
             * and advances by one segment length per iteration, so
             * len is each segment's payload length. */
            len = head_skb->len - offset;
            if (len > mss)
                len = mss;
        }
        /* hsize = linear bytes not yet consumed.  Once the linear area
         * is exhausted (typically after the first segment) hsize goes
         * negative; clamp it to 0 so subsequent segments allocate no
         * linear payload space and take their data from frags[] /
         * frag_list instead.  (Unless the skb is fully linearized,
         * hsize < 0 must eventually happen as offset grows.) */
        hsize = skb_headlen(head_skb) - offset;
        if (hsize < 0)
            hsize = 0;
        /* Without NETIF_F_SG (or while enough linear data remains) the
         * whole segment payload (len bytes) comes from the linear area. */
        if (hsize > len || !sg)
            hsize = len;
        /* hsize == 0 means data comes from frags[] or frag_list;
         * i >= nfrags means frags[] is also exhausted, so start
         * consuming the frag_list (IP fragment) skbs. */
        if (!hsize && i >= nfrags && skb_headlen(list_skb) &&
            (skb_headlen(list_skb) == len || sg)) {
            BUG_ON(skb_headlen(list_skb) > len);
            i = 0;
            nfrags = skb_shinfo(list_skb)->nr_frags;
            frag = skb_shinfo(list_skb)->frags;
            frag_skb = list_skb;
            pos += skb_headlen(list_skb);
            while (pos < offset + len) {
                BUG_ON(i >= nfrags);
                size = skb_frag_size(frag);
                if (pos + size > offset + len)
                    break;
                i++;
                pos += size;
                frag++;
            }
            /* frag_list data need not be copied: clone the skb
             * descriptor and share the underlying data area. */
            nskb = skb_clone(list_skb, GFP_ATOMIC);
            list_skb = list_skb->next; /* advance to the next frag_list skb */
            if (unlikely(!nskb))
                goto err;
            if (unlikely(pskb_trim(nskb, len))) {
                kfree_skb(nskb);
                goto err;
            }
            hsize = skb_end_offset(nskb);
            /* Ensure the new skb has headroom for the mac+ip+tcp/udp
             * headers. */
            if (skb_cow_head(nskb, doffset + headroom)) {
                kfree_skb(nskb);
                goto err;
            }
            /* Adjust truesize to account for the reallocated head. */
            nskb->truesize += skb_end_offset(nskb) - hsize;
            skb_release_head_state(nskb);
            __skb_push(nskb, doffset);
        } else {
            /* Data comes from the linear area or frags[]: allocate a
             * fresh skb; len bytes will be copied, of which hsize
             * live in the linear area. */
            nskb = __alloc_skb(hsize + doffset + headroom,
                    GFP_ATOMIC, skb_alloc_rx_flag(head_skb),
                    NUMA_NO_NODE);
            if (unlikely(!nskb))
                goto err;
            skb_reserve(nskb, headroom);
            __skb_put(nskb, doffset);
        }
        /* First segment becomes the head of the result chain; later
         * ones are linked via ->next. */
        if (segs)
            tail->next = nskb;
        else
            segs = nskb;
        tail = nskb;
        /* Copy skb metadata from the original. */
        __copy_skb_header(nskb, head_skb);
        skb_headers_offset_update(nskb, skb_headroom(nskb) - headroom);
        /* Record the MAC header length. */
        skb_reset_mac_len(nskb);
        skb_copy_from_linear_data_offset(head_skb, -tnl_hlen,
                nskb->data - tnl_hlen,
                doffset + tnl_hlen);
        if (nskb->len == len + doffset)
            goto perform_csum_check;
        /* Without NETIF_F_SG there is no frags[] data: copy (and
         * checksum) everything from the linear area. */
        if (!sg) {
            if (!nskb->remcsum_offload)
                nskb->ip_summed = CHECKSUM_NONE;
            SKB_GSO_CB(nskb)->csum =
                skb_copy_and_csum_bits(head_skb, offset,
                        skb_put(nskb, len),
                        len, 0);
            SKB_GSO_CB(nskb)->csum_start =
                skb_headroom(nskb) + doffset;
            continue;
        }
        nskb_frag = skb_shinfo(nskb)->frags;
        /* Copy the remaining linear bytes (hsize of them) into nskb;
         * hsize == 0 means no linear data is left to copy. */
        skb_copy_from_linear_data_offset(head_skb, offset,
                skb_put(nskb, hsize), hsize);
        skb_shinfo(nskb)->tx_flags = skb_shinfo(head_skb)->tx_flags &
            SKBTX_SHARED_FRAG;
        /* pos starts at the linear length; offset+len is the total to
         * have consumed once this segment is complete.  While
         * pos < offset + len, this segment still needs bytes from the
         * non-linear area: attach frags one by one, advancing pos by
         * each frag's size, until the segment is full. */
        while (pos < offset + len) {
            /* frags[] exhausted: move on to the frag_list skbs. */
            if (i >= nfrags) {
                BUG_ON(skb_headlen(list_skb));
                i = 0;
                nfrags = skb_shinfo(list_skb)->nr_frags;
                frag = skb_shinfo(list_skb)->frags;
                frag_skb = list_skb;
                BUG_ON(!nfrags);
                list_skb = list_skb->next;
            }
            if (unlikely(skb_shinfo(nskb)->nr_frags >=
                    MAX_SKB_FRAGS)) {
                net_warn_ratelimited(
                    "skb_segment: too many frags: %u %u\n",
                    pos, mss);
                goto err;
            }
            if (unlikely(skb_orphan_frags(frag_skb, GFP_ATOMIC)))
                goto err;
            /* Non-linear data is not copied: the segment's frag
             * descriptor points at the original skb's page. */
            *nskb_frag = *frag;
            __skb_frag_ref(nskb_frag);
            size = skb_frag_size(nskb_frag);
            if (pos < offset) {
                nskb_frag->page_offset += offset - pos;
                skb_frag_size_sub(nskb_frag, offset - pos);
            }
            skb_shinfo(nskb)->nr_frags++;
            /* One frag consumed in full: advance pos. */
            if (pos + size <= offset + len) {
                i++;
                frag++;
                pos += size;
            } else {
                skb_frag_size_sub(nskb_frag, pos + size - (offset + len));
                goto skip_fraglist;
            }
            nskb_frag++;
        }
skip_fraglist:
        nskb->data_len = len - hsize;
        nskb->len += nskb->data_len;
        nskb->truesize += nskb->data_len;
perform_csum_check:
        if (!csum) {
            if (skb_has_shared_frag(nskb)) {
                err = __skb_linearize(nskb);
                if (err)
                    goto err;
            }
            if (!nskb->remcsum_offload)
                nskb->ip_summed = CHECKSUM_NONE;
            SKB_GSO_CB(nskb)->csum =
                skb_checksum(nskb, doffset,
                        nskb->len - doffset, 0);
            SKB_GSO_CB(nskb)->csum_start =
                skb_headroom(nskb) + doffset;
        }
    } while ((offset += len) < head_skb->len); /* loop until all payload is consumed */
    /* Some callers want to get the end of the list.
     * Put it in segs->prev to avoid walking the list.
     * (see validate_xmit_skb_list() for example)
     */
    segs->prev = tail;
    if (partial_segs) {
        struct sk_buff *iter;
        int type = skb_shinfo(head_skb)->gso_type;
        unsigned short gso_size = skb_shinfo(head_skb)->gso_size;
        /* Update type to add partial and then remove dodgy if set */
        type |= (features & NETIF_F_GSO_PARTIAL) / NETIF_F_GSO_PARTIAL * SKB_GSO_PARTIAL;
        type &= ~SKB_GSO_DODGY;
        /* Update GSO info and prepare to start updating headers on
         * our way back down the stack of protocols.
         */
        for (iter = segs; iter; iter = iter->next) {
            skb_shinfo(iter)->gso_size = gso_size;
            skb_shinfo(iter)->gso_segs = partial_segs;
            skb_shinfo(iter)->gso_type = type;
            SKB_GSO_CB(iter)->data_offset = skb_headroom(iter) + doffset;
        }
        if (tail->len - doffset <= gso_size)
            skb_shinfo(tail)->gso_size = 0;
        else if (tail != segs)
            skb_shinfo(tail)->gso_segs = DIV_ROUND_UP(tail->len - doffset, gso_size);
    }
    /* Following permits correct backpressure, for protocols
     * using skb_set_owner_w().
     * Idea is to tranfert ownership from head_skb to last segment.
     */
    if (head_skb->destructor == sock_wfree) {
        swap(tail->truesize, head_skb->truesize);
        swap(tail->destructor, head_skb->destructor);
        swap(tail->sk, head_skb->sk);
    }
    return segs;
err:
    kfree_skb_list(segs);
    return ERR_PTR(err);
}
分段处理完成后,返回分段的skb链表,然后将分段好的skb链表进一步发送(dev(qdisc) --->驱动 ----->网卡).
从上面的分片过程中可以看出,分成的小skb并不一定都是线性的,如果之前的skb存在frags数组或者frag_list,则分成的小skb 也可能有指向非线性区域。并不用担心网卡不支持分散聚合IO,因为之前如果能产生这些非线性数据,就说明网卡一定支持的。