xdp 全称 eXpress Data Path,是 linux ebpf 中的一个功能。ebpf 在内核中预留了一些插入点,用户可以在这些插入点插入自己的处理逻辑,当数据路过插入点时可以做一些预期的处理,具体实现方式如下:
① 用户编写数据处理代码,也就是对于路过这个插入点的数据想做什么处理
② 将代码编译
③ 将编译好的目标文件安装到插入点
安装之后,数据路过插入点时便会被安装的代码处理。
插入点的处理逻辑就像一些路口的收费站,不同身份的车辆通过收费站的时候,可能需要做不同的事情。有些车可以直接通过,不需要任何处理,也不用交钱;有些车不让通过,直接原路返回;有些车需要交钱之后再通过,不同类型的车所需要交的钱也可能是不一样的。
这些插入点提供了内核网络可编程的能力,也就是说我们不需要重新编译内核,只需要在用户态编写处理逻辑,然后将代码编译后的文件插入到内核,就可以实现我们的处理目的。内核还有其它一些方法,比如 kprobe,netfilter,内核热补丁,这些方式都是可以在不重新编译内核或者不重启内核的前提下改变内核的一些处理流程。
1 xdp 基本概念
1.1 xdp action
xdp 提供了一种处理网络报文的高性能方案,之所以性能高,是因为 xdp 对报文的处理在报文进入 IP TCP 协议栈之前,避免了漫长而繁琐的协议栈处理过程, 也就是 xdp 在收到包时最早能处理包的地方进行处理。下图中红色的 XDP 表示 xdp 的插入点,也表示在此插入点插入的用户程序。
程序对报文的处理可以有 5 个返回值:
XDP_PASS | 不对报文做特殊处理,就当这段代码是透明的,什么都没发生,没有产生任何影响 |
XDP_TX | 报文还是从本设备发送出去 |
XDP_REDIRECT | 报文重定向,报文重定向的目标有 3 个:重定向到 xsk,重定向到另一个 cpu,重定向到另一个网卡 |
XDP_DROP | 将报文丢弃 常用于防攻击,将攻击报文丢弃 |
XDP_ABORT | 数据包出错,最终也会被丢掉,与 XDP_DROP 不同的是会做异常统计 |
其中 XDP_REDIRECT 的意思是将报文进行重定向,重定向有 3 个选择:
① 将报文从另一个网卡发送出去
② 将报文重定向到另一个 cpu 进行处理
③ 将报文重定向到 xdp sock,用户态可以通过 xdp sock 接收这个报文进行处理
那么当一个 xdp prog 返回 XDP_REDIRECT 之后,报文具体的处理方式是怎么决定的呢,是由 linux 内核中的 bpf_redirect_info 中的 map 类型决定的。
如下代码,xdp_do_redirect() 函数是网卡驱动中处理 XDP_REDIRECT 时调用的函数。从该函数中可以看到,一个重要的变量是 ri,即 struct bpf_redirect_info,xdp 后边的处理过程由这个变量来决定,下边以重定向到 xdk sock 来举例。
ri 成员 | 说明 |
map | map 的类型指定了报文的重定向类型,map 类型有以下 4 种: BPF_MAP_TYPE_DEVMAP // 网卡 BPF_MAP_TYPE_DEVMAP_HASH // 网卡 BPF_MAP_TYPE_CPUMAP // cpu BPF_MAP_TYPE_XSKMAP // xsk 前两种类型的 map,报文要重定向到另一个网卡;BPF_MAP_TYPE_CPUMAP 说明报文要重定向到另一个 cpu;BPF_MAP_TYPE_XSKMAP 说明报文要放到 xdp sock,用户态可以通过 xdp sock 对报文进行接收。 |
index | 在 map 类型是 BPF_MAP_TYPE_XSKMAP 时,index 是网卡的 queue index,一个网卡中可能有多个 queue,每个 queue 都会有一个 index |
tgt_value | 在 map 类型是 BPF_MAP_TYPE_XSKMAP 时,tgt_value 是 xsk fd,xsk fd 保存在 xsk map 中。 xsk map 的 key 是 queue index,value 是 xdp sock。 |
int xdp_do_redirect(struct net_device *dev, struct xdp_buff *xdp,
struct bpf_prog *xdp_prog)
{
struct bpf_redirect_info *ri = this_cpu_ptr(&bpf_redirect_info);
struct bpf_map *map = READ_ONCE(ri->map);
u32 index = ri->tgt_index;
void *fwd = ri->tgt_value;
int err;
ri->tgt_index = 0;
ri->tgt_value = NULL;
WRITE_ONCE(ri->map, NULL);
...
err = __bpf_tx_xdp_map(dev, fwd, map, xdp);
...
return 0;
}
xdp_do_redirect() 函数最终会调用函数 __bpf_tx_xdp_map() 对报文进行重定向。
static int __bpf_tx_xdp_map(struct net_device *dev_rx, void *fwd,
struct bpf_map *map, struct xdp_buff *xdp)
{
switch (map->map_type) {
case BPF_MAP_TYPE_DEVMAP:
case BPF_MAP_TYPE_DEVMAP_HASH:
return dev_map_enqueue(fwd, xdp, dev_rx);
case BPF_MAP_TYPE_CPUMAP:
return cpu_map_enqueue(fwd, xdp, dev_rx);
case BPF_MAP_TYPE_XSKMAP:
return __xsk_map_redirect(fwd, xdp);
default:
return -EBADRQC;
}
return 0;
}
struct bpf_redirect_info 的定义如下,那么这个结构体里边的成员是在什么时候赋值的呢,是在用户写的 xdp prog 里边赋值的。
bpf prog 中可以调用函数 bpf_redirect_map(&xsks_map, index, 0) 来对 bpf_redirect_info 进行赋值,这个函数最终也会通过系统调用调用到内核中的 bpf_xdp_redirect_map。
struct bpf_redirect_info {
u32 flags;
u32 tgt_index;
void *tgt_value;
struct bpf_map *map;
u32 kern_flags;
struct bpf_nh_params nh;
};
bpf_xdp_redirect_map():
BPF_CALL_3(bpf_xdp_redirect_map, struct bpf_map *, map, u32, ifindex,
u64, flags)
{
struct bpf_redirect_info *ri = this_cpu_ptr(&bpf_redirect_info);
/* Lower bits of the flags are used as return code on lookup failure */
if (unlikely(flags > XDP_TX))
return XDP_ABORTED;
ri->tgt_value = __xdp_map_lookup_elem(map, ifindex);
if (unlikely(!ri->tgt_value)) {
/* If the lookup fails we want to clear out the state in the
* redirect_info struct completely, so that if an eBPF program
* performs multiple lookups, the last one always takes
* precedence.
*/
WRITE_ONCE(ri->map, NULL);
return flags;
}
ri->flags = flags;
ri->tgt_index = ifindex;
WRITE_ONCE(ri->map, map);
return XDP_REDIRECT;
}
xdp sock map 的 key 是网卡的 queue index,value 是 xsk sock 对应的 fd,在创建 xdp sock 的时候存储这个信息。
对报文的处理往往都会有几个选项,在 netfilter 使用时,也有类似的选项。可以通过内核模块向 netfilter 的 hook 点上插入代码,报文经过的时候使用我们插入的代码进行处理,返回值有 3 种。
NF_ACCEPT | 接收这个报文,直接 break,后边的规则不再检查。 返回值是 1,返回之后会继续执行 okfn 对报文做后续处理。 |
NF_DROP | 丢弃报文, 后续不再处理 |
NF_QUEUE | 把报文加入到队列,下一个规则继续处理。 |
1.2 xdp map
xdp map 在内核态可以访问,在用户态也可以访问,是用户态和内核态进行通信的桥梁。map 有很多种类型,其中 BPF_MAP_TYPE_DEVMAP,BPF_MAP_TYPE_DEVMAP_HASH,BPF_MAP_TYPE_CPUMAP,BPF_MAP_TYPE_XSKMAP 这 4 种 map 用在 XDP_REDIRECT 情况下,决定将报文重定向到另一个网卡,另一个 cpu 还是将报文重定向到 xdp sock。
enum bpf_map_type {
BPF_MAP_TYPE_UNSPEC,
BPF_MAP_TYPE_HASH,
BPF_MAP_TYPE_ARRAY,
BPF_MAP_TYPE_PROG_ARRAY,
BPF_MAP_TYPE_PERF_EVENT_ARRAY,
BPF_MAP_TYPE_PERCPU_HASH,
BPF_MAP_TYPE_PERCPU_ARRAY,
BPF_MAP_TYPE_STACK_TRACE,
BPF_MAP_TYPE_CGROUP_ARRAY,
BPF_MAP_TYPE_LRU_HASH,
BPF_MAP_TYPE_LRU_PERCPU_HASH,
BPF_MAP_TYPE_LPM_TRIE,
BPF_MAP_TYPE_ARRAY_OF_MAPS,
BPF_MAP_TYPE_HASH_OF_MAPS,
BPF_MAP_TYPE_DEVMAP,
BPF_MAP_TYPE_SOCKMAP,
BPF_MAP_TYPE_CPUMAP,
BPF_MAP_TYPE_XSKMAP,
BPF_MAP_TYPE_SOCKHASH,
BPF_MAP_TYPE_CGROUP_STORAGE,
BPF_MAP_TYPE_REUSEPORT_SOCKARRAY,
BPF_MAP_TYPE_PERCPU_CGROUP_STORAGE,
BPF_MAP_TYPE_QUEUE,
BPF_MAP_TYPE_STACK,
BPF_MAP_TYPE_SK_STORAGE,
BPF_MAP_TYPE_DEVMAP_HASH,
BPF_MAP_TYPE_STRUCT_OPS,
BPF_MAP_TYPE_RINGBUF,
BPF_MAP_TYPE_INODE_STORAGE,
};
假如有这样一种场景,我们需要将报文重定向到 xdp sock,然后应用通过 xdp sock 来接收报文。但是我们又不想重定向所有的报文,我们只关心发给我们应用自己的报文。比如我们的应用使用 tcp 协议,每建立一个连接,都会占用一个 tcp 端口号,那么我们就可以使用一个 map 来保存应用已经使用的端口号,每建立一个新的连接,都将端口号保存到 map 中,然后在 xdp prog 中对报文进行解析,解析出来端口号之后在 map 中查找,如果查到这个端口号,就返回 XDP_REDIRECT,否则就返回 XDP_PASS。
逻辑如下图所示,如果报文是 tcp 协议,并且目标端口号是在 map 中,那么 xdp prog 返回 XDP_REDIRECT,否则返回 XDP_PASS。这种使用场景下,只需要 key 就可以了,value 没有实际意义。当连接建立时,将端口号保存到 map 中,当连接断开时,将端口号从 map 中删除。
在讨论高性能网络时,xdp 和 dpdk 两个概念经常一块出现,两者都是为了提升网络性能而出现的技术,但是两者本身有本质的区别:
① 是否 bypass 了内核
dpdk 是 intel 发起开发的用户态的数据面开发包,bypass 了内核,运行在用户态;
xdp 还是 linux 内核的功能,并没有 bypass 内核。
② 是否接管了网卡
dpdk 接管了网卡,网卡中的所有报文都在用户态处理,这就无法使用 linux 内核已经提供的一些基础架构,比如 tcp/ip 协议栈,iptables 等。
xdp 没有接管网卡,使用比较灵活。
1.3 xdp 程序的加载点
xdp 有 3 个加载点,网卡硬件,网卡驱动,链路层代码,在代码中用一个 enum 来表示:
enum bpf_xdp_mode {
XDP_MODE_SKB = 0, // 链路层代码
XDP_MODE_DRV = 1, // 网卡驱动
XDP_MODE_HW = 2, // 网卡硬件
__MAX_XDP_MODE
};
当加载 xdp 程序的时候,可以指定加载到什么位置,也可以不指定。如果不指定的话,具体将 xdp 程序加载哪个插入点,是内核自己确定的,内核判断的优先级从高到低的顺序是 XDP_MODE_HW,XDP_MODE_DRV,XDP_MODE_SKB,也就是说如果能加载到网卡硬件,那么便会直接加载到网卡硬件,如果不能便会判断能不能加载网卡驱动;如果能则加载到网卡驱动,如果网卡驱动也不支持的话,那么只能加载到链路层代码。报文被 xdp 处理越早越好。
XDP_MODE_HW, XDP_MODE_DRV 需要网卡或者网卡驱动支持,如果不支持则无法加载;XDP_MODE_SKB 则是 linux 网络架构代码支持,不依赖于驱动程序或者网卡硬件支持,肯定能加载成功。
3 个加载点的共同点是:都在报文进入 IP, TCP 协议栈之前。
如果某个网卡加载了 xdp 程序,使用命令行 ip link show dev xxx 可看到标记。下图是在我的虚拟机的网卡上加载了 xdp 程序,虚拟机网卡不支持硬件卸载或者驱动,所以加载模式为 SKB 模式,SKB 模式会显示 xdpgeneric。驱动模式会显示 xdpdrv,卸载到硬件的话会显示 xdpoffload。
2 示例
如下是一个 xdp 的实例代码。代码实现非常简单,所有的报文,不做任何判断和处理,直接返回 XDP_PASS,相当于 xdp 程序是透明的,没有对报文产生任何影响。xdp 代码的返回值和入参都是固定的,入参 struct xdp_md 中包括报文的数据和一些报文的元数据(比如来自哪个网卡,网卡的哪个队列)。xdp 代码需要用 SEC("xxx") 来标记。在 xdp 程序中可以使用 bpf_printk() 来打印日志,日志保存在 /sys/kernel/debug/tracing/trace_pipe 中。
#include <linux/bpf.h>
#include <bpf/bpf_helpers.h>
#include <bpf/bpf_endian.h>
struct {
__uint(type, BPF_MAP_TYPE_XSKMAP);
__type(key, __u32);
__type(value, __u32);
__uint(max_entries, 64);
} xsks_map SEC(".maps");
SEC("xdp_hello")
int xdp_hello_test(struct xdp_md *ctx) {
bpf_printk("sizeof(struct xdp_md) = %d\n", sizeof(struct xdp_md));
bpf_printk("xdp hello, rx ifindex = %u\n", ctx->ingress_ifindex);
bpf_printk("xdp hello, rx queue index = %u\n", ctx->rx_queue_index);
bpf_printk("data start = %p, data end = %p, data len = %u\n", \
(void *)(long)(ctx->data), (void *)(long)(ctx->data_end), ctx->data_end - ctx->data);
return XDP_PASS;
}
char _license[] SEC("license") = "GPL";
(1)代码中定义了一个 xsk map,虽然在代码中没有使用,但是通过这段代码也能知道怎么定义一个 map
(2)打印了这个报文是从哪个网卡接收的(ifindex),从网卡的哪个队列接收的(queue index);打印了报文的起始地址和结束地址,计算并打印了报文的长度
(3)xdp 代码和 map 都是使用 SEC() 进行声明,编译出来的 xdp 文件是一个 .o 文件,是目标文件;目标文件的格式是 elf 格式。
objdump -h -S xdp.o 可以查看 xdp.o 中的段信息。如下图所示,可以看到 xdp 程序和 map 分别保存在 xdp_hello 和 .maps 段里。xdp_hello 段的属性中有 CODE,说明里边是代码指令,.maps 段的属性里边有 DATA,说明是存放数据的。
2.1 编译
编译 xdp 代码使用 clang,编译出来是 .o 目标文件(和内核模块是不一样的,内核模块是 .ko 文件)。
clang -O2 -g -Wall -target bpf -c xdp.c -o xdp.o
ebpf 程序使用 llvm 进行编译。
我们在编译 c 代码时,常常使用的编译器是 gcc,其实 clang 也可以编译 c 代码。
编译器通常分为 3 个部分,前端,优化器和后端。前端负责词法和语法分析,将源代码转化成语法树;优化器主要对前端处理后的代码进行优化;后端则是将优化器优化后的代码转化成机器码。
编译 ebpf 程序使用 clang 来编译,clang 是编译器的前端,llvm 是后端。clang 和 llvm 主要是苹果公司推动发展的。
2.2 安装
xdp 程序安装,使用如下命令可以安装 xdp 程序,安装命令中需要指定 xdp 目标文件和 xdp 程序的段名。
ip link set dev ens33 xdp obj xdp.o sec xdp_hello
上边这个命令中的 xdp 是指定 xdp 的 mode,有 4 个可选:xdp、 xdpgeneric、 xdpdrv、 xdpoffload。如果选择 xdp,则具体加载点由内核确定,xdpgeneric 对应 SKB 模式,drv 对应网卡驱动,xdpoffload 对应网卡硬件。
加载 xdp 程序之后,使用 ip link show dev ens33 可以看到网卡中多了一个标志,xdpgeneric,说明 xdp 加载点是 xdpgeneric,这也说明当前的网卡不支持 xdpdrv 和 xdpoffload 模式。
如果当前网卡不支持 xdpdrv 和 xdpoffload 这种方式,并且命令行中还指定了 xdpdrv 或者 xdpoffload,那么会返回错误信息。
使用 bpftool prog 能看到已经安装的 bpf 程序:
使用 bpftool map 能看到已经安装的 map:
2.3 查看 xdp 程序打印的日志
加载 xdp 程序之后,可以在 /sys/kernel/debug/tracing/trace_pipe 中看到 xdp prog 中的打印信息。
2.4 卸载
ip link set dev ens33 xdp off
2.5 ebpf 校验器
ebpf 程序加载的时候,首先会使用校验器进行检查。因为 ebpf 程序最终要运行在内核,如果有问题,容易导致内核崩溃,所以检查比较严格。如果校验器检查不通过, xdp 程序安装就会失败。
循环语句
低版本的 ebpf 不支持循环语句,后来的版本支持了循环语句。
但是循环的次数一定得是确定的,能够完成的,如下边的代码,有一个循环,永远无法退出。这个代码可以编译通过,但是加载的时候会加载失败。
#include <linux/bpf.h>
#include <bpf/bpf_helpers.h>
SEC("xdp_hello")
int xdp_hello_test(struct xdp_md *ctx) {
bpf_printk("xdp hello\n");
for (int i = 0; i < 10;) {
bpf_printk("i %d\n", i);
}
return XDP_PASS;
}
char _license[] SEC("license") = "GPL";
加载失败的提示信息如下:
另外,使用未定义的变量,内存访问越界等,也会导致编译告警。
3 xsk 和 umem
xdp socket 简称 xsk,也是 socket 的一种。使用 xsk 不像使用 tcp socket 那样简洁,xsk 的使用涉及到内存池的管理,涉及到收发包队列的管理,内存池管理称作 umem。 使用 xsk 一般基于 libbpf 提供的 api 进行开发,xsk 使用使用的 api 比较多,后边会对使用到的 api 进行整理。
xsk 使用的内存池,内存是用户态申请的,用户态申请内存之后调用 bpf 相关的 api 对内存进行切分,将内存分配到收发包队列中。
3.1 收发包队列
在网络领域,内存池,发送队列,接收队列,完成队列,这些概念是绕不开的。网卡驱动收发包,tcp 收发包,甚至用户态的网络应用,都要维护自己的内存池和收发包队列。同样在 xdp 中也存在这些概念。
xdp 中维护了 4 个队列,如下图所示:
4 个队列,发送方向上使用 fq 和 rx,接收方向上使用 tx 和 cq。
队列名 | 生产者 | 消费者 |
fq(fill ring) | 用户 用户态将空闲的 buffer 放到这个队列中。 | 内核 内核态收到数据之后,从这个队列中获取空闲的 buffer,然后将数据保存到 buffer 中 |
rx(rx ring) | 内核 内核将保存了数据的 buffer 放到 rx 中 | 用户 用户从这个队列中获取接收到的数据 |
tx(tx ring) | 用户 用户发送数据时,将数据放到 buffer 中,然后将 buffer 放到 tx 队列。放到队列之后,调用 sendto() 通知内核发送数据 | 内核 用户调用 sendto() 的时候,内核从 tx 中消费 buffer,把 buffer 中的数据发送出去 |
cq(complete ring) | 内核 内核将 buffer 中的数据发送完成之后,将空闲的 buffer 放到 cq 中 | 用户 用户从 cq 中获取空闲的 buffer,后续发送数据使用 |
3.2 bpf 相关 api
xdp 中 xsk 相关的 api,umem 相关的 api,收发包队列相关的 api,这些 api 都是 libbpf 提供的。xdp 中这些概念是相互耦合在一起的,不像 tcp socket 这样,调用几个系统调用就可以使用,使用 xdp 时需要调用的函数是比较多的。
3.2.1 xsk 和 umem
先创建 umem,之后再创建 xdp socket。
3.2.1.1 创建 umem
LIBBPF_API int xsk_umem__create(struct xsk_umem **umem,
void *umem_area, __u64 size,
struct xsk_ring_prod *fill,
struct xsk_ring_cons *comp,
const struct xsk_umem_config *config);
形参 1: struct xsk_umem
struct xsk_umem 结构体如下,其中包括 fq 和 cq 两个队列,这两个队列分别用于接收侧和发送侧;另外还有内存池的内存地址。
struct xsk_umem {
struct xsk_ring_prod *fill_save;
struct xsk_ring_cons *comp_save;
char *umem_area;
struct xsk_umem_config config;
int fd;
int refcount;
struct list_head ctx_list;
bool rx_ring_setup_done;
bool tx_ring_setup_done;
};
形参 2: char *umem_area
这个是 xsk 内存池的地址,需要用户态申请这块内存,假如我们使用的每个 buffer 大小是 4096B,接收方向和发送方向队列的长度均是 2048,我们就需要申请 4096 * 2048 * 2 = 16M 的内存。内存池里中包含很多个 buffer,在创建内存池的时候并不是一个 buffer 一个 buffer 申请,而是申请一整块内存,然后再这一大块内存的基础上切分出这些 buffer。
形参 3: __u64 size
buffer 的长度
形参 4: struct xsk_ring_prod *fill
fq,接收侧使用
形参 5:struct xsk_ring_cons *comp
cq,发送侧使用
形参 6:struct xsk_umem_config *config
umem 的配置,上边我们可以看到传了一整块内存,但是 buffer 数量是多少,每个 buffer 的大小是多大,这个都在 xsk_umem 中配置。
struct xsk_umem_config {
__u32 fill_size;
__u32 comp_size;
__u32 frame_size;
__u32 frame_headroom;
__u32 flags;
};
3.2.1.2 创建 xsk
LIBBPF_API int xsk_socket__create(struct xsk_socket **xsk,
const char *ifname, __u32 queue_id,
struct xsk_umem *umem,
struct xsk_ring_cons *rx,
struct xsk_ring_prod *tx,
const struct xsk_socket_config *config);
形参 1:struct xsk_socket **xsk
入参是 xsk 指针,xsk_socket__create 内部负责创建和初始化
形参 2,3: const char *ifname, __u32 queue_id
指定网卡和网卡的 queue
形参 4,5:struct xsk_ring_cons *rx, struct xsk_ring_prod *tx,
接收队列和发送队列,xsk_socket__create 内部负责创建和初始化
形参 6:const struct xsk_socket_config *config
指定 rx 和 tx 的 buffer 数量
3.2.2 收发包队列
从 struct xsk_umem 和 struct xsk_socket 的定义中可以看出来,fq 和 tx 的类型是 struct xsk_ring_prod *,cq 和 rx 的数据类型是 struct xsk_ring_cons。这两个数据类型,一个以 prod 结尾,一个以 cons 结尾,prod 是 producer 生产者,cons 是 consumer 消费者。生产者还是消费者,是针对用户态的用户来说的(这是理所当然的,从用户的角度来看),fq 和 tx 的生产者是用户,cq 和 rx 的消费者是用户。
struct xsk_umem {
struct xsk_ring_prod *fill_save;
struct xsk_ring_cons *comp_save;
char *umem_area;
struct xsk_umem_config config;
int fd;
int refcount;
struct list_head ctx_list;
bool rx_ring_setup_done;
bool tx_ring_setup_done;
};
struct xsk_socket {
struct xsk_ring_cons *rx;
struct xsk_ring_prod *tx;
__u64 outstanding_tx;
struct xsk_ctx *ctx;
struct xsk_socket_config config;
int fd;
};
4 个队列是共用一个数据结构类型的,只不过名字不一样。
从结构体可以看出来,有生产者索引,消费者索引,并且生产者索引和消费者索引都有 2 个,一个带 cached,一个不带 cached。带 cached,说明当前还在操作,可能还没操作完;不带 cached,说明已经操作完了。
/* Do not access these members directly. Use the functions below. */
#define DEFINE_XSK_RING(name) \
struct name { \
__u32 cached_prod; \
__u32 cached_cons; \
__u32 mask; \
__u32 size; \
__u32 *producer; \
__u32 *consumer; \
void *ring; \
__u32 *flags; \
}
DEFINE_XSK_RING(xsk_ring_prod);
DEFINE_XSK_RING(xsk_ring_cons);
向队列中生产或者从队列中消费,要调用 3 个函数,3段式。
生产:
(1)reserve 预留队列空间
(2)向预留的队列空间中填充元素
(3)提交,就是修改 producer 索引,producer 索引提交之后,说明生产完毕,新生产的元素可以消费了
消费:
(1)peak,获取队列中的元素
(2)逐个处理队列中的元素
(3)release,更新 consumer 索引,consumer 索引更新之后,新释放的空间才可以被填充新的元素
内存屏障:
在队列管理中,特别是在更新队列索引的时候,往往需要使用到内存屏障。内存屏障,就像养猪场或者养牛场中的栅栏,栅栏两侧的牲口不能跨越栅栏走动,同样的内存屏障上下的指令也不能陆续执行。
为什么需要内存屏障 ?如下解释来源于 《Linux 内核设计与实现》
假如有如下代码:
a = 1;
b = 2;
程序在执行的时候,有可能在 a 中存放新值之前就在 b 中存放了新值。
编译器和处理器都看不出来 a 和 b 之间的关系。编译器会在编译时按这种顺序编译,这种顺序会是静态的,编译的目标代码就是把 a 放在 b 之前。但是,处理器会重新动态排序,因为处理器在执行指令期间,会在取指令和分派时,把表面上看似无关的指令按自认为最好的顺序排列。大多数情况下,这样的排序是最佳的,因为 a 和 b 之间没有明显的关系。
尽管前边的例子可能被重新排序,但是处理器和编译器绝对不会对下边的代码重新排序。
a = 1;
b = a;
此处 a 和 b 均为全局变量,因为 a 和 b 之间有明确的数据依赖关系。
rmd() | 读内存屏障,它确保跨越 rmb() 的载入动作不会发生重排序。也就是说在 rmb() 之前的载入操作不会被重新排在调用之后,同理,在 rmb() 之后的载入动作不会被重新排在该调用之前。 |
wmb() | 写内存屏障,和 rmb 类似,区别是针对的操作有载入动作变成了存储动作。 |
mb() | 对载入动作和存储动作都生效。 |
static inline void xsk_ring_prod__submit(struct xsk_ring_prod *prod, size_t nb);
static inline size_t xsk_ring_cons__peek(struct xsk_ring_cons *cons,
size_t nb, __u32 *idx);
static inline void xsk_ring_cons__release(struct xsk_ring_cons *cons, size_t nb);
3.2.2.1 向 fq 生产[接收内存池]
(1)把位置预留好
static inline size_t xsk_ring_prod__reserve(struct xsk_ring_prod *prod,
size_t nb, __u32 *idx)
{
if (xsk_prod_nb_free(prod, nb) < nb)
return 0;
*idx = prod->cached_prod;
prod->cached_prod += nb;
return nb;
}
(2)获取到队列中这个位置的地址,然后向这个位置填地址
static inline __u64 *xsk_ring_prod__fill_addr(struct xsk_ring_prod *fill,
__u32 idx)
{
__u64 *addrs = (__u64 *)fill->ring;
return &addrs[idx & fill->mask];
}
向队列中填地址:
*xsk_ring_prod__fill_addr = buffer_addr
(3)更新 producer 索引
static inline void xsk_ring_prod__submit(struct xsk_ring_prod *prod, size_t nb)
{
/* Make sure everything has been written to the ring before indicating
* this to the kernel by writing the producer pointer.
*/
libbpf_smp_wmb();
*prod->producer += nb;
}
3.2.2.2 向 tx 生产[发包]
向 tx 生产,通过 struct xdp_desc 来进行。在网卡收发包的时候,buffer describer 简称 bd 也是很常用的,往往使用两个队列一个是 bd 队列,一个是 buffer 队列,两个队列的长度一致。bd 中最基础的属性是内存地址和长度,对于功能比较复杂的网卡,这个 bd 的属性往往是比较多的。
struct xdp_desc {
__u64 addr;
__u32 len;
__u32 options;
};
(1)把位置预留好
static inline size_t xsk_ring_prod__reserve(struct xsk_ring_prod *prod,
size_t nb, __u32 *idx)
{
if (xsk_prod_nb_free(prod, nb) < nb)
return 0;
*idx = prod->cached_prod;
prod->cached_prod += nb;
return nb;
}
(2)获取位置上的描述符 struct xdp_desc 并做填充
static inline struct xdp_desc *xsk_ring_prod__tx_desc(struct xsk_ring_prod *tx,
__u32 idx)
{
struct xdp_desc *descs = (struct xdp_desc *)tx->ring;
return &descs[idx & tx->mask];
}
(3)提交
static inline void xsk_ring_prod__submit(struct xsk_ring_prod *prod, size_t nb)
{
/* Make sure everything has been written to the ring before indicating
* this to the kernel by writing the producer pointer.
*/
libbpf_smp_wmb();
*prod->producer += nb;
}
最终 xsk 报文的发送还要调用 sendto() 进行发送。和使用 tcp socket 的时候不一样的是,使用 tcp socket 的时候,在调用 sendto 的时候需要带上数据的地址和长度;而使用 xsk 发送的时候不需要带这些信息,xsk 的 sendto 在内核的函数是 xsk_generic_xmit() 这个函数会从 tx 中消费 buffer 然后将数据发送出去。
3.2.2.3 从 rx 消费[收包]
tx 和 rx 队列中存储的对象都是一个 struct xdp_dexc。
(1)获取可消费的元素个数
static inline size_t xsk_ring_cons__peek(struct xsk_ring_cons *cons,
size_t nb, __u32 *idx)
{
size_t entries = xsk_cons_nb_avail(cons, nb);
if (entries > 0) {
/* Make sure we do not speculatively read the data before
* we have received the packet buffers from the ring.
*/
libbpf_smp_rmb();
*idx = cons->cached_cons;
cons->cached_cons += entries;
}
return entries;
}
(2)获取 bd,然后处理接收的数据
static inline const struct xdp_desc *
xsk_ring_cons__rx_desc(const struct xsk_ring_cons *rx, __u32 idx)
{
const struct xdp_desc *descs = (const struct xdp_desc *)rx->ring;
return &descs[idx & rx->mask];
}
(3)数据处理完毕,释放队列空间
static inline void xsk_ring_cons__release(struct xsk_ring_cons *cons, size_t nb)
{
/* Make sure data has been read before indicating we are done
* with the entries by updating the consumer pointer.
*/
libbpf_smp_rwmb();
*cons->consumer += nb;
}
3.2.2.4 从 cq 中消费,回收 buffer [发送内存池]
(1)获取空闲的元素个数
static inline size_t xsk_ring_cons__peek(struct xsk_ring_cons *cons,
size_t nb, __u32 *idx)
{
size_t entries = xsk_cons_nb_avail(cons, nb);
if (entries > 0) {
/* Make sure we do not speculatively read the data before
* we have received the packet buffers from the ring.
*/
libbpf_smp_rmb();
*idx = cons->cached_cons;
cons->cached_cons += entries;
}
return entries;
}
(2)获取空闲的 buffer
static inline const __u64 *
xsk_ring_cons__comp_addr(const struct xsk_ring_cons *comp, __u32 idx)
{
const __u64 *addrs = (const __u64 *)comp->ring;
return &addrs[idx & comp->mask];
}
(3)处理完毕,更新队列索引
static inline void xsk_ring_cons__release(struct xsk_ring_cons *cons, size_t nb)
{
/* Make sure data has been read before indicating we are done
* with the entries by updating the consumer pointer.
*/
libbpf_smp_rwmb();
*cons->consumer += nb;
}