linux异步IO编程实例分析

news2024/9/23 13:17:43

 在Direct IO模式下,异步是非常有必要的(因为绕过了pagecache,直接和磁盘交互)。linux Native AIO正是基于这种场景设计的,具体的介绍见:KernelAsynchronousI/O (AIO)SupportforLinux。下面我们就来分析一下AIO编程的相关知识。

阻塞模式下的IO过程如下:

int fd = open(const char *pathname, int flags, mode_t mode);
ssize_t pread(int fd, void *buf, size_t count, off_t offset);
ssize_t pwrite(int fd, const void *buf, size_t count, off_t offset);
int close(int fd);

因为整个过程会等待read/write的返回,所以不需要任何额外的数据结构。但异步IO的思想是:应用程序不能阻塞在昂贵的系统调用上让CPU睡大觉,而是将IO操作抽象成一个个的任务单元提交给内核,内核完成IO任务后将结果放在应用程序可以取到的地方。这样在底层做I/O的这段时间内,CPU可以去干其他的计算任务。但异步的IO任务批量的提交和完成,必须有自身可描述的结构,最重要的两个就是iocb和io_event。

libaio中的structs

struct iocb {
        void     *data;  /* Return in the io completion event */
        unsigned key;   /*r use in identifying io requests */
        short           aio_lio_opcode;
        short           aio_reqprio;
        int             aio_fildes;
        union {
                struct io_iocb_common           c;
                struct io_iocb_vector           v;
                struct io_iocb_poll             poll;
                struct io_iocb_sockaddr saddr;
        } u;
};
struct io_iocb_common {
        void            *buf;
        unsigned long   nbytes;
        long long       offset;
        unsigned        flags;
        unsigned        resfd;
};

iocb是提交IO任务时用到的,可以完整地描述一个IO请求:

data是留给用来自定义的指针:可以设置为IO完成后的callback函数;

aio_lio_opcode表示操作的类型:IO_CMD_PWRITE | IO_CMD_PREAD;

aio_fildes是要操作的文件:fd;

io_iocb_common中的buf, nbytes, offset分别记录的IO请求的mem buffer,大小和偏移。

struct io_event {
        void *data;
        struct iocb *obj;
        unsigned long res;
        unsigned long res2;
};

io_event是用来描述返回结果的:

obj就是之前提交IO任务时的iocb;

res和res2来表示IO任务完成的状态。

libaio提供的API和完成IO的过程

libaio提供的API有:io_setup, io_submit, io_getevents, io_destroy。

1. 建立IO任务

int io_setup (int maxevents, io_context_t *ctxp);

io_context_t对应内核中一个结构,为异步IO请求提供上下文环境。注意在setup前必须将io_context_t初始化为0。

当然,这里也需要open需要操作的文件,注意设置O_DIRECT标志。

2.提交IO任务

long io_submit (aio_context_t ctx_id, long nr, struct iocb **iocbpp);

提交任务之前必须先填充iocb结构体,libaio提供的包装函数说明了需要完成的工作:

void io_prep_pread(struct iocb *iocb, int fd, void *buf, size_t count, long long offset)
{
        memset(iocb, 0, sizeof(*iocb));
        iocb->aio_fildes = fd;
        iocb->aio_lio_opcode = IO_CMD_PREAD;
        iocb->aio_reqprio = 0;
        iocb->u.c.buf = buf;
        iocb->u.c.nbytes = count;
        iocb->u.c.offset = offset;
}
void io_prep_pwrite(struct iocb *iocb, int fd, void *buf, size_t count, long long offset)
{
        memset(iocb, 0, sizeof(*iocb));
        iocb->aio_fildes = fd;
        iocb->aio_lio_opcode = IO_CMD_PWRITE;
        iocb->aio_reqprio = 0;
        iocb->u.c.buf = buf;
        iocb->u.c.nbytes = count;
        iocb->u.c.offset = offset;
}

这里注意读写的buf都必须是按扇区对齐的,可以用posix_memalign来分配。

3.获取完成的IO

long io_getevents (aio_context_t ctx_id, long min_nr, long nr, struct io_event *events, struct timespec *timeout);

这里最重要的就是提供一个io_event数组给内核来copy完成的IO请求到这里,数组的大小是io_setup时指定的maxevents。

timeout是指等待IO完成的超时时间,设置为NULL表示一直等待所有到IO的完成。

4.销毁IO任务

int io_destroy (io_context_t ctx);

相关视频推荐

让服务器底层性能飞起,异步,不一样的感觉

6种epoll的设计方法(单线程epoll、多线程epoll、多进程epoll)及每种epoll的应用场景

epoll实战揭秘-支撑亿级IO的底层基石

免费学习地址:c/c++ linux服务器开发/后台架构师

需要C/C++ Linux服务器架构师学习资料加qun812855908获取(资料包括C/C++,Linux,golang技术,Nginx,ZeroMQ,MySQL,Redis,fastdfs,MongoDB,ZK,流媒体,CDN,P2P,K8S,Docker,TCP/IP,协程,DPDK,ffmpeg等),免费分享

 

libaio和epoll的结合

在异步编程中,任何一个环节的阻塞都会导致整个程序的阻塞,所以一定要避免在io_getevents调用时阻塞式的等待。还记得io_iocb_common中的flags和resfd吗?看看libaio是如何提供io_getevents和事件循环的结合:

void io_set_eventfd(struct iocb *iocb, int eventfd)
{
        iocb->u.c.flags |= (1 << 0) /* IOCB_FLAG_RESFD */;
        iocb->u.c.resfd = eventfd;
}

这里的resfd是通过系统调用eventfd生成的。

int eventfd(unsigned int initval, int flags);

eventfd是linux 2.6.22内核之后加进来的syscall,作用是内核用来通知应用程序发生的事件的数量,从而使应用程序不用频繁地去轮询内核是否有时间发生,而是由内核将发生事件的数量写入到该fd,应用程序发现fd可读后,从fd读取该数值,并马上去内核读取。

有了eventfd,就可以很好地将libaio和epoll事件循环结合起来:

1. 创建一个eventfd

efd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);

2. 将eventfd设置到iocb中

io_set_eventfd(iocb, efd);

3. 交接AIO请求

io_submit(ctx, NUM_EVENTS, iocb);

4. 创建一个epollfd,并将eventfd加到epoll中

epfd = epoll_create(1);
epoll_ctl(epfd, EPOLL_CTL_ADD, efd, &epevent);
epoll_wait(epfd, &epevent, 1, -1);

5. 当eventfd可读时,从eventfd读出完成IO请求的数量,并调用io_getevents获取这些IO

read(efd, &finished_aio, sizeof(finished_aio);
r = io_getevents(ctx, 1, NUM_EVENTS, events, &tms);

 

一个完整的编程实例

#define _GNU_SOURCE
#define __STDC_FORMAT_MACROS


#include <stdio.h>
#include <errno.h>
#include <libaio.h>
#include <sys/eventfd.h>
#include <sys/epoll.h>
#include <stdlib.h>
#include <sys/types.h>
#include <unistd.h>
#include <stdint.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <inttypes.h>


#define TEST_FILE   "aio_test_file"
#define TEST_FILE_SIZE  (127 * 1024)
#define NUM_EVENTS  128
#define ALIGN_SIZE  512
#define RD_WR_SIZE  1024


struct custom_iocb
{
    struct iocb iocb;
    int nth_request;
};


void aio_callback(io_context_t ctx, struct iocb *iocb, long res, long res2)
{
    struct custom_iocb *iocbp = (struct custom_iocb *)iocb;
    printf("nth_request: %d, request_type: %s, offset: %lld, length: %lu, res: %ld, res2: %ld\n", 
            iocbp->nth_request, (iocb->aio_lio_opcode == IO_CMD_PREAD) ? "READ" : "WRITE",
            iocb->u.c.offset, iocb->u.c.nbytes, res, res2);
}


int main(int argc, char *argv[])
{
    int efd, fd, epfd;
    io_context_t ctx;
    struct timespec tms;
    struct io_event events[NUM_EVENTS];
    struct custom_iocb iocbs[NUM_EVENTS];
    struct iocb *iocbps[NUM_EVENTS];
    struct custom_iocb *iocbp;
    int i, j, r;
    void *buf;
    struct epoll_event epevent;


    efd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
    if (efd == -1) {
        perror("eventfd");
        return 2;
    }


    fd = open(TEST_FILE, O_RDWR | O_CREAT | O_DIRECT, 0644);
    if (fd == -1) {
        perror("open");
        return 3;
    }
    ftruncate(fd, TEST_FILE_SIZE);
    
    ctx = 0;
    if (io_setup(8192, &ctx)) {
        perror("io_setup");
        return 4;
    }


    if (posix_memalign(&buf, ALIGN_SIZE, RD_WR_SIZE)) {
        perror("posix_memalign");
        return 5;
    }
    printf("buf: %p\n", buf);


    for (i = 0, iocbp = iocbs; i < NUM_EVENTS; ++i, ++iocbp) {
        iocbps[i] = &iocbp->iocb;
        io_prep_pread(&iocbp->iocb, fd, buf, RD_WR_SIZE, i * RD_WR_SIZE);
        io_set_eventfd(&iocbp->iocb, efd);
        io_set_callback(&iocbp->iocb, aio_callback);
        iocbp->nth_request = i + 1;
    }


    if (io_submit(ctx, NUM_EVENTS, iocbps) != NUM_EVENTS) {
        perror("io_submit");
        return 6;
    }


    epfd = epoll_create(1);
    if (epfd == -1) {
        perror("epoll_create");
        return 7;
    }


    epevent.events = EPOLLIN | EPOLLET;
    epevent.data.ptr = NULL;
    if (epoll_ctl(epfd, EPOLL_CTL_ADD, efd, &epevent)) {
        perror("epoll_ctl");
        return 8;
    }


    i = 0;
    while (i < NUM_EVENTS) {
        uint64_t finished_aio;


        if (epoll_wait(epfd, &epevent, 1, -1) != 1) {
            perror("epoll_wait");
            return 9;
        }


        if (read(efd, &finished_aio, sizeof(finished_aio)) != sizeof(finished_aio)) {
            perror("read");
            return 10;
        }


        printf("finished io number: %"PRIu64"\n", finished_aio);
    
        while (finished_aio > 0) {
            tms.tv_sec = 0;
            tms.tv_nsec = 0;
            r = io_getevents(ctx, 1, NUM_EVENTS, events, &tms);
            if (r > 0) {
                for (j = 0; j < r; ++j) {
                    ((io_callback_t)(events[j].data))(ctx, events[j].obj, events[j].res, events[j].res2);
                }
                i += r;
                finished_aio -= r;
            }
        }
    }
    
    close(epfd);
    free(buf);
    io_destroy(ctx);
    close(fd);
    close(efd);
    remove(TEST_FILE);


    return 0;
}


说明:

1. 在centos 6.2 (libaio-devel 0.3.107-10) 上运行通过

2. struct io_event中的res字段表示读到的字节数或者一个负数错误码。在后一种情况下,-res表示对应的

errno。res2字段为0表示成功,否则失败

3. iocb在aio请求执行过程中必须是valid的

4. 在上面的程序中,通过扩展iocb结构来保存额外的信息(nth_request),并使用iocb.data

来保存回调函数的地址。如果回调函数是固定的,那么也可以使用iocb.data来保存额外信息。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/355582.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

大数值金额大写转换(C语言)

关于大数值金额大写转换&#xff0c;在财务管理的应用方面没什么意义。一般来说&#xff0c;千亿级&#xff0c;万亿级的数值就够了。因为在国家级层面是以亿为单位的&#xff0c;也就表达为千万亿&#xff0c;万万亿。在企业层面数值金额转换设置到千亿、万亿就行了。大的集团…

RabbitMQ 入门到应用 ( 四 ) 与SpringBoot整合

5.与SpringBoot整合 5.1.SpringBoot项目中配置环境 5.1.1.pom.xml配置依赖 在 pom.xml 配置文件中声明依赖, 通过Maven导入 <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId> &l…

天津数据得出的权重

目标层&#xff1a;评价光污染准则层为四个大类&#xff1a;道路、商业、住宅、绿化方案层为25个小类指标但每个大类只和自己下面的几个小类指标相关&#xff0c;不是图示的下面两层全有关联或许考虑把25个小指标提取公共部分&#xff0c;比如路灯亮度、高度、和外墙的距离、光…

RocketMQ 2m-2s-async搭建

本文档是RocketMQ4.8两主两从异步复制的搭建过程&#xff08;也可单机部署&#xff0c;更简单一点&#xff09;,包括监控台界面. 写在前面&#xff1a;本文档适用于4.8版本&#xff0c;其它版本的坑没有踩过不清楚。我是用VMware启了两台虚拟机&#xff0c;环境&#xff1a;Ce…

10 OpenCV图像识别之人脸追踪

文章目录1 级联分类器2 人脸跟踪2.1 相关方法2.2 代码示例CV2中内置了人脸识别等常用的算法&#xff0c;这类算法是通过级联分类器实现的。 1 级联分类器 级联分类器的核心思想是使用多个分类器级联&#xff0c;每个分类器负责检测不同的特征&#xff0c;逐步排除不可能是目标…

Prometheus离线tar.gz包安装

Prometheus离线tar.gz包安装实验环境一、部署前操作二、Master2.1下载2.2解压2.3更改服务目录名称2.4创建系统服务启动文件2.5配置修改2.6启动并设置开机自启2.7访问2.8添加node节点2.8.1 添加方法2.8.2修改Prometheus配置&#xff08;Master&#xff09;实验环境节点ipcpu内存…

数据结构:归并排序和堆排序

归并排序 归并排序(merge sort)是利用“归并”操作的一种排序方法。从有序表的讨论中得知,将两个有序表“归并”为一个有序表,无论是顺序表还是链表,归并操作都可以在线性时间复杂度内实现。归并排序的基本操作是将两个位置相邻的有序记录子序列R[i…m]R[m1…n]归并为一个有序…

已解决zipfile.BadZipFile: File is not a zip file

已解决Python openpyxl 读取Excel文件&#xff0c;抛出异常zipfile.BadZipFile: File is not a zip file的正确解决&#xff0c;亲测有效&#xff01;&#xff01;&#xff01; 文章目录报错问题报错翻译报错原因解决方法联系博主免费帮忙解决报错报错问题 一个小伙伴遇到问题跑…

python基于django+vue微信小程序的校园二手闲置物品交易

在大学校园里,存在着很多的二手商品,但是由于信息资源的不流通以及传统二手商品信息交流方式的笨拙,导致了很多仍然具有一定价值或者具有非常价值的二手商品的囤积,乃至被当作废弃物处理。现在通过微信小程序的校园二手交易平台,可以方便快捷的发布和交流任何二手商品的信息,并…

【CS224W】(task2)传统图机器学习和特征工程

note 和CS224W课程对应&#xff0c;将图的基本表示写在task1笔记中了&#xff1b;传统图特征工程&#xff1a;将节点、边、图转为d维emb&#xff0c;将emb送入ML模型训练Traditional ML Pipeline Hand-crafted feature ML model Hand-crafted features for graph data Node-l…

被滥用的Slack服务:APT29针对意大利的攻击活动分析

背景 APT29&#xff0c;又名CozyBear, Nobelium, TheDukes&#xff0c;奇安信内部编号APT-Q-77&#xff0c;被认为是与东欧某国政府有关的APT组织。该组织攻击活动可追溯至2008年&#xff0c;主要攻击目标包括西方政府组织机构、智囊团。APT29曾多次实施大规模鱼叉攻击&#x…

linux高级命令之进程的注意点

进程的注意点学习目标能够说出进程的注意点1. 进程的注意点介绍进程之间不共享全局变量主进程会等待所有的子进程执行结束再结束2. 进程之间不共享全局变量import multiprocessing import time# 定义全局变量 g_list list()# 添加数据的任务defadd_data():for i in range(5):g…

snakeyaml自定义pojo写入yml文件时属性字段排序问题

snakeyaml采用LinkedHashMap保存对象&#xff0c;最后写入yml文件的时候&#xff0c;可以按照存入的顺序写入yml&#xff0c;如果采用自定义pojo&#xff0c;虽然可以写入yml&#xff0c;但是属性默认是按照字母顺序进行写入的。 如下所示&#xff0c;定义一个User实体&#xf…

kafka生产者事务踩坑记录

1. 背景 公司需要迁移一个老 spark 项目&#xff0c;之前是消费阿里 LogStore 中的实时数据&#xff0c;处理之后将结果落库。使用的是 spark streaming&#xff0c;batch 时间为 2 分钟。迁移后&#xff0c;需要将 LogStore 切换为 kafka&#xff0c;涉及到了对代码的改动。公…

常见的数据结构

栈&#xff08;stack&#xff09; 栈&#xff08; stack&#xff09;是限制插入和删除只能在一个位置上进行的表&#xff0c;该位置是表的末端&#xff0c;叫做栈顶&#xff08;top&#xff09;。它是后进先出&#xff08;LIFO&#xff09;的。对栈的基本操作只有 push&#x…

linux高级命令之线程

线程学习目标能够知道线程的作用1. 线程的介绍在Python中&#xff0c;想要实现多任务除了使用进程&#xff0c;还可以使用线程来完成&#xff0c;线程是实现多任务的另外一种方式。2. 线程的概念线程是进程中执行代码的一个分支&#xff0c;每个执行分支&#xff08;线程&#…

macos 下载 macOS 系统安装程序及安装U盘制作方法

01 下载 macOS 系统安装程序的方法 本文来自: https://discussionschinese.apple.com/docs/DOC-250004259 简介 Mac 用户时不时会需要下载 macOS 的安装程序&#xff0c;目的不同&#xff0c;或者升级或者降级&#xff0c;或者研究或者收藏。为了方便不同用户&#xff0c;除…

设计模式之委派模式与模板模式详解和应用

目录1 委派模式1.1 目标1.2 内容定位1.3 定义1.4 委派模式的应用场景1.5 委派模式在业务场景中的应用1.6 委派模式在源码中的体现1.6.1 双亲委派模型1.6.2 常用代理执行方法 invoke1.6.3 Spring loC中 在调用 doRegisterBeanDefinitions()1.6.4 SpringMVC 的DispatcherServlet1…

python基于vue微信小程序的校园闲置二手跳蚤商城的设计与实现

在当今社会的高速发展过程中,产生的劳动力越来越大,提高人们的生活水平和质量,尤其计算机科技的进步,数据和信息以人兴化为本的目的,给人们提供优质的服务,其中网上购买二手商品尤其突出,使我们的购物方式发生巨大的改变。而线上购物,不仅需要在硬件上为人们提供服务网上购物,而…

尚医通 (十七)手机登录

目录一、登录需求分析二、搭建service-user模块三、登录接口实现1、添加service接口与实现2、添加Mapper接口3、添加Controller方法四、手机验证码登录&#xff08;生成token&#xff09;1、使用JWT进行跨域身份验证1.1 传统用户身份验证1.2 解决方案2、JWT介绍3、整合JWT4、单…