2.2 事件驱动的reactor网络设计模型

news2025/1/10 16:20:17

在网络io、io多路复用select/poll/epoll、基于事件驱动的reactor中介绍了多种网络I/O方式,特别是事件驱动的reactor。其开发效率比直接使用IO多路复用要高,它一般是单线程的,设计目标是希望一个线程使用CPU的全部资源。
并且,相对于epoll,epoll是对IO的管理,检测接入的IO,触发IO事件;reactor是对事件的管理,不同的事件调用不同的回调函数;这样带来的好处是每个事件对应不同的回调函数,每个事件数据互不影响。本文详细介绍了实现过程

目录

  • 1、定义数据存储结构体
  • 2、初始化reactor
  • 3、实现reactor索引和扩大内存功能
  • 4、初始化socket,创建监听套接字
  • 5、实现reactor事件监听功能
  • 6、实现accept回调函数
  • 7、实现recv回调函数
  • 8、实现send回调函数
  • 9、reactor主循环(mainloop)
  • 9、实例


1、定义数据存储结构体

在这里插入图片描述
以fd为索引,数据的存取和读取都针对对象zv_connect_t。例如每个block有1024个zv_connect_t,则当fd=1048时,对应的是第2个block中的第24个zv_connect_t。

typedef int (*ZVCALLBACK)(int fd, int events, void *arg);

typedef struct zv_connect_s
{
	int fd;
	ZVCALLBACK cb;  //回调函数

	char rbuffer[BUFFER_LEN];  //存储读取的数据
	int rc; 	//rbuffer的长度
	int count; 	//决定每次读多少字节
	char wbuffer[BUFFER_LEN]; //存储待发送的数据
	int wc; 	//wbuffer的长度
} zv_connect_t;

typedef struct zv_connblock_s{
	zv_connect_t *block;
	struct zv_connblock_s *next;
} zv_connblock_t;

typedef struct zv_reactor_s{
	int epfd;
	int blkcont;

	zv_connblock_t *blockheader;
} zv_reactor_t;

2、初始化reactor

//开辟reactor的内存空间
int zv_init_reactor(zv_reactor_t* reactor){
	if (!reactor) return -1;
#if 0
	// 分配两块不连续的空间
	reactor->blockheader=malloc(sizeof(zv_connblock_t));
	if (reactor->blockheader == NULL) return -1;

	reactor->blockheader->block=calloc(1024,sizeof(zv_connect_t));
	if (reactor->blockheader->block == NULL) return -1;
#elif 1
	//分配两块连续的空间
	reactor->blockheader=(zv_connblock_t *)malloc(sizeof(zv_connblock_t)+EVENTS_LEN*sizeof(zv_connect_t));
	if (reactor->blockheader == NULL) return -1;

	reactor->blockheader->block = (zv_connect_t *)(reactor->blockheader+1);
#endif
	reactor->blkcont = 1;
	reactor->blockheader->next =NULL;
	
	reactor->epfd = epoll_create(1);
}

//释放
void zv_destory_reactor(zv_reactor_t* reactor){
	if (!reactor) return ;

	if (!reactor->blockheader) free(reactor->blockheader);

	close(reactor->epfd);
}

3、实现reactor索引和扩大内存功能

int zv_connect_block(zv_reactor_t *reactor){
	if (!reactor) return -1;

	zv_connblock_t *blk = reactor->blockheader;

	while (blk->next != NULL) blk = blk->next;

	zv_connblock_t *connblock=(zv_connblock_t *)malloc(sizeof(zv_connblock_t)+EVENTS_LEN*sizeof(zv_connect_t));
	if (connblock == NULL) return -1;

	connblock->block = (zv_connect_t *)(connblock+1);
	connblock->next =NULL;

	blk->next = connblock;
	reactor->blkcont ++; 

	return 0;

}

//返回第几个zv_connblock_t中的第几个zv_connect_t
zv_connect_t *zv_connect_idx(zv_reactor_t *reactor, int fd){
	if (!reactor) return NULL;

	int blockidx = fd/EVENTS_LEN;

	while (blockidx >= reactor->blkcont){
		//再开辟空间
		zv_connect_block(reactor);
	}

	int i = 0;
	zv_connblock_t *blk = reactor->blockheader;
	while(i++ < blockidx){
		blk = blk->next;
	}

	return &blk->block[fd % EVENTS_LEN];
}

4、初始化socket,创建监听套接字

通过socket创建套接字fd,并初始化相应的协议、端口、地址;通过bind绑定,listen监听。

int init_server(short port){
	int sockfd=socket(AF_INET,SOCK_STREAM,0);

	struct sockaddr_in servaddr;
	memset(&servaddr,0,sizeof(struct sockaddr_in));
	servaddr.sin_family=AF_INET;
	servaddr.sin_addr.s_addr=htonl(INADDR_ANY);
	servaddr.sin_port=htons(port);

	if (-1 == bind(sockfd,(struct sockaddr *)&servaddr,sizeof(struct sockaddr))){
		printf("bind failed: %s",strerror(errno));
		return -1;
	}

	listen(sockfd,10);

	printf("listen port: %d\n",port);
	
	return sockfd;
}

5、实现reactor事件监听功能

int set_listen(zv_reactor_t *reactor, int fd, ZVCALLBACK cb){

	if (!reactor || !reactor->blockheader ) return -1;

	reactor->blockheader->block[fd].fd = fd;
	reactor->blockheader->block[fd].cb = cb;

	struct epoll_event ev;
	ev.events = EPOLLIN;
	ev.data.fd = fd;

	epoll_ctl(reactor->epfd,EPOLL_CTL_ADD,fd,&ev);
}

6、实现accept回调函数

通过accept获得请求连接的客户端clientfd,通过reactor索引zv_connect_idx找到clientfd对应的内存地址,然后设置相应的事件信息,更改回调函数为recv;最后设置监听事件为EPOLLIN,添加到I/O多路复用器epoll中。

//建立连接
int accept_cb(int fd, int events, void *arg){

	struct sockaddr_in clientaddr;
	socklen_t len = sizeof(struct sockaddr);
	
	int clientfd = accept(fd,(struct sockaddr *)&clientaddr,&len);
	if (clientfd < 0) {
		printf("accept errno: %d\n", errno);
		return -1;
	}

	printf(" clientfd:%d\n",clientfd);

	/*建立连接请求之后,把原来的listen fd 置为 clientfd ,回调事件由原来的accept_cb置为recv_cb
	  再调用epoll_ctl*/
	zv_reactor_t *reactor = (zv_reactor_t *)arg;
	zv_connect_t *conn = zv_connect_idx(reactor, clientfd);

	conn->fd = clientfd;
	conn->cb = recv_cb;
	conn->count = BUFFER_LEN;

	struct epoll_event ev;
	ev.events=EPOLLIN;
	ev.data.fd=clientfd;
	epoll_ctl(reactor->epfd, EPOLL_CTL_ADD, clientfd ,&ev );
}

7、实现recv回调函数

通过reactor索引zv_connect_idx找到clientfd对应的内存地址;通过recv接收数据存放到rbuffer;将rbuffer的数据拷贝到wbuffer,实现读写分离;更改回调函数为send;修改epoll的监听事件为EPOLLOUT

//接收数据
int recv_cb(int fd, int event, void *arg){

	zv_reactor_t *reactor = (zv_reactor_t *)arg;
	zv_connect_t *conn = zv_connect_idx(reactor, fd);

	//conn->rbuffer+conn->rc 是一个指针运算,从当前 rbuffer 已经存储的位置开始继续读取数据
	int ret = recv(fd,conn->rbuffer+conn->rc,conn->count,0);
	if (ret < 0){

	}
	else if (ret == 0){
		//释放空间,以供下个使用
		conn ->fd = -1;
		conn ->rc = 0;
		conn ->wc = 0;
		//移除
		epoll_ctl(reactor->epfd,EPOLL_CTL_DEL,fd,NULL);
		//关闭
		close(fd);

		return -1;
	}
	conn->rc += ret;
	printf("rbuffer:  %s, rc: %d\n", conn->rbuffer, conn->rc);

	//为了将读写分开,把要发送到数据,存到wbuffer
    memcpy(conn->wbuffer, conn->rbuffer, conn->rc);
    conn->wc = conn->rc;
    
	//置为写事件
    conn->cb = send_cb;

	struct epoll_event ev;
	ev.events=EPOLLOUT;
	ev.data.fd=fd;
	epoll_ctl(reactor->epfd, EPOLL_CTL_MOD, fd ,&ev );
}

8、实现send回调函数

通过reactor索引zv_connect_idx找到clientfd对应的内存地址;通过send发送wbuffer中的数据;更改回调函数为recv;修改epoll的监听事件为EPOLLIN


//发送数据
int send_cb(int fd, int event, void *arg){

	zv_reactor_t *reactor = (zv_reactor_t *)arg;
	zv_connect_t *conn = zv_connect_idx(reactor, fd);

    send(fd, conn->wbuffer, conn->wc, 0);

    conn->cb = recv_cb;


	struct epoll_event ev;
	ev.events=EPOLLIN;
	ev.data.fd=fd;
	epoll_ctl(reactor->epfd, EPOLL_CTL_MOD, fd ,&ev );
}

9、reactor主循环(mainloop)

创建MAX_PORT个监听fd,循环监听epoll,根据触发的事件选择相关回调函数。

//创建MAX_PORT个监听套接字
	int i=0;
	for (i=0;i<MAX_PORT;i++){
		int sockfd = init_server(post+i);
		set_listen(&reactor, sockfd, accept_cb);
	}

	struct epoll_event events[EVENTS_LEN] = {0};
	while(1){
		int nready = epoll_wait(reactor.epfd, events, EVENTS_LEN, -1);
		if (nready < 0) continue;

		int i = 0;
		for (i = 0;i<nready;i++){
			int connfd = events[i].data.fd;
			zv_connect_t *conn = zv_connect_idx(&reactor, connfd);

			if (events[i].events & EPOLLIN){ //读事件(连接请求、接收消息)
				conn->cb(connfd, events[i].events, &reactor);
			}
			if (events[i].events & EPOLLOUT){ //写事件(发送消息)
				conn->cb(connfd, events[i].events, &reactor);
			}
		}
	}

9、实例

代码

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/611074.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Rxjava2系列:RXjava2.1.7源码下载

找到github项目地址&#xff1a;https://github.com/ReactiveX/RxJava/ 找到右侧的release&#xff0c;点击打开。 搜索2.1.7 (2.1.7是这篇文章&#xff1a;RxJava 是如何实现线程切换的&#xff08;上&#xff09;的源码版本&#xff09; 找到Assert&#xff0c;下载 解…

计算机专业应届毕业生有没有必要参加IT培训?

大学学习的计算机专业&#xff0c;毕业还需要进行IT培训吗&#xff1f;我想&#xff0c;这个问题也困扰着你们吧。那今天小课就带着你们分析一下&#xff0c;计算机专业毕业的应届生到底有没有必要进行培训。 了解企业的技术需求 考虑培训不培训&#xff0c;首先要了解一下现在…

Vue.js 中的数据双向绑定是如何实现的?

Vue.js 中的数据双向绑定是如何实现的&#xff1f; Vue.js 是一款流行的前端框架&#xff0c;它的核心功能之一是数据双向绑定。本文将介绍 Vue.js 中数据双向绑定的实现原理&#xff0c;并附上相关代码实例。 什么是数据双向绑定&#xff1f; 在传统的前端开发中&#xff0c…

Matlab论文插图绘制模板第98期—大小不同多子图(Subplot)

上一篇文章分享了Matlab多子图的绘制模板&#xff1a; 但假如子图的大小不是相同的&#xff0c;该怎么操作呢&#xff1f; 本期就来分享一下大小不同多子图的绘制模板。 先来看一下成品效果&#xff1a; 特别提示&#xff1a;本期内容『数据代码』已上传资源群中&#xff0c;…

java SSM 房屋管理系统统myeclipse开发mysql数据库springMVC模式java编程计算机网页设计

一、源码特点 java SSM 房屋管理系统是一套完善的web设计系统&#xff08;系统采用SSM框架进行设计开发&#xff0c;springspringMVCmybatis&#xff09;&#xff0c;对理解JSP java编程开发语言有帮助&#xff0c;系统具有完整的源代码和 数据库&#xff0c;系统主要采用B…

Linux - 第22节 - 网络基础(其他重要协议或技术)

1.DNS协议 • DNS&#xff08;Domain Name System&#xff0c;域名系统&#xff09;协议&#xff0c;是一个用来将域名转化为IP地址的应用层协议。 • DNS协议属于应用层协议&#xff0c;由UDP实现其域名解析功能。 1.1.DNS背景 TCP/IP中通过IP地址和端口号的方式&#xff0c;来…

地震勘探基础(七)之地震静校正

地震静校正 首先&#xff0c;为什么要进行地震静校正处理呢&#xff1f;主要的原因是地震勘探中激发和接收的观测面不完全是水平的。尤其是在山区、沙漠和黄土原地区。而且近地表还存在风化层或低、降速带低、降速带的厚度和速度会发生变化&#xff0c;这就导致反射波的传播时…

HTML5 FormData对象

利用FormData对象,你可以使用一系列的键值对来模拟一个完整的表单,然后使用XMLHttpRequest发送这个"表单". 创建一个FormData对象 你可以先创建一个空的FormData对象,然后使用append()方法向该对象里添加字段,如下: var oMyForm new FormData();oMyForm.append(&…

Web服务器的工作原理

Web服务器的工作原理 什么是web服务器、应用服务器和web容器&#xff1f;什么是Servlet&#xff1f;他们有什么作用&#xff1f;什么是ServletContext&#xff1f;它由谁创建&#xff1f;ServletRequest和ServletResponse从哪里进入生命周期&#xff1f;如何管理Session&#x…

界面开发框架Qt新手入门教程:Dir视图使用实例

Qt 是目前最先进、最完整的跨平台C开发工具。它不仅完全实现了一次编写&#xff0c;所有平台无差别运行&#xff0c;更提供了几乎所有开发过程中需要用到的工具。如今&#xff0c;Qt已被运用于超过70个行业、数千家企业&#xff0c;支持数百万设备及应用。 点击获取Qt Widget组…

容器(第一篇)docker安装、基础操作命令

docker是什么&#xff1f; docker是一个go语言开发的应用容器引擎。 docker的作用&#xff1f; ①运行容器里的应用&#xff1b; ②docker是用来管理容器和镜像的一种工具。 容器 与 虚拟机 的区别&#xff1f; 容器 虚拟机所有容器共享宿主机…

AIGC大模型之——以文生图介绍

一、什么是以文生图&#xff1f; 以文生图是AIGC ( AI Generated Content &#xff09;框架中的一个关键技术&#xff0c;通过文字描述&#xff0c;将文字转化为图像并展示出来。以文生图具有白动化程度高、精度高、可扩展性强、可定制化等优势&#xff0c;具有广泛的应用前景&…

PyTorch 提示和技巧:从张量到神经网络

张量和梯度 我们将深入探讨使用 PyTorch 构建自己的神经网络必须了解的 2 个基本概念&#xff1a;张量和梯度。 张量 张量是 PyTorch 中的中央数据单元。它们是类似于数组的数据结构&#xff0c;在功能和属性方面与 Numpy 数组非常相似。它们之间最重要的区别是 PyTorch 张量…

Hadoop中HDFS概述

Hadoop概述之HDFS HDFS架构概述优缺点HDFS架构HDFS文件块大小HDFS的shell命令HDFS读写流程写数据流程 HDFS读数据流程NameNode 和 SecondaryNameNode工作机制DataNode工作机制DataNode数据完整性如何保证 端口名称Hadoop2.xHadoop3.xNameNode内部通信端口8020/9000NameNode HTT…

【STM32单片机】基于语音识别的智能分类垃圾桶,ld3320语音识别模块如何使用,mp3播放模块如何使用

文章目录 需求语音识别模块MY1690 播放模块舵机源码 需求 对于“可回收物”“有害垃圾”“厨余垃圾”“其它垃圾”&#xff0c;不能分清扔到哪个垃圾桶怎么办&#xff1f; 基于语音识别的智能分类垃圾桶&#xff0c;识别到关键词就打开对应的垃圾桶&#xff0c;完全没有分不清…

echarts中彻底清除所有实例和相关数据

单个实例 dispose销毁实例&#xff0c;销毁后实例无法再被使用。 myChart.dispose();实例比较多的时候 获取Dom元素 let doms document.getElementsByClassName(my-chart)销毁所有实例 if(doms && doms.length) {for (let i 0; i < doms.length; i) {let chartIn…

Jumpserver 2.28.8使用分享

目录 一、Jumpserver 介绍 1、跳板机和堡垒机理解 1.1、跳板机 1.2、堡垒机 2、jumpserver简介 二、Jumpserver 安装部署 2.1、部署规划 2.2 、安装要求 JumpServer 环境要求: 2.3、安装方法介绍 官方提供了多种安装方法 三、Jumpserver平台使用 3.1、Admin登录 3.…

bug(Tomcat):StandardContext.startInternal 由于之前的错误,Context[/day01]启动失败

引出 项目启动失败&#xff0c;一个困扰了一上午的bug 报错信息 org.apache.catalina.core.StandardContext.startInternal 一个或多个筛选器启动失败。完整的详细信息将在相应的容器日志文件中找到 org.apache.catalina.core.StandardContext.startInternal 由于之前的错误…

骨传导是哪个意思,推荐几款性能优的骨传导耳机

​骨传导耳机是通过头部骨迷路传递声音&#xff0c;而不是直接通过耳膜的振动来传递声音。与传统的入耳式耳机相比&#xff0c;骨传导耳机不会堵耳朵&#xff0c;在跑步、骑车等运动时可以更好的接收外界环境音&#xff0c;保护听力&#xff0c;提升安全性。此外&#xff0c;骨…

图解LeetCode——114. 二叉树展开为链表

一、题目 给你二叉树的根结点 root &#xff0c;请你将它展开为一个单链表&#xff1a; 展开后的单链表应该同样使用 TreeNode &#xff0c;其中 right 子指针指向链表中下一个结点&#xff0c;而左子指针始终为 null 。 展开后的单链表应该与二叉树 先序遍历 顺序相同。 二…