一、io_uring的引入
为了方便说明io_uring的作用,先举一个通俗点的例子
1、通过异步提高读写的效率
假设有一批数量很大的货,需要分批次运到厂里处理。这个时候就有两种方式:
1)同步方式:运送一批到厂里,等厂里反馈OK了,再回来送下一批;
2)异步方式:送到厂里之后,不用等厂里反馈,直接回来接送下一批货。
哪一种方式比较好呢?当然如果货的数量不多,同步方式比较可靠,效率差别也不大。但如果面对数量很大,比如读取一个很大的数据包,那异步方式效率就会显著提高。
2、减少系统开销
另外,正常送货到工厂,需要开门进入,卸完货之后再出来。这样频繁的进出消耗很大。因此,可以在工厂门口开辟一块共享仓库,货车卸货进仓库,工厂从仓库里取货。
对应的,IO操作需要进行系统调用。而在调用系统调用时,会从用户态切换到内核态,进行上下文切换。在高 IOPS(Input/Output Per Second)的情况下,进行上下文切换会消耗大量的CPU时间。
io_uring为了减少系统调用带来的上下文切换,采用了用户态和内核态共享内存的方式。用户态对共享内存进行读写操作是不需要使用系统调用的,所以不会发生上下文切换的情况。
二、io_uring
1、内核接口
linux内核(5.10版本之后)为io_uring提供了三个接口
1)io_uring_setup
:该函数用于初始化和配置 io_uring环境。该函数将返回一个文件描述符,称为 io_uring文件描述符,用于后续的 I/O 操作。
int io_uring_setup(unsigned entries, struct io_uring_params *p);
- entries:指定io_uring 的入口数目,即同时处理的 I/O 事件数目。
- p:指向 struct io_uring_params 结构的指针,用于传递其他配置参数。
2)io_uring_enter
:该函数用于提交 I/O 事件并等待其完成。该函数将返回已完成的 I/O 事件数量。
int io_uring_enter(int fd, unsigned to_submit, unsigned min_complete, unsigned flags, sigset_t *sig);
- fd:io_uring文件描述符。
- to_submit:要提交的 I/O 事件数量。
- min_complete:指定在返回之前至少完成的 I/O 事件数量。
- flags:用于指定操作行为的标志。
- sig:用于传递信号集,以便在等待期间阻塞特定的信号。
3)io_uring_register
:该函数用于将文件描述符或内存区域与io_uring关联起来。该函数将返回注册的文件描述符或内存区域的索引,以便后续的 I/O 操作可以使用。
int io_uring_register(int fd, unsigned int opcode, const void *arg, unsigned int nr_args);
- fd:io_uring文件描述符。
- opcode:指定注册操作的类型,如文件描述符的注册或内存区域的注册。
- arg:指向相关数据结构的指针,用于传递需要注册的文件描述符或内存区域的信息。
- nr_args:指定相关参数的数量。
2、提交队列SQ和完成队列CQ
1)提交队列(Submission Queues,SQ):环形队列,存放即将执行I/O操作的数据;
2)完成队列(Completion Queue, CQ):环形队列,存放I/O操作完成返回后的结果;
对于提交队列,内核会将io_sq_ring
结构映射到应用程序的内存空间,这样应用程序可以直接向io_sq_ring
结构的环形队列提交I/O操作,内核可以通过io_sq_ring
结构的环形队列读取I/O操作。这样不用通过系统调用,避免了上下文切换。
同样对于完成队列也是如此。
2.1 io_sq_ring
和 io_cq_ring
io_sq_ring
结构用于向内核发起 I/O 操作请求,即将需要执行的 I/O 操作请求提交到队列中。io_sq_ring
中的array指向一个环形队列的SQE数组,该数组用于存储应用程序提交的I/O请求信息。每个SQE对应一个io_uring_sqe
结构体,代表一个具体的 I/O 操作请求,其中包含了执行该操作所需的各种参数和信息。
同样对于io_cq_ring
结构也是如此。需要特别注意的是结构体中res
的含义会有所不同:
对于读取操作(如 read()),res
表示成功读取的字节数。
对于写入操作(如 write()),res
表示成功写入的字节数。
对于 accept 操作(如 accept()),res
表示接受到的新连接的文件描述符(即,已连接套接字)。
3、io_uring
的流程
io_uring的操作流程如下:
1)应用程序向 提交队列io_sq_ring
提交I/O操作
2)SQ内核线程从 提交队列 中读取 I/O 操作。
3)SQ内核线程发起 I/O 请求。
4)I/O 请求完成后,SQ内核线程会将 I/O 请求的结果写入到 io_uring 的 完成队列io_cq_ring
中。
5)应用程序可以从 完成队列io_cq_ring
中读取到 I/O 操作的结果。
三、liburing库
liburing
是已经封装好io_uring
的库
1、安装liburing
1)下载源码
sudo apt-get install git
git clone git://git.kernel.dk/liburing
2)进入liburing
cd liburing/
3)配置
./configure
4)编译和安装
make && sudo make install
5)配置pkg-config文件,以便其他程序能够使用已安装的liburing库
echo "/usr/local/lib/pkgconfig" | sudo tee /etc/ld.so.conf.d/usr-local-lib.conf >/dev/null
sudo ldconfig -v
6)编译应用程序
gcc -o uring uring.c -luring -static
2、接口
2.1 io_uring_queue_init_params()
执行io_uring_setup()系统调用来初始化io_uring队列。
int io_uring_queue_init_params(unsigned entries, struct io_uring *ring, const struct io_uring_params *p);
1)entries:指定 I/O uring 的入口数目,即同时处理的 I/O 事件数目。
2)ring:指向 struct io_uring 结构的指针,用于接收初始化后的 I/O uring 环境。
3)p:指向 struct io_uring_params 结构的指针,包含了自定义的初始化参数。
2.2 io_uring_get_sqe()
用于从 I/O uring 环境的 submission queue (SQ) 中获取一个 Submission Queue Entry (SQE)。通过获取 SQE,你可以将 I/O 操作请求添加到 I/O uring 中,并提交给内核进行处理。
struct io_uring_sqe *io_uring_get_sqe(struct io_uring *ring);
1)ring:指向 struct io_uring 结构的指针,表示要操作的 I/O uring 环境。
该函数返回一个指向 struct io_uring_sqe 结构的指针,该结构表示一个 Submission Queue Entry (SQE)。SQE 包含了要执行的 I/O 操作的详细信息
2.3 io_uring_prep_accept()
用于准备执行 accept 操作的 SQE (Submission Queue Entry)。通过使用 io_uring_prep_accept 函数,可以设置要接受连接的套接字描述符、新连接的文件描述符和连接地址等参数。
void io_uring_prep_accept(struct io_uring_sqe *sqe, int fd, struct sockaddr *addr, socklen_t *addrlen, int flags);
1)sqe:一个指向 struct io_uring_sqe 结构的指针,表示要准备的 SQE。
2)fd:要接受连接的套接字描述符。
3)addr:一个指向 struct sockaddr 结构的指针,用于接收连接的远程地址信息。
4)addrlen:一个指向 socklen_t 类型的指针,表示 addr 缓冲区的长度。
5)flags:一个整数,用于设置一些额外的标志。
2.4 io_uring_prep_recv()
用于准备执行接收数据的 SQE (Submission Queue Entry)。通过使用 io_uring_prep_recv 函数,你可以设置要接收数据的套接字描述符以及数据缓冲区等参数。
void io_uring_prep_accept(struct io_uring_sqe *sqe, int fd, struct sockaddr *addr, socklen_t *addrlen, int flags);
1)sqe:一个指向 struct io_uring_sqe 结构的指针,表示要准备的 SQE。
2)fd:要接受连接的套接字描述符。
3)addr:一个指向 struct sockaddr 结构的指针,用于接收连接的远程地址信息。
4)addrlen:一个指向 socklen_t 类型的指针,表示 addr 缓冲区的长度。
5)flags:一个整数,用于设置一些额外的标志。
2.5 io_uring_prep_send()
用于准备执行发送数据的 SQE (Submission Queue Entry)。通过使用 io_uring_prep_send 函数,你可以设置要发送数据的套接字描述符以及数据缓冲区等参数。
void io_uring_prep_send(struct io_uring_sqe *sqe, int fd, const void *buf, unsigned int len, int flags);
1)sqe:一个指向 struct io_uring_sqe 结构的指针,表示要准备的 SQE。
2)fd:要发送数据的套接字描述符。
3)buf:一个指向数据缓冲区的指针,包含要发送的数据。
4)len:一个无符号整数,表示数据缓冲区中要发送的数据长度。
5)flags:一个整数,用于设置一些额外的标志。
2.6 io_uring_submit()
用于提交 SQE (Submission Queue Entry) 到 I/O uring 环境中进行处理。
int io_uring_submit(struct io_uring *ring);
1)ring:一个指向 struct io_uring 结构的指针,表示要提交的 I/O uring 环境。
2)io_uring_submit 函数将等待队列中的 SQEs 进行提交,并触发 I/O uring 环境的处理。
函数返回已提交的 SQE 数量,或者在出现错误时返回负数
2.7 io_uring_wait_cqe()
用于等待完成队列项(Completion Queue Entry,CQE)的到来。
int io_uring_wait_cqe(struct io_uring *ring, struct io_uring_cqe **cqe_ptr);
1)ring:指向 struct io_uring 的指针,表示I/O uring环境。
2)cqe_ptr:指向CQE指针的指针,用于存储返回的已完成CQE。
io_uring_wait_cqe() 函数会阻塞当前线程,直到有已完成的CQE可用。一旦有CQE可用,函数将填充 cqe_ptr 指向的指针,并返回0。
2.8 io_uring_peek_batch_cqe()
用于批量获取已完成的CQE(Completion Queue Entry)而无需等待.
int io_uring_peek_batch_cqe(struct io_uring *ring, struct io_uring_cqe **cqes, unsigned int count);
1)ring:指向 struct io_uring 的指针,表示I/O uring环境。
2)cqes:一个指向指针数组的指针,用于存储返回的已完成CQE。
3)count:要获取的CQE数量。
io_uring_peek_batch_cqe() 函数会尝试从I/O uring环境中立即获取指定数量的已完成CQE,并将它们存储到 cqes 指向的指针数组中。如果成功获取了指定数量的CQE,函数将返回成功接收到并处理完毕的I/O事件数量,否则将返回负值。
2.9 io_uring_cq_advance()
用于标记完成队列(Completion Queue,CQ)上已经处理的CQE数量。当一个或多个CQE被处理后,需要调用io_uring_cq_advance()函数来更新下一次读取CQE时应该从哪个位置开始。
void io_uring_cq_advance(struct io_uring *ring, unsigned int steps);
1)ring:指向 struct io_uring 的指针,表示 I/O uring 环境。
2)steps:要向前推进的步数,即要消耗的 CQE 数量。
io_uring_cq_advance() 函数将指定数量的已完成的 CQE 标记为已消耗,从而使 I/O uring 可以继续接收新的 CQE。
2.10 io_uring_cq_advance()
用于推进完成队列(Completion Queue)中的消费指针,即更新消费指针的位置。以告知 I/O uring 已经成功处理了一定数量的完成事件,并且可以释放这些事件所占用的资源。
void io_uring_cq_advance(struct io_uring *ring, unsigned int count);
1)ring 是指向 struct io_uring 的指针,代表 I/O uring 环境。
2)count 是要推进的完成事件数量。
三、liburing的TCP服务器实现
基于liburing,实现多个TCP服务器的收发
#include <stdio.h>
#include <string.h>
#include <unistd.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <errno.h>
#include <liburing.h>
#define ENTRIES_LENGTH 1024
enum {
EVENT_ACCEPT = 0,
EVENT_READ,
EVENT_WRITE
};
typedef struct _conninfo{
int connfd;
int event;
}conninfo;
//设置accept事件所需的信息,并将其与相应的 SQE 关联起来
void set_accept_event(struct io_uring *ring, int sockfd, struct sockaddr *addr,
socklen_t *addrlen, int flags){
//获取一个SQE
struct io_uring_sqe *sqe = io_uring_get_sqe(ring);
//准备执行accept操作的SQE
io_uring_prep_accept(sqe, sockfd, addr, addrlen, flags);
//创建了一个结构体变量 info_accept,其中存储了与连接相关的信息
conninfo info_accept = {
.connfd = sockfd,
.event = EVENT_ACCEPT
};
memcpy(&sqe->user_data, &info_accept, sizeof(info_accept));
}
//设置recv事件所需的信息,并将其与相应的 SQE 关联起来
void set_recv_event(struct io_uring *ring, int sockfd, void *buf, size_t len, int flags){
struct io_uring_sqe *sqe = io_uring_get_sqe(ring);
io_uring_prep_recv(sqe, sockfd, buf, len, flags);
conninfo info_recv = {
.connfd = sockfd,
.event = EVENT_READ
};
memcpy(&sqe->user_data, &info_recv, sizeof(info_recv));
}
//设置send事件所需的信息,并将其与相应的 SQE 关联起来
void set_send_event(struct io_uring *ring, int sockfd, void *buf, size_t len, int flags){
struct io_uring_sqe *sqe = io_uring_get_sqe(ring);
io_uring_prep_send(sqe, sockfd, buf, len, flags);
conninfo info_send = {
.connfd = sockfd,
.event = EVENT_WRITE
};
memcpy(&sqe->user_data, &info_send, sizeof(info_send));
}
int main(){
int sockfd = socket(AF_INET, SOCK_STREAM, 0);
struct sockaddr_in servaddr;
memset(&servaddr, 0, sizeof(struct sockaddr_in));
servaddr.sin_family = AF_INET;
servaddr.sin_addr.s_addr = htonl(INADDR_ANY);
servaddr.sin_port = htons(9999);
if (-1 == bind(sockfd, (struct sockaddr *)&servaddr, sizeof(struct sockaddr))){
printf("bind failed:%s\n",strerror(errno));
return -1;
}
listen(sockfd, 10);
//linuring
struct io_uring_params params;
memset(¶ms, 0 ,sizeof(struct io_uring_params));
struct io_uring ring;
//初始化io_uring队列
io_uring_queue_init_params(ENTRIES_LENGTH, &ring, ¶ms);
//获取一个SQE
struct io_uring_sqe *sqe = io_uring_get_sqe(&ring);
struct sockaddr_in clientaddr;
socklen_t clilen = sizeof(struct sockaddr);
//设置accept事件所需的信息,并将其与相应的 SQE 关联起来
set_accept_event(&ring, sockfd, (struct sockaddr *)&clientaddr, &clilen, 0);
char buffer[1024] = {0};
while (1){
//提交 SQE
io_uring_submit(&ring);
struct io_uring_cqe *cqe;
//等待完成队列项cqe
io_uring_wait_cqe(&ring, &cqe);
struct io_uring_cqe *cqes[10];
//批量获取已完成IO操作的CQE
int cqecount = io_uring_peek_batch_cqe(&ring, cqes, 10);
int i = 0;
for (i = 0; i < cqecount; i++){
cqe = cqes[i];
conninfo ci;
memcpy(&ci, &cqe->user_data, sizeof(ci));
if (ci.event == EVENT_ACCEPT){ //此时已经完成accept
if (cqe->res < 0) continue;
int connfd = cqe->res;
//在已接受连接的connfd上设置recv事件处理器,以便处理已建立连接的数据接收操作。
set_recv_event(&ring, connfd, buffer, 1024, 0); //此处的connfd表示已建立连接的fd,clientfd
//重新设置接受连接的事件,以便处理新连接的接受操作。
set_accept_event(&ring, ci.connfd, (struct sockaddr *)&clientaddr, &clilen, 0); //此处的connfd表示新连接的fd,listenfd
}else if (ci.event == EVENT_READ){//此时已经完成read
if (cqe->res < 0) continue;
if (cqe->res == 0){
close(ci.connfd);
}else{
printf("recv --> %s, %d\n", buffer, cqe->res);
//重新设置写的事件,以便回发
set_send_event(&ring, ci.connfd, buffer, cqe->res, 0);
}
} else if (ci.event == EVENT_WRITE){//此时已经完成write
//重新设置读的事件
set_recv_event(&ring, ci.connfd, buffer, 1024, 0);
}
}
io_uring_cq_advance(&ring, cqecount);
}
getchar();
}