io_uring
1、概述
io_uring是Linux(内核版本在5.1以后)在2019年加入到内核中的一种新型的异步I/O模型;
io_uring使用共享内存,解决高IOPS场景中的用户态和内核态的切换过程,减少系统调用;用户可以直接向共享内存提交要发起的I/O操作,内核线程可以直接获取共享内存中的I/O操作,并进行相应的读写操作;
优点
-
避免了提交I/O事件和完成事件中存在的内存拷贝(使用共享内存)
-
减少的了I/O任务提交和完成事件任务是的系统调用过程
-
采取无锁队列,减少了锁资源的竞争
主要内存结果
- 提交队列(Submission Queue,SQ)连续的内存空间,环形队列,存放将要执行的I/O操作数据
- 完成队列(Completion Queue, CQ)连续的内存空间,环形队列,存放执行完成I/O操作后的返回结果
- 提交队列项数组提(Submission Queue Entry,SQE):方便通过环形缓冲区提交内存请求
主要接口
io_uring提供三个用户态的系统调用接口
- io_uring_setup:初始化一个新的io_uring对象,一个SQ和一个CQ,通过使用共享内存进行数据操作
- io_uring_register:注册用于异步I/O的文件或用户缓冲区(buffers)
- io_uring_enter:提交I/O任务,等待I/O完成
SQ和CQ保存的都是SQEs数据的索引,不是真正的请求,真实是请求保存在SQE数组中,在提交请求时可以批量提交一组SQE数值上不连续的请求;
SQ、CQ、SQE中的内存区域都是有内核进行分配的,用户初始化会返回对应的fd,通过fd进行mmap和内核共享内存空间;
第三方库
liburing通过对io_uring进行分装,提供了一个简单的API,通过一下命令可以安装该动态库
git clone https://github.com/axboe/liburing.git
cd liburing
./configure
make
sudo make install
sudo ldconfig #更新动态库连接缓存
主要使用流程
1. io_uring初始化
io_uring通过io_uring_setup函数初始化,在liburing库中,通过io_uring_queue_init_params函数进行初始化,创建sumbmit队列和complete队列,以及SQE内存数组;
//io_uring实现异步的方式
struct io_uring_params pragma;
memset(&pragma, 0, sizeof(pragma));
struct io_uring ring;
// 初始化io_uring 创建submit队列和complite队列
io_uring_queue_init_params(1024, &ring, &pragma);
2. io_uring 提交(注册)到SQ环形队列
io_uring通过io_uring_register函数提交(注册)到用于异步I/O的缓冲区中,在liburing中通过io_uring_prep_accept函数对io_uring_refister进行封装使用;
// 获取ringbuffer的头
struct io_uring_sqe *sqe = io_uring_get_sqe(ring);
connect_info_t accept_info = {sockfd, EVENT_ACCEPT};
// 注册一个I/O事件
io_uring_prep_accept(sqe, sockfd, (struct sockaddr*)clientaddr, addrlen, flags);
memcpy(&sqe->user_data, &accept_info, sizeof(connect_info_t));
3. io_uring_enter 提交I/O
io_uring中通过io_uring_enter函数来提交I/O,并等待事件的完成;在liburing中通过io_uring_submit来提交SQE的读写请求,io_uring_wait_cqe来等待I/O的处理结果,io_uring_peek_batch_cqe来获取CQ中的处理结果;
// 提交worker中执行
io_uring_submit(&ring);
struct io_uring_cqe *cqe;
//等待complete队列中的结果
io_uring_wait_cqe(&ring, &cqe);
struct io_uring_cqe *cqes[128];
// 获取CQ环形队列中的处理结果
int count = io_uring_peek_batch_cqe(&ring, cqes, 128);
实现
io_uring_server.c
#include <liburing.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <fcntl.h>
#include <string.h>
#include <netinet/in.h>
enum event_type {
EVENT_ACCEPT,
EVENT_READ,
EVENT_WRITE
};
typedef struct connect_info{
int conn_fd;
int event;
}connect_info_t;
struct conn_info {
int fd;
int event;
};
int init_server(unsigned short port)
{
int sockfd = socket(AF_INET, SOCK_STREAM, 0);
if (sockfd < 0) {
perror("socket");
return -1;
}
struct sockaddr_in serveraddr;;
serveraddr.sin_family = AF_INET;
serveraddr.sin_port = htons(port);
serveraddr.sin_addr.s_addr = htonl(INADDR_ANY);
if (bind(sockfd, (struct sockaddr *)&serveraddr, sizeof(serveraddr)) < 0) {
perror("bind error");
return -1;
}
int opt = 1;
if (setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)) < 0) {
perror("setsockopt");
return -1;
}
listen(sockfd, 10);
return sockfd;
}
int set_event_recv(struct io_uring *ring, int sockfd, void *buf, int len, int flags)
{
struct io_uring_sqe *sqe = io_uring_get_sqe(ring);
connect_info_t accept_info = {sockfd, EVENT_READ};
io_uring_prep_recv(sqe, sockfd, buf, len, flags);
memcpy(&sqe->user_data, &accept_info, sizeof(connect_info_t));
printf("set event recv----\n");
return 0;
}
int set_event_send(struct io_uring *ring, int sockfd, const void *buf, int len, int flags)
{
struct io_uring_sqe *sqe = io_uring_get_sqe(ring);
connect_info_t accept_info = {sockfd, EVENT_WRITE};
io_uring_prep_send(sqe, sockfd, buf, len, flags);
memcpy(&sqe->user_data, &accept_info, sizeof(connect_info_t));
printf("set event send----\n");
return 0;
}
int set_event_accept(struct io_uring *ring, int sockfd, struct sockaddr *clientaddr,
socklen_t *addrlen, int flags) {
// 获取sqe
struct io_uring_sqe *sqe = io_uring_get_sqe(ring);
// 初始化accept_info
connect_info_t accept_info = {sockfd, EVENT_ACCEPT};
// 准备accept操作
io_uring_prep_accept(sqe, sockfd, (struct sockaddr*)clientaddr, addrlen, flags);
// 设置用户数据
memcpy(&sqe->user_data, &accept_info, sizeof(connect_info_t));
printf("set event accept\n");
return 0;
}
int main(int argc, char *argv[])
{
// 初始化服务器
unsigned short port = 9999;
// 初始化服务器
int socketfd = init_server(port);
if (socketfd < 0)
return -1;
//io_uring实现异步的方式
struct io_uring_params pragma;
// 初始化io_uring 创建submit队列和complite队列
memset(&pragma, 0, sizeof(pragma));
struct io_uring ring;
io_uring_queue_init_params(1024, &ring, &pragma);
struct sockaddr_in clientaddr;
socklen_t addrlen = sizeof(struct sockaddr);
// 提交到submit队列中
set_event_accept(&ring, socketfd, (struct sockaddr*)&clientaddr, &addrlen, 0);
char buffer[1024] = {0};
while (1)
{
// 提交worker中执行
io_uring_submit(&ring);
printf("complete\n");
struct io_uring_cqe *cqe;
//等待complete队列中的结果
io_uring_wait_cqe(&ring, &cqe);
printf("complete end\n");
struct io_uring_cqe *cqes[128];
int count = io_uring_peek_batch_cqe(&ring, cqes, 128);
for (int i = 0; i < count; i++)
{
struct io_uring_cqe *entries = cqes[i];
connect_info_t result;
//struct conn_info result;
memcpy(&result, &entries->user_data, sizeof(connect_info_t));
if (result.event == EVENT_ACCEPT)
{
// 设置读事件
set_event_accept(&ring, socketfd, (struct sockaddr*)&clientaddr, &addrlen, 0);
printf("accept success\n");
int conn_fd = entries->res;
printf("conn_fd = %d res = %d\n", conn_fd, entries->res);
// 设置读事件
set_event_recv(&ring, conn_fd, buffer, 1024,0);
}
else if (result.event == EVENT_READ)
{
int ret = entries->res;
printf("set_event_recv ret: %d, %s\n", ret, buffer);
if (ret == 0)
{
close(result.conn_fd);
continue;
}
else if (ret > 0)
{
// 设置写事件
set_event_send(&ring, result.conn_fd, buffer, ret,0);
}
printf("read success\n");
}
else if (result.event == EVENT_WRITE)
{
int ret = entries->res;
set_event_recv(&ring, result.conn_fd, buffer, 1024,0);
printf("write success\n");
}
}
io_uring_cq_advance(&ring, count);
}
return 0;
}
io_uring_test.c
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <pthread.h>
#include <sys/time.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#define TIMESUB_MS(tv1, tv2) (((tv2).tv_sec - (tv1).tv_sec) * 1000 + ((tv2).tv_usec - (tv1).tv_usec) / 1000)
#define TEST_MESSAGE "ABCDEFGHIJKLMNOPQRSTUVWXYZ1234567890abcdefghijklmnopqrstuvwxyz\r\n"
#define RBUFFER_LENGTH 2048
#define WBUFFER_LENGTH 2048
typedef struct test_conttext
{
char server_ip[16];
int server_port;
int thread_num;
int connection_num;
int request_num;
int fail_num;
} test_conttext_t;
int send_recv_tcp(int sockfd)
{
char wbuffer[WBUFFER_LENGTH];
char rbuffer[RBUFFER_LENGTH];
memset(wbuffer, 0, sizeof(wbuffer));
memset(rbuffer, 0, sizeof(rbuffer));
for (int i = 0; i < 8; i++)
{
strcpy(wbuffer + i * strlen(TEST_MESSAGE), TEST_MESSAGE);
}
int res = send(sockfd, wbuffer, strlen(wbuffer), 0);
if (res <= 0)
{
return -1;
}
res = recv(sockfd, rbuffer, sizeof(rbuffer), 0);
if (res <= 0)
{
return -1;
}
if (strcmp(rbuffer, wbuffer) != 0)
{
printf("failed: '%s' != '%s'\n", rbuffer, wbuffer);
return -1;
}
return 0;
}
int connect_tcpserver(char *ip, int port)
{
int sockfd = socket(AF_INET, SOCK_STREAM, 0);
if (sockfd < 0)
{
perror("socket");
return -1;
}
struct sockaddr_in server_addr;
server_addr.sin_family = AF_INET;
server_addr.sin_port = htons(port);
server_addr.sin_addr.s_addr = inet_addr(ip);
if (connect(sockfd, (struct sockaddr *)&server_addr, sizeof(server_addr)) < 0)
{
perror("connect");
close(sockfd);
return -1;
}
return sockfd;
}
static void *test_qps(void *arg)
{
test_conttext_t *ctx = (test_conttext_t *)arg;
int sockfd = connect_tcpserver(ctx->server_ip, ctx->server_port);
if (sockfd < 0)
{
printf("connect server failed\n");
return NULL;
}
int conut = ctx->request_num / ctx->connection_num;
int indx = 0;
int res;
while (indx++ < conut)
{
res = send_recv_tcp(sockfd);
if (res < 0)
{
printf("send_recv_tcp failed\n");
ctx->fail_num++;
continue;
}
}
return NULL;
}
int main(int argc, char *argv[])
{
int i;
printf("----%d\n", argc);
// for (i = 1; i < argc; i++)
// printf("%s\n", argv[i]);
test_conttext_t ctx = {0};
int opt;
while ((opt = getopt(argc, argv, "s:p:t:c:n:")) != -1)
{
switch (opt)
{
case 's':
strcpy(ctx.server_ip, optarg);
printf("-s: %s\n", optarg);
break;
case 'p':
ctx.server_port = atoi(optarg);
printf("-p: %s\n", optarg);
break;
case 't':
ctx.thread_num = atoi(optarg);
printf("-t: %s\n", optarg);
break;
case 'c':
ctx.connection_num = atoi(optarg);
printf("-c: %s\n", optarg);
break;
case 'n':
ctx.request_num = atoi(optarg);
printf("-n: %s\n", optarg);
break;
default:
return
EXIT_FAILURE;
}
}
pthread_t *threads = (pthread_t *)malloc(sizeof(pthread_t) * ctx.thread_num);
struct timeval start, end;
gettimeofday(&start, NULL);
for (i = 0; i < ctx.thread_num; i++)
{
printf("thread %d pthread_create\n", i);
pthread_create(&threads[i], NULL, test_qps, &ctx);
}
for (i = 0; i < ctx.thread_num; i++)
{
pthread_join(threads[i], NULL);
printf("thread %d finished\n", i);
}
gettimeofday(&end, NULL);
int time_used = TIMESUB_MS(start, end);
printf("success :%d, failed:%d, time used: %d , qps %d\n",
ctx.request_num-ctx.fail_num, ctx.fail_num, time_used, ctx.request_num * 1000 / time_used);
free(threads);
return EXIT_SUCCESS;
}