【Linux】【网络】Libevent 内核实现简略版
1 event_base结构–>相当于Reactor
在使用libevent之前,就必须先创建这个结构。
以epoll为例:
1.1evbase
void* evbase-->epollop结构体
(以epoll为例)
libevent通过一个void* 型实例 evbase 来存储这些一切类型结构的指针。
evbase所指向的结构,是一个叫做epollop的结构,它包含了:
- epoll检测到有事件触发时,填写能够描述事件信息的epoll_event* 列表
- 表示epoll_event列表中,有多少个有效事件的nevents变量
- 表示epoll实例本身的epfd
- 一个用来处理时间的可以选择使用的timerfd
1.2evsel
eventop* evsel-->eventop结构体
(支持io复用的统一接口)
evsel则代表了,所有IO复用器在使用过程中的几种相似的操作。
- name:eventop结构的名称
- init函数指针:用于创建evbase实例的函数,在event_base实例初始化阶段,就要调用这个函数进行创建,在epoll的使用情景中,这个函数会创建一个上文提到的epollop结构实例。
- add函数指针:我们创建的所有的event,最后都要塞到event_io_map类型变量–io之中(event_base结构中的一个成员),而将event塞入的同时,要将与之相关联的fd,添加到事件多路分发器中,进行事件监听,而这个操作,就是通过evsel->add函数来执行(比如将fd添加到epoll的监听列表中)。这样fd就能够借助IO多路分发器感知事件,并且在触发时根据它找回对应的,与之关联的事件,并且激活,执行它。
- del函数指针:我们的event,从io这个event_io_map类型的结构中删除时,也要将与event关联的fd,从IO多路复用器监听列表中删除,这个del函数就是干这件事情的。
- dispatch函数指针:这个函数在event_base_loop中调用,主要作用就是调用select、epoll_wait、poll等函数,在没有IO事件时,将线程投入睡眠,在有IO事件到达,或定时事件触发时,唤醒线程,并且将有IO事件的fd相关的event放入激活列表中。
- dealloc函数指针:在event_base_free函数里调用,也就是在销毁event_base实例时,要顺带将evbase实例释放,释放之前要做一些反注册操作,比如将epoll的事件列表,以及实例销毁等等。
- need_reinit:是否需要重新初始化的变量标记,一般在fork一个进程时,要使用,我们的使用案例中很少用到fork,这里不深入讨论。在epoll的使用范例中,它的值是1。
- feature:我前面说过,libevent整合了多种IO多路复用技术的使用,这些不同的IO多路复用器(事件多路分发器),能够支持的特性也是不同的,这些他们本身就支持的特性就被记录在feature字段中
- EV_FEATURE_ET:支持边缘触发机制
- EV_FEATURE_O1:当有IO事件触发时,epoll获取有效事件的效率接近O(1)(epoll_wait唤醒时,event列表中,小于nevent的事件均是有效事件),而select的效率是O(n),每个都要测试
- EV_FEATURE_EARLY_CLOSE:epoll支持RDHUB事件,即对端关闭连接时,能够被epoll_wait感知
2 event结构–>相当于事件处理器
每个fd都有与之关联的event_map_entry结构,而event_map_entry结构中,又有一个event双向链表,我们创建出来的event,最后就是会被塞入这个链表中,当然,删除一个event时,也会将其从这个链表中移除。
struct event {
struct event_callback ev_evcallback;
/* for managing timeouts */
union {
TAILQ_ENTRY(event) ev_next_with_common_timeout;
int min_heap_idx;
} ev_timeout_pos;
evutil_socket_t ev_fd;
struct event_base *ev_base;
union {
/* used for io events */
struct {
LIST_ENTRY (event) ev_io_next;
struct timeval ev_timeout;
} ev_io;
/* used by signal events */
struct {
LIST_ENTRY (event) ev_signal_next;
short ev_ncalls;
/* Allows deletes in callback */
short *ev_pncalls;
} ev_signal;
} ev_;
short ev_events;
short ev_res; /* result passed to event callback */
struct timeval ev_timeout;};
- ev_evcallback:其实就是event_callback类型的变量
- ev_timeout_pos:我们默认使用小根堆,因此使用min_heap_idx,它在event是定时事件的时候使用,min_heap_idx的值表示事件位于小根堆数组的哪个位置
- ev_fd:socket的fd
- ev_base:就是前面讨论过的event_base实例的指针
- ev_events:代表事件的类别和属性,类别包括EV_READ(读)、EV_WRITE(写)等
- 属性包括EV_PERSIT(表示是个持续事件,即激活执行后,事件不会从evmap_io_map中删除,如果没设置这个属性,则表示事件只能被激活一次,被激活执行后,就会从evmap_io_map中清除)
- 这里需要注意的是,ev_events设置为EV_PERSIST的时候,ev_evcallback的evcb_closre也会被同时设置为EV_CLOSURE_EVENT_PERSIST
- ev_res:当事件被激活时,事件要被塞入激活列表(activequeue)时,它会被设置,主要分以下几种情况:
- IO多路复用器(select、epoll_wait)被唤醒时,检测到的读(EV_READ)、写(EV_WRITE)和关闭(EV_CLOSE)事件,读写事件可以同时存在
- 定时事件触发时,将其设置为EV_TIMEOUT
- ev_timeout:当event事件是定时事件时,要被使用。它和ev_中的ev_timeout不同的是,它一般记录的是绝对时间,定时器的触发时机,以这个时间为准,并且可以在定时类型为持续触发和非持续触发两种情况下使用
现在来看一下,一个已经创建好的event,是如何添加到event_io_map中的
- 这个操作是通过一个叫做evmap_io_add的函数进行的,
- 它的主要逻辑是:通过hashsocket函数计算出event fd的hash值
- hth_table_idx = hashsocket(ev) % hth_table_length
- 获取首个event_map_entry实例,event_map_entry* head = hth_table[hth_table_idx]
- 遍历event_map_entry,找到fd与event fd相等的event_map_entry实例
- 将event插入event_map_entry的evmap_io实例的event列表中
3 事件循环
接下来,要讨论的则是我们的事件循环,执行这个流程的函数主要有两个,一个是event_base_dishatch,另一个则是event_base_loop函数
int event_base_loop(struct event_base* base, int flags);
它的第一个参数是传入event_base实例,第二个参数是填的flag标记,标记会影响到loop的行为,先来看一下不填写标记,也就是flag为0的伪代码:
function event_base_loop(base, no_flag) {
local done = 0;
while (!done) {
// 从小根堆中,取出根部,作为dispatch的最大等待时间
// 如果小根堆为空,则tv为NULL,如果tv为NULL,那么dispatch会一直等待
local tv = timeout_next(base)
// 没注册事件,也没有激活的事件,直接退出,这种情况下,libevent
// 没有运转的需要
if (!has_activate_callbacks(base) && has_events(base)) {
done = 1;
break;
}
// dispatch会根据实际情况,调用select、poll、或者是
// epoll_wait,tv为NULL则表示这里会一直睡眠,直至
// 有IO事件触发,tv不为NULL,则它成为最大的睡眠时间
// IO事件触发后,相关的event_callback实例,会被塞入
// 激活列表中
local res = base->evsel->dispatch(base, tv);
// 这个函数,不断从小根堆中,抽取超时时间小于当前时间的根节点,
// 并满足条件的将定时事件的event_callback塞入激活事件列表中
timeout_process(base);
// 判断激活队列中是否有事件
if (has_activate_callbacks(base)) {
// 执行激活队列中,事件的callback函数
event_process_activate(base);
}
}
}
上述的no_flag,其实质就是flags值为0的情况,实际上,libevent为我们提供了另一个函数,用来简化调用,这个函数的定义如下所示:
int event_dispatch(void){ return (event_loop(0));}
我们接下来来看一下,设置flags的情况,主要针对EVENT_LOOP_ONCE和EVENT_LOOP_NONBLOCK,我们来看一下,设置了EVENT_LOOP_ONCE一次性事件循环模式和EVENT_LOOP_NONBLOCK非阻塞模式时的伪代码:
function event_base_loop(base, flags = EVENT_LOOP_ONCE | EVENT_LOOP_NONBLOCK) {
local done = 0;
while (!done) {
local tv = NULL
// 如果flags设置了EVENT_LOOP_NONBLOCK的标记,那么dispatch将
// 不会进行睡眠,而是立刻被唤醒
if (flags & EVENT_LOOP_NONBLOCK)
tv = 0;
else
tv = timeout_next(base);
// 没注册事件,也没有激活的事件,直接退出,这种情况下,libevent
// 没有运转的需要
if (!has_activate_callbacks(base) && has_events(base)) {
done = 1;
break;
}
// dispatch会根据实际情况,调用select、poll、或者是
// epoll_wait,tv为NULL则表示这里会一直睡眠,直至
// 有IO事件触发,tv不为NULL,则它成为最大的睡眠时间
// 如果tv为0,那么dispatch函数将不会进行睡眠,而是立刻被唤醒
// IO事件触发后,相关的event_callback实例,会被塞入
// 激活列表中
local res = base->evsel->dispatch(base, tv);
// 这个函数,不断从小根堆中,抽取超时时间小于当前时间的根节点,
// 并满足条件的将定时事件的event_callback塞入激活事件列表中
timeout_process(base);
if (has_activate_callbacks(base)) { // 判断激活队列中是否有事件
local n = event_process_activate(base); // 执行激活队列中,事件的callback函数
// 如果标记为EVENT_LOOP_ONCE,并且没有更多激活的回调,或者已处理的回调不为0
if (flags & EVENT_LOOP_ONCE &&
!has_activate_callbacks(base) &&
n != 0) {
done = 1;
}
}
else if (flags & EVENT_LOOP_NONBLOCK) {
done = 1;
}
}
}
4 注册一个读写事件的大致的逻辑流程
我们前面已经了解到了event_base结构,event结构,事件循环流程之类的。我们现在来看一下,注册一个读写事件的大致的逻辑流程是怎样的:
// 读取数据的回调函数
void socket_read_cb(evutil_socket_t fd, short events, void* cbarg) {
// 调用read函数,读取数据
// 将读出的数据,塞入自定义的buffer列表中
// 处理分包粘包,得到完整的请求包
}
// 监听连接的回调函数
void listener_cb(evutil_socket_t listener_fd, short events, void* cbarg) {
evutil_socket_t fd = accept(listener_fd, NULL, NULL);
event_base* base = (event_base*)cbarg;
// 创建一个新的事件来监听连接的读取事件
event* read_ev = event_new(base, fd, EV_READ, socket_read_cb, base);
// 将事件添加到事件队列中
event_add(base, read_ev);
}
int main() {
struct event_base* base;
struct sockaddr_in sin;
evutil_socket_t listener_fd;
// 创建事件基础对象
base = event_base_new();
// 创建监听套接字
listener_fd = socket(AF_INET, SOCK_STREAM, 0);
// 设置监听套接字地址
memset(&sin, 0, sizeof(sin));
// 绑定监听套接字
if (bind(listener_fd, (struct sockaddr*)&sin, sizeof(sin)) < 0) {
}
// 将监听套接字设置为监听状态
if (listen(listener_fd, 16) < 0) {}
// 为监听套接字创建事件,监听可读事件(有新的连接到达)
struct event* listener_ev = event_new(base, listener_fd, EV_READ | EV_PERSIST, listener_cb, base);
// 将监听事件添加到事件队列
event_add(listener_ev, NULL);
// 启动事件循环
event_base_dispatch(base);
// 释放资源
event_base_free(base);
return 0;
}
上述伪代码,先是创建了一个监听的socket,并且在绑定了端口和设置为listen状态之后,就开始等待连接到达。
当新的连接到达时,event_base_dispatch内的base->evsel->dispatch函数被唤醒,并且将可读事件塞入激活列表中,最后再调用它们,此时listener_cb函数被触发。
在这个函数,显示accept了一个新的连接,然后为其创建了一个event,并设置了socket_read_cb作为它的读取事件,当该fd有可读事件时,这个函数会被调用,我们可以看到,伪代码中,我们需要自己主动去读取数据,然后自己管理读取buffer列表等等,处理起来非常麻烦。
为此,libevent给我们提供了一个新的结构,这个结构就叫做bufferevent,它为我们管理读写的buffer队列,帮我们处理读写事件,甚至在某种程度上,实现了网络库的proactor模式。同时它也为我们在多线程的情况下,提供了安全的读写操作功能,使得我们不用自己去写加锁解锁逻辑。
5 bufferevent
bufferevent是libevent库中非常核心的结构之一,它提供了高效、简化的网络I/O处理方式。它封装了事件的注册、回调的设置、数据的读写等操作,允许用户专注于业务逻辑的处理,而无需关心低级的I/O事件管理。以下是对bufferevent结构的详细总结:
5.1bufferevent结构体
struct bufferevent {
struct event_base *ev_base; // event_base实例,用于事件调度
const struct bufferevent_ops *be_ops; // 操作函数集,如enable, disable, destruct等
struct event ev_read; // 读取事件的结构,注册和触发读操作
struct event ev_write; // 写入事件的结构,注册和触发写操作
struct evbuffer *input; // 输入缓存区,存储从网络读取的数据
struct evbuffer *output; // 输出缓存区,存储待写入的数据
struct event_watermark wm_read; // 读取水位标记,控制何时触发读回调
struct event_watermark wm_write; // 写入水位标记,控制何时触发写回调
bufferevent_data_cb readcb; // 用户定义的读回调函数
bufferevent_data_cb writecb; // 用户定义的写回调函数
bufferevent_event_cb errorcb; // 用户定义的错误回调函数
void *cbarg; // 回调函数的上下文参数
struct timeval timeout_read; // 读取事件的超时时间
struct timeval timeout_write; // 写入事件的超时时间
short enabled; // 当前启用的事件类型(EV_READ 或 EV_WRITE)
};
-
ev_base:
- 这是指向
event_base
的指针,event_base
是事件循环的核心,管理和调度所有的事件。
- 这是指向
-
be_ops:
- 这是指向
bufferevent_ops
的指针,bufferevent_ops
包含了bufferevent
操作的函数,比如启用和禁用事件(enable
,disable
),以及实例销毁(destruct
)等。
- 这是指向
-
ev_read和 ev_write:
- 这两个字段分别是
event
类型,表示当可读事件或可写事件触发时,调用的回调函数。ev_read
用于处理输入数据,ev_write
用于处理输出数据。
- 这两个字段分别是
-
input和 output:
input
是输入缓冲区,存储从网络读取的数据,使用evbuffer
结构来实现数据的管理。output
是输出缓冲区,存储待发送的数据,同样通过evbuffer
实现。
-
wm_read 和 wm_write:
- 水位标记控制了何时触发读取或写入的回调。
wm_read
控制何时触发readcb
,wm_write
控制何时触发writecb
。它们有low
和high
两个水位,默认情况下设置为0,表示只要数据存在就触发回调。
- 水位标记控制了何时触发读取或写入的回调。
-
readcb 和 writecb:
- 这两个回调函数分别用于处理读取和写入事件。当数据准备好时,libevent会调用这两个回调函数来通知应用层,供其进行数据的处理或发送。
-
errorcb:
- 错误回调函数,在发生错误时会被调用。它可以用于处理错误,例如连接丢失、超时等,并且通常会在该回调中关闭连接。
-
cbarg:
- 用户自定义的参数,可以用于传递上下文信息,传递给回调函数。
-
timeout_read和 timeout_write:
- 设置读取和写入事件的超时时间。如果数据在指定时间内没有到达或无法写入,libevent会调用超时处理回调。
-
enabled:
- 当前启用的事件类型。可以是
EV_READ
(读事件)或EV_WRITE
(写事件)。通过bufferevent_enable
和bufferevent_disable
来动态修改。
- 当前启用的事件类型。可以是
5.2 bufferevent_private结构:
除了核心的bufferevent
结构外,libevent中还定义了bufferevent_private
结构,它包含了一些额外的字段,主要用于处理选项、引用计数和锁等。
struct bufferevent_private {
struct bufferevent bev; // 包含核心的bufferevent结构
enum bufferevent_options options; // 设置的选项,如BEV_OPT_THREADSAFE等
int refcnt; // 引用计数,当为0时,bufferevent实例会被释放
void *lock; // 如果启用了BEV_OPT_THREADSAFE,则会分配锁来保护输入输出缓存
};
5.3 重要的API函数:
-
创建bufferevent实例:
struct bufferevent* bufferevent_socket_new(struct event_base *base, evutil_socket_t fd, int options);
- 创建一个新的
bufferevent
实例,绑定到指定的socket文件描述符fd
,并关联到指定的event_base
。
- 创建一个新的
-
设置回调函数:
void bufferevent_setcb(struct bufferevent *bufev, bufferevent_data_cb readcb, bufferevent_data_cb writecb, bufferevent_event_cb eventcb, void *cbarg);
- 为
bufferevent
实例设置读取、写入和错误事件的回调函数。
- 为
-
注册事件:
int bufferevent_enable(struct bufferevent *bufev, short event); int bufferevent_disable(struct bufferevent *bufev, short event);
- 注册或注销读写事件。通过
bufferevent_enable
将EV_READ
和EV_WRITE
事件添加到事件循环中,通知libevent开始监听这些事件。
- 注册或注销读写事件。通过
-
写入数据:
int bufferevent_write(struct bufferevent *bufev, const void *data, size_t size);
- 将数据写入到输出缓冲区,
bufferevent_write
会将数据从应用程序传输到网络。
- 将数据写入到输出缓冲区,
-
销毁bufferevent实例:
void bufferevent_free(struct bufferevent *bufev);
- 销毁
bufferevent
实例并释放相关资源。
- 销毁
总结:
bufferevent
通过抽象事件的注册、回调的设置和数据的读写,简化了网络I/O的处理过程,尤其是在多线程或高并发的环境下非常有用。- 它结合了
evbuffer
进行数据缓存,支持自动处理粘包和分包问题。 - 通过
bufferevent_enable
和bufferevent_disable
可以动态管理事件的启用与禁用。 bufferevent
的使用可以显著简化基于事件驱动的网络应用开发,提高代码的可维护性和扩展性。
bufferevent 仍然是单线程
bufferevent 仍然是在 libevent 的单线程模型下运行的。
虽然 bufferevent
本身提供了一些更高级的抽象来简化 I/O 操作,但 libevent 本身的事件循环 (event_base
) 仍然是基于单线程的。即使你在 bufferevent
中使用了线程安全的选项(如 BEV_OPT_THREADSAFE
),它主要是在多个线程间进行同步,而不是完全实现多线程并行处理。
-
事件循环 (event_base) 是单线程的:
- 在 libevent 中,所有事件都通过一个
event_base
实例进行调度。默认情况下,event_base
是单线程运行的,这意味着所有事件的分发和回调处理都发生在同一个线程中。 - 即使
bufferevent
提供了线程安全的缓冲区(如通过BEV_OPT_THREADSAFE
选项),它只是确保在多线程环境下的input
和output
缓冲区操作是线程安全的,并不会改变事件循环的单线程模型。
- 在 libevent 中,所有事件都通过一个
-
事件调度和回调都在一个线程中执行:
- 当
bufferevent
注册了读写事件后,event_base
会将这些事件添加到事件循环中。当这些事件发生时,相关的回调函数(如readcb
和writecb
)会被调用,而这些回调函数是在单个线程内执行的。
- 当
-
BEV_OPT_THREADSAFE
是为缓存区加锁的:- 当你使用
BEV_OPT_THREADSAFE
选项时,libevent 会为bufferevent
的input
和output
缓冲区分配锁,以支持多线程安全的访问。这意味着你可以在多个线程中操作这些缓冲区,而不必担心线程竞争问题。 - 但是,这并不会改变
event_base
事件循环的单线程特性。因此,即使多个线程可以访问缓冲区,实际的事件处理和回调仍然是由单线程的event_base
进行调度的。
- 当你使用
-
支持多线程的设计:
- libevent 本身也提供了多线程支持,但多线程通常是通过多个 event_base 或 线程池 来实现的。这种设计允许每个线程拥有自己的事件循环实例,而每个
event_base
仍然是单线程运行的。多个线程并行处理事件的方式并不意味着单个event_base
会变成多线程。
- libevent 本身也提供了多线程支持,但多线程通常是通过多个 event_base 或 线程池 来实现的。这种设计允许每个线程拥有自己的事件循环实例,而每个
如何在 libevent 中实现多线程?
尽管 bufferevent
本身是基于单线程的,但 libevent 允许你通过一些额外的配置来支持多线程:
-
多
event_base
:- 你可以创建多个
event_base
实例,每个线程使用一个单独的event_base
实例。然后,多个线程可以并行处理不同的事件。这种方式通常用于多核处理器,可以提高事件处理的并发性。
event_base *base1 = event_base_new(); event_base *base2 = event_base_new(); // 每个线程创建一个 base 实例并调度事件
- 你可以创建多个
-
线程池:
- libevent 允许你将工作线程池与事件循环结合,多个线程可以共享一个事件循环,分发事件到不同的工作线程处理。例如,libevent 的
evthread
模块可以与线程池一起使用。 - 你可以使用
modu
库(在 libevent 的基础上扩展)来实现线程池,在这些线程池中处理事件,从而实现并发处理。
- libevent 允许你将工作线程池与事件循环结合,多个线程可以共享一个事件循环,分发事件到不同的工作线程处理。例如,libevent 的
总结:
bufferevent
本身是在 单线程 模式下运行的,即使你设置了线程安全选项,也只会对缓存区进行线程安全管理,而不会改变 libevent 的单线程事件处理模型。- libevent 的事件循环 (
event_base
) 默认是单线程的,但可以通过创建多个event_base
或使用线程池等方式实现多线程的事件处理。
因此,如果你希望利用多核处理器来处理多个事件并行,通常需要使用多个 event_base 实例或结合线程池来实现,而不是依赖单个 event_base来处理所有事件。
参考文件:
https://www.cnblogs.com/secondtonone1/p/5535722.html
http://blog.csdn.net/sparkliang/article/details/4957667
https://mp.weixin.qq.com/s/nFv4B_N_MSOA4eMEwDPR-A