网络编程 io_uring

news2024/10/7 4:29:49

io_uring

1、概述

io_uring是Linux(内核版本在5.1以后)在2019年加入到内核中的一种新型的异步I/O模型;

io_uring使用共享内存,解决高IOPS场景中的用户态和内核态的切换过程,减少系统调用;用户可以直接向共享内存提交要发起的I/O操作,内核线程可以直接获取共享内存中的I/O操作,并进行相应的读写操作;

优点
  • 避免了提交I/O事件和完成事件中存在的内存拷贝(使用共享内存)

  • 减少的了I/O任务提交和完成事件任务是的系统调用过程

  • 采取无锁队列,减少了锁资源的竞争

主要内存结果
  • 提交队列(Submission Queue,SQ)连续的内存空间,环形队列,存放将要执行的I/O操作数据
  • 完成队列(Completion Queue, CQ)连续的内存空间,环形队列,存放执行完成I/O操作后的返回结果
  • 提交队列项数组提(Submission Queue Entry,SQE):方便通过环形缓冲区提交内存请求
主要接口

io_uring提供三个用户态的系统调用接口

  1. io_uring_setup:初始化一个新的io_uring对象,一个SQ和一个CQ,通过使用共享内存进行数据操作
  2. io_uring_register:注册用于异步I/O的文件或用户缓冲区(buffers)
  3. io_uring_enter:提交I/O任务,等待I/O完成

在这里插入图片描述

SQ和CQ保存的都是SQEs数据的索引,不是真正的请求,真实是请求保存在SQE数组中,在提交请求时可以批量提交一组SQE数值上不连续的请求;

SQ、CQ、SQE中的内存区域都是有内核进行分配的,用户初始化会返回对应的fd,通过fd进行mmap和内核共享内存空间;

第三方库

liburing通过对io_uring进行分装,提供了一个简单的API,通过一下命令可以安装该动态库

git clone https://github.com/axboe/liburing.git
cd liburing
./configure
make
sudo make install
sudo ldconfig #更新动态库连接缓存
主要使用流程
1. io_uring初始化

io_uring通过io_uring_setup函数初始化,在liburing库中,通过io_uring_queue_init_params函数进行初始化,创建sumbmit队列和complete队列,以及SQE内存数组;

//io_uring实现异步的方式
struct io_uring_params pragma;
memset(&pragma, 0, sizeof(pragma));
struct io_uring ring;
// 初始化io_uring 创建submit队列和complite队列
io_uring_queue_init_params(1024, &ring, &pragma);
2. io_uring 提交(注册)到SQ环形队列

io_uring通过io_uring_register函数提交(注册)到用于异步I/O的缓冲区中,在liburing中通过io_uring_prep_accept函数对io_uring_refister进行封装使用;

// 获取ringbuffer的头
struct io_uring_sqe *sqe = io_uring_get_sqe(ring);
connect_info_t accept_info = {sockfd, EVENT_ACCEPT};
// 注册一个I/O事件
io_uring_prep_accept(sqe, sockfd, (struct sockaddr*)clientaddr, addrlen, flags);
memcpy(&sqe->user_data, &accept_info, sizeof(connect_info_t));
3. io_uring_enter 提交I/O

io_uring中通过io_uring_enter函数来提交I/O,并等待事件的完成;在liburing中通过io_uring_submit来提交SQE的读写请求,io_uring_wait_cqe来等待I/O的处理结果,io_uring_peek_batch_cqe来获取CQ中的处理结果;

 // 提交worker中执行
io_uring_submit(&ring);
struct io_uring_cqe *cqe;
//等待complete队列中的结果
io_uring_wait_cqe(&ring, &cqe);
struct io_uring_cqe *cqes[128];
// 获取CQ环形队列中的处理结果
int count = io_uring_peek_batch_cqe(&ring, cqes, 128);
实现

io_uring_server.c

#include <liburing.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <fcntl.h>
#include <string.h>
#include <netinet/in.h>

enum event_type {
    EVENT_ACCEPT,
    EVENT_READ,
    EVENT_WRITE
};

typedef struct connect_info{
    int conn_fd;
    int event;
}connect_info_t;

struct conn_info {
	int fd;
	int event;
};

int init_server(unsigned short port) 
{   
    int sockfd = socket(AF_INET, SOCK_STREAM, 0);
    if (sockfd < 0) {
        perror("socket");
        return -1;
    }
    struct sockaddr_in serveraddr;;
    serveraddr.sin_family = AF_INET;
    serveraddr.sin_port = htons(port);
    serveraddr.sin_addr.s_addr = htonl(INADDR_ANY);
    if (bind(sockfd, (struct sockaddr *)&serveraddr, sizeof(serveraddr)) < 0) {
        perror("bind error");
        return -1;
    }

    int opt = 1;
    if (setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)) < 0) {
        perror("setsockopt");
        return -1;
    }

    listen(sockfd, 10);
    return sockfd; 
}

int set_event_recv(struct io_uring *ring, int sockfd, void *buf, int len, int flags)
{
    struct io_uring_sqe *sqe = io_uring_get_sqe(ring);
    connect_info_t accept_info = {sockfd, EVENT_READ};
    io_uring_prep_recv(sqe, sockfd, buf, len, flags);
	memcpy(&sqe->user_data, &accept_info, sizeof(connect_info_t));
    printf("set event recv----\n");
    return 0;
}

int set_event_send(struct io_uring *ring, int sockfd, const void *buf, int len, int flags)
{
    struct io_uring_sqe *sqe = io_uring_get_sqe(ring);
    connect_info_t accept_info = {sockfd, EVENT_WRITE};
    io_uring_prep_send(sqe, sockfd, buf, len, flags);
	memcpy(&sqe->user_data, &accept_info, sizeof(connect_info_t));
    printf("set event send----\n");
    return 0;
}

int set_event_accept(struct io_uring *ring, int sockfd, struct sockaddr *clientaddr,
					socklen_t *addrlen, int flags) {

	// 获取sqe
	struct io_uring_sqe *sqe = io_uring_get_sqe(ring);
	// 初始化accept_info
    connect_info_t accept_info = {sockfd, EVENT_ACCEPT};
	// 准备accept操作
	io_uring_prep_accept(sqe, sockfd, (struct sockaddr*)clientaddr, addrlen, flags);
	// 设置用户数据
	memcpy(&sqe->user_data, &accept_info, sizeof(connect_info_t));
    printf("set event accept\n");
    return 0;
}

int main(int argc, char *argv[])
{
    // 初始化服务器
    unsigned short port = 9999;
    // 初始化服务器
    int socketfd = init_server(port);
    if (socketfd < 0)
        return -1;
    //io_uring实现异步的方式
    struct io_uring_params pragma;
    // 初始化io_uring 创建submit队列和complite队列
    memset(&pragma, 0, sizeof(pragma));
    struct io_uring ring;
    io_uring_queue_init_params(1024, &ring, &pragma);

    struct sockaddr_in clientaddr;
    socklen_t addrlen = sizeof(struct sockaddr);
    // 提交到submit队列中
    set_event_accept(&ring, socketfd, (struct sockaddr*)&clientaddr, &addrlen, 0);

    char buffer[1024] = {0};

    while (1)
    {
        // 提交worker中执行
        io_uring_submit(&ring);
        printf("complete\n");
        struct io_uring_cqe *cqe;
        //等待complete队列中的结果
        io_uring_wait_cqe(&ring, &cqe);
        printf("complete end\n");

        struct io_uring_cqe *cqes[128];
        int count = io_uring_peek_batch_cqe(&ring, cqes, 128);

        for (int i = 0; i < count; i++)
        {
            struct io_uring_cqe *entries = cqes[i];
            connect_info_t result;
            //struct conn_info result;
	        memcpy(&result, &entries->user_data, sizeof(connect_info_t));
            if (result.event == EVENT_ACCEPT) 
            {
                // 设置读事件
                set_event_accept(&ring, socketfd, (struct sockaddr*)&clientaddr, &addrlen, 0);
                printf("accept success\n");
                int conn_fd = entries->res;
                printf("conn_fd = %d  res = %d\n", conn_fd, entries->res);
                // 设置读事件
                set_event_recv(&ring, conn_fd, buffer, 1024,0);
            }
            else if (result.event == EVENT_READ)
            {
                int ret = entries->res;
                printf("set_event_recv ret: %d, %s\n", ret, buffer);

                if (ret == 0)
                {
                    close(result.conn_fd);
                    continue;
                }
                else if (ret > 0)
                {
                    // 设置写事件
                    set_event_send(&ring, result.conn_fd, buffer, ret,0);
                }
                printf("read success\n");
            }
            else if (result.event == EVENT_WRITE)
            {
                int ret = entries->res;
                set_event_recv(&ring, result.conn_fd, buffer, 1024,0);
                printf("write success\n");
            }
        }
        io_uring_cq_advance(&ring, count);
    }
    
    return 0;
}

io_uring_test.c

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <pthread.h>
#include <sys/time.h>

#include <sys/socket.h>
#include <arpa/inet.h>

#define TIMESUB_MS(tv1, tv2)  (((tv2).tv_sec - (tv1).tv_sec) * 1000 + ((tv2).tv_usec - (tv1).tv_usec) / 1000)
#define TEST_MESSAGE   "ABCDEFGHIJKLMNOPQRSTUVWXYZ1234567890abcdefghijklmnopqrstuvwxyz\r\n"
#define RBUFFER_LENGTH 2048
#define WBUFFER_LENGTH 2048

typedef struct test_conttext
{
    char server_ip[16];
    int server_port;
    int thread_num;
    int connection_num;
    int request_num;
    int fail_num;
} test_conttext_t;

int send_recv_tcp(int sockfd)
{
    char wbuffer[WBUFFER_LENGTH];
    char rbuffer[RBUFFER_LENGTH];
    memset(wbuffer, 0, sizeof(wbuffer));
    memset(rbuffer, 0, sizeof(rbuffer));
    for (int i = 0; i < 8; i++)
    {
        strcpy(wbuffer + i * strlen(TEST_MESSAGE), TEST_MESSAGE);
    }

    int res = send(sockfd, wbuffer, strlen(wbuffer), 0);
    if (res <= 0)
    {
        return -1;
    }

    res = recv(sockfd, rbuffer, sizeof(rbuffer), 0);
    if (res <= 0)
    {
        return -1;
    }

    if (strcmp(rbuffer, wbuffer) != 0)
    {
        printf("failed: '%s' != '%s'\n", rbuffer, wbuffer);
        return -1;
    }
    return 0;
}

int connect_tcpserver(char *ip, int port)
{
    int sockfd = socket(AF_INET, SOCK_STREAM, 0);
    if (sockfd < 0)
    {
        perror("socket");
        return -1;
    }
   
    struct sockaddr_in server_addr;
    server_addr.sin_family = AF_INET;
    server_addr.sin_port = htons(port);
    server_addr.sin_addr.s_addr = inet_addr(ip);
    if (connect(sockfd, (struct sockaddr *)&server_addr, sizeof(server_addr)) < 0)
    {
        perror("connect");
        close(sockfd);
        return -1;
    }
    return sockfd;
}

static void *test_qps(void *arg)
{
    test_conttext_t *ctx = (test_conttext_t *)arg;
    int sockfd = connect_tcpserver(ctx->server_ip, ctx->server_port);
    if (sockfd < 0)
    {
        printf("connect server failed\n");
        return NULL;
    }
    int conut = ctx->request_num / ctx->connection_num;
    int indx = 0;
    int res;
    while (indx++ < conut)
    {
        res = send_recv_tcp(sockfd);
        if (res < 0)
        {
            printf("send_recv_tcp failed\n");
            ctx->fail_num++;
            continue;
        }
    }
    return NULL;
}

int main(int argc, char *argv[])
{
    int i;
    printf("----%d\n", argc);
    // for (i = 1; i < argc; i++)
    //     printf("%s\n", argv[i]);
    
    test_conttext_t ctx = {0};
    int opt;
    while ((opt = getopt(argc, argv, "s:p:t:c:n:")) != -1)
    {
        switch (opt)
        {
        case 's':
            strcpy(ctx.server_ip, optarg);
            printf("-s: %s\n", optarg);
            break;
        case 'p':
            ctx.server_port = atoi(optarg);
            printf("-p: %s\n", optarg);
            break;
        case 't':
            ctx.thread_num = atoi(optarg);
            printf("-t: %s\n", optarg);
            break;
        case 'c':
            ctx.connection_num = atoi(optarg);
            printf("-c: %s\n", optarg);
            break;
        case 'n':
            ctx.request_num = atoi(optarg);
            printf("-n: %s\n", optarg);
            break;
        default:
            return 
                EXIT_FAILURE;
        }
    }

    pthread_t *threads = (pthread_t *)malloc(sizeof(pthread_t) * ctx.thread_num);
    struct timeval start, end;
    gettimeofday(&start, NULL);
    for (i = 0; i < ctx.thread_num; i++)
    {
        printf("thread %d pthread_create\n", i);
        pthread_create(&threads[i], NULL, test_qps, &ctx);
    }
    for (i = 0; i < ctx.thread_num; i++)
    {
        pthread_join(threads[i], NULL);
        printf("thread %d finished\n", i);
    }
    gettimeofday(&end, NULL);
    int time_used = TIMESUB_MS(start, end);
    printf("success :%d, failed:%d,  time used: %d , qps %d\n", 
        ctx.request_num-ctx.fail_num, ctx.fail_num, time_used, ctx.request_num * 1000 / time_used);
    free(threads);
    return EXIT_SUCCESS;
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1479706.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

BioTech - 大分子药物设计 概述

欢迎关注我的CSDN&#xff1a;https://spike.blog.csdn.net/ 本文地址&#xff1a;https://spike.blog.csdn.net/article/details/136302202 大分子药物设计领域主要包括3个方面&#xff0c;即大环类药物设计、蛋白质与多肽类药物设计、核酸药物设计等&#xff0c;具体如下&…

计算机设计大赛 深度学习实现行人重识别 - python opencv yolo Reid

文章目录 0 前言1 课题背景2 效果展示3 行人检测4 行人重识别5 其他工具6 最后 0 前言 &#x1f525; 优质竞赛项目系列&#xff0c;今天要分享的是 &#x1f6a9; **基于深度学习的行人重识别算法研究与实现 ** 该项目较为新颖&#xff0c;适合作为竞赛课题方向&#xff0c…

JavaScript数据类型 检测数据类型 数据类型转换 数值相等比较

数值相等比较 JavaScript 提供三种不同的值比较运算&#xff1a; ——严格相等&#xff08;三个等号&#xff09; ——宽松相等&#xff08;两个等号&#xff09; 8种数据类型 前七种为基础数据类型。 Object类型为引用数据类型。 数据类型概念以及存储方式 let a {name:…

文本描述,简介

文章目录 需求分析要点剖析源码 需求 实现如下效果 分析 要点剖析 标题字体可以用 h1、h2、h3段落标签用 p 标签涉及到开头空两个格的使用 text-indent:2em;&#xff0c;如下&#xff1a; DIV、P标签首行缩进 <div style"text-indent:2em;">缩进的内容<…

《互联网的世界》第三讲-tcp

dns 找到了地址&#xff0c;spf 确定了路径&#xff0c;如何运输数据呢&#xff1f;今天讲 tcp。 计算机网络领域的特定技术是最后当你干这个事时才要用的&#xff0c;我对孩子们这样说&#xff0c;实际上你可以随便看一个快递单子来理解端到端传输协议。 源地址&#xff0c…

【k8s配置与存储--配置管理】

1、ConfigMap的配置 1.1 ConfigMap介绍 ConfigMap 是一种 API 对象&#xff0c;用来将非机密性的数据保存到键值对中。使用时&#xff0c; Pod 可以将其用作环境变量、命令行参数或者存储卷中的配置文件。 ConfigMap 将你的环境配置信息和容器镜像解耦&#xff0c;便于应用配…

GEE入门篇|图像处理(二):在Earth Engine中进行波段计算

目录 波段计算 1.NDVI的计算 2.NDVI 归一化差值的单次运算计算 3.使用 NDWI 的归一化差值 波段计算 许多指数可以使用 Earth Engine 中的波段运算来计算。 波段运算是对图像中两个或多个波段进行加、减、乘或除的过程。 在这里&#xff0c;我们将首先手动执行此操作&#x…

Day06:基础入门-抓包技术HTTPS协议APP小程序PC应用WEB转发联动

目录 HTTP/HTTPS协议抓包工具 Web浏览器抓包 APP应用抓包 WX小程序&PC应用抓包 思维导图 章节知识点&#xff1a; 应用架构&#xff1a;Web/APP/云应用/三方服务/负载均衡等 安全产品&#xff1a;CDN/WAF/IDS/IPS/蜜罐/防火墙/杀毒等 渗透命令&#xff1a;文件上传下载…

scons,一个实用的 Python 构建工具!

目录 前言 什么是SCons库&#xff1f; 安装SCons库 使用SCons库 SCons库的功能特性 1. 基于Python的构建描述语言 2. 自动化依赖管理 3. 多种构建环境支持 SCons库的应用场景 1. C/C项目构建 2. Python项目构建 3. 嵌入式系统开发 4. 持续集成环境 5. 跨平台项目构建 总…

云服务器比价之阿里云PK腾讯云,看看哪家便宜?

2024年阿里云服务器和腾讯云服务器价格战已经打响&#xff0c;阿里云服务器优惠61元一年起&#xff0c;腾讯云服务器62元一年&#xff0c;2核2G3M、2核4G、4核8G、8核16G、16核32G、16核64G等配置价格对比&#xff0c;阿腾云atengyun.com整理阿里云和腾讯云服务器详细配置价格表…

用人才测评来招聘,以及团队组建和优化

HR招聘流程&#xff1a;发布招聘信息&#xff0c;收集求职者的简历&#xff0c;筛选简历&#xff0c;线上&#xff08;人才测评&#xff09;&#xff0c;线下安排面试&#xff0c;线上&#xff08;人才测评&#xff09;&#xff0c;线下面试&#xff0c;&#xff08;线上人才测…

33-k8s项目实战-02-k8s的ca证书有效期更新

一、概述 我们知道&#xff0c;k8s各项组件之间的通信&#xff0c;都是使用https协议进行的&#xff0c;也就是ca证书&#xff0c;那么我们也知道ca证书都是有“有限期的”&#xff0c;一旦过期&#xff0c;系统就无法进行通信了&#xff1b; 这也是k8s在企业当中经常遇到的证书…

tcpdump 常用用法

简要记录下tcpdump用法 监控某个ip上的某个端口的流量 tcpdump -i enp0s25 tcp port 5432 -nn -S 各个参数作用 -i enp0s25 指定抓包的网卡是enp0s25 -nn 显示ip地址和数字端口 &#xff0c;如果只 -n 则显示ip&#xff0c;但是端口为services文件中的服务名 如果一个…

Orange3数据预处理(唯一组件)

唯一 删除重复的数据实例。 输入 数据&#xff1a;数据表格 输出 数据&#xff1a;无重复的数据表格 该组件删除重复的数据实例。用户可以选择一部分观察变量&#xff0c;因此&#xff0c;即使它们在其他人忽视的其他变量值上有所不同&#xff0c;两个实例也会被…

图片卷子怎么转换成word文档?3种方法轻松转换

图片卷子怎么转换成word文档&#xff1f;在日常学习中&#xff0c;将图片卷子转换成Word文档可以极大地方便学生们的学习和复习。首先&#xff0c;转换成Word文档后&#xff0c;学生们可以轻松地编辑、复制和粘贴其中的内容&#xff0c;从而快速整理学习笔记或制作复习资料。其…

C++内存对齐原则(struct长度大小)

一、什么是内存对齐原则 内存对齐原则指的是&#xff0c;保证各个存储空间的对齐。其目的是为了方便操作系统更加快捷的访问各个存储空间&#xff0c;也就是保证每次访问的偏移量都尽可能规律。 二、结构体strcut的内存对齐原则 对于C语言的struct而言&#xff0c;如果想计算s…

2024-02-29(Flink)

1.Flink原理&#xff08;角色分工&#xff09; 2.Flink执行流程 on yarn版&#xff1a; 3.相关概念 1&#xff09;DataFlow&#xff1a;Flink程序在执行的时候会被映射成一个数据流模型&#xff1b; 2&#xff09;Operator&#xff1a;数据流模型中的每一个操作被称作Operat…

rtu遥测终端介绍(智能rtu数据采集终端使用案例参考)

​随着工业互联网的快速发展,各类智能终端逐渐应用于工业现场,实现对设备和过程的智能监控。其中,RTU(遥测终端)凭借采集控制一体化的优势,在提升工作效率的同时降低了系统集成的复杂度,成为工业物联网建设中必不可少的重要组件之一。今天,我们就来看看厦门星创易联这一工业物联…

深度学习_15_过拟合欠拟合

过拟合和欠拟合 过拟合和欠拟合是训练模型中常会发生的事&#xff0c;如所要识别手势过于复杂&#xff0c;如五角星手势&#xff0c;那就需要更改高级更复杂的模型去训练&#xff0c;若用比较简单模型去训练&#xff0c;就会导致模型未能抓住手势的全部特征&#xff0c;那简单…

【OCR识别】使用OCR技术还原加密字体文字

文章目录 1. 写在前面2. 页面分析3. 字符知识4. 加密分析 【作者主页】&#xff1a;吴秋霖 【作者介绍】&#xff1a;Python领域优质创作者、阿里云博客专家、华为云享专家。长期致力于Python与爬虫领域研究与开发工作&#xff01; 【作者推荐】&#xff1a;对JS逆向感兴趣的朋…