文章目录
- 前言
- 实现原理
- 处理框架
- 编程接口
- 原理验证
- 事件订阅
- 服务监听
- 验证流程
前言
- Event Loop顾名思义就是事件循环,整个程序是一个大的循环,通过事件来驱动程序要做的事情。传统编程模型是顺序的,程序运行一次然后终止,这种模型简单,常用于实现一个特定功能。但这种模型不适合交互式的程序,比如图形用户程序,用户大多数时候可能不会有输入,一旦有了输入,程序需要根据其类型执行特定的代码以实现特定功能。
- 事件触发的程序有一个普遍特点,就是会长时间运行并等待用于输入,守护进程或者服务都可以判定为这类程序。Libvirt虽然没有图形接口也不等待用户的直接输入,但它作为服务会长时间监听各种socket的连接,因此也使用这种编程模型。
实现原理
处理框架
- 上图是GLib提供的事件循环状态机,可以看到事件循环最核心的动作就是poll fd(Polling)然后执行回调函数(Dispatching),poll通常使用内核提供的poll接口实现,除此以外,GLib还定义了两个阶段,初始化阶段(Initial)和准备阶段(Prepared),对于各阶段GLib实现了对应hooks方便用户实现自己的逻辑。
- 对更高级的编程用户来说,可以只关心事件循环监听的什么事件、事件触发后执行的什么,Libvirt提供的事件循环接口较GLib更接近用户使用场景,分别是添加回调、更新回调和删除回调。
编程接口
-
Libvirt的开发包(libvirt-devel)以及共享库(libvirt.so)为用户提供编程API,其中libvirt-event.h头文件提供了事件循环的编程接口,因此不仅Libvirt服务本身可以使用事件循环接口,用户程序也可以使用该接口实现自己的功能。头文件如下:
/* * libvirt-event.h * Summary: APIs for management of events * Description: Provides APIs for the management of events * * Copyright (C) 2006-2014 Red Hat, Inc. * * This library is free software; you can redistribute it and/or * modify it under the terms of the GNU Lesser General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or (at your option) any later version. * * This library is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * Lesser General Public License for more details. * * You should have received a copy of the GNU Lesser General Public * License along with this library. If not, see * <http://www.gnu.org/licenses/>. */ #ifndef LIBVIRT_EVENT_H # define LIBVIRT_EVENT_H # ifndef __VIR_LIBVIRT_H_INCLUDES__ # error "Don't include this file directly, only use libvirt/libvirt.h" # endif /** * virEventHandleType: * * a virEventHandleType is used similar to POLLxxx FD events, but is specific * to libvirt. A client app must translate to, and from POLL events when using * this construct. * * 事件循环所监听fd的所有状态 */ typedef enum { VIR_EVENT_HANDLE_READABLE = (1 << 0), VIR_EVENT_HANDLE_WRITABLE = (1 << 1), VIR_EVENT_HANDLE_ERROR = (1 << 2), VIR_EVENT_HANDLE_HANGUP = (1 << 3), } virEventHandleType; /** * virEventHandleCallback: * * @watch: watch on which the event occurred * @fd: file handle on which the event occurred * @events: bitset of events from virEventHandleType constants * @opaque: user data registered with handle * * Callback for receiving file handle events. The callback will * be invoked once for each event which is pending. * * 当事件循环监听的fd中有用户感兴趣的状态后所调用的用户定义的回调 * 比如用户是一个监听tcp socket的服务程序,对所监听socket fd * VIR_EVENT_HANDLE_READABLE事件感兴趣,则需要实现这样一个类 * 型的回调函数 */ typedef void (*virEventHandleCallback)(int watch, int fd, int events, void *opaque); /** * Libvirt默认使用GLib提供的事件循环接口实现事件循环,但用户也可以定制事件循环中的 * 每个动作(包括增、删、更新事件回调和增、删、更新timer超时回调六种)。用户如果实现 * 此六种事件循环框架要求的方法,即可使用自己定制的事件循环,但方法的签名必须按照如 * 下定义来实现 */ /** * virEventAddHandleFunc: * @fd: file descriptor to listen on * @event: bitset of events on which to fire the callback * @cb: the callback to be called when an event occurs * @opaque: user data to pass to the callback * @ff: the callback invoked to free opaque data blob * * Part of the EventImpl, this callback adds a file handle callback to * listen for specific events. The same file handle can be registered * multiple times provided the requested event sets are non-overlapping * * @fd will always be a C runtime file descriptor. On Windows * the _get_osfhandle() method can be used if a HANDLE is required * instead. * * If the opaque user data requires free'ing when the handle * is unregistered, then a 2nd callback can be supplied for * this purpose. This callback needs to be invoked from a clean stack. * If 'ff' callbacks are invoked directly from the virEventRemoveHandleFunc * they will likely deadlock in libvirt. * * Returns -1 if the file handle cannot be registered, otherwise a handle * watch number to be used for updating and unregistering for events * * 事件循环添加回调函数的方法签名 */ typedef int (*virEventAddHandleFunc)(int fd, int event, virEventHandleCallback cb, void *opaque, virFreeCallback ff); /** * virEventUpdateHandleFunc: * @watch: file descriptor watch to modify * @event: new events to listen on * * Part of the EventImpl, this user-provided callback is notified when * events to listen on change * * 事件循环更新回调函数的方法签名 */ typedef void (*virEventUpdateHandleFunc)(int watch, int event); /** * virEventRemoveHandleFunc: * @watch: file descriptor watch to stop listening on * * Part of the EventImpl, this user-provided callback is notified when * an fd is no longer being listened on. * * If a virEventHandleFreeFunc was supplied when the handle was * registered, it will be invoked some time during, or after this * function call, when it is safe to release the user data. * * Returns -1 if the file handle was not registered, 0 upon success * * 事件循环删除回调函数的方法签名 */ typedef int (*virEventRemoveHandleFunc)(int watch); /** * virEventTimeoutCallback: * * @timer: timer id emitting the event * @opaque: user data registered with handle * * callback for receiving timer events */ typedef void (*virEventTimeoutCallback)(int timer, void *opaque); /** * virEventAddTimeoutFunc: * @timeout: The timeout to monitor * @cb: the callback to call when timeout has expired * @opaque: user data to pass to the callback * @ff: the callback invoked to free opaque data blob * * Part of the EventImpl, this user-defined callback handles adding an * event timeout. * * If the opaque user data requires free'ing when the handle * is unregistered, then a 2nd callback can be supplied for * this purpose. * * Returns a timer value */ typedef int (*virEventAddTimeoutFunc)(int timeout, virEventTimeoutCallback cb, void *opaque, virFreeCallback ff); /** * virEventUpdateTimeoutFunc: * @timer: the timer to modify * @timeout: the new timeout value * * Part of the EventImpl, this user-defined callback updates an * event timeout. */ typedef void (*virEventUpdateTimeoutFunc)(int timer, int timeout); /** * virEventRemoveTimeoutFunc: * @timer: the timer to remove * * Part of the EventImpl, this user-defined callback removes a timer * * If a virEventTimeoutFreeFunc was supplied when the handle was * registered, it will be invoked some time during, or after this * function call, when it is safe to release the user data. * * Returns 0 on success, -1 on failure */ typedef int (*virEventRemoveTimeoutFunc)(int timer); /** * 如上介绍,如果用户定制了事件循环的实现方法,通过下面函数注册事件循环 * 的方案,之后Libvirt在启动事件循环时默认使用 */ void virEventRegisterImpl(virEventAddHandleFunc addHandle, virEventUpdateHandleFunc updateHandle, virEventRemoveHandleFunc removeHandle, virEventAddTimeoutFunc addTimeout, virEventUpdateTimeoutFunc updateTimeout, virEventRemoveTimeoutFunc removeTimeout); /** * 通常情况下,为快速集成Libvirt的事件循环到用户程序,会选择用Libvirt默认方法 * 来实现事件循环,因此Libvirt提供下面这个接口让用户将默认方法注册,早期libvirt * 使用poll实现,默认方法分别是: * virEventPollAddHandle * virEventPollUpdateHandle * virEventPollRemoveHandle * virEventPollAddTimeout * virEventPollUpdateTimeout * virEventPollRemoveTimeout * 新版本libvirt使用Glib提供的事件循环,默认方法分别是: * virEventGLibHandleAdd * virEventGLibHandleUpdate * virEventGLibHandleRemove * virEventGLibTimeoutAdd * virEventGLibTimeoutUpdate * virEventGLibTimeoutRemove */ int virEventRegisterDefaultImpl(void); /** * 使用Libvirt默认事件框架,需要调用如下接口启动事件循环 */ int virEventRunDefaultImpl(void); /** * 向事件循环添加回调函数,该函数返回一个名为watch的整数,作为对更新删除 * 回调函数的句柄,它实际上是一个索引,libvirt使用这个索引在所有注册 * 的回调函数形成的数组中找到对应的回调函数 */ int virEventAddHandle(int fd, int events, virEventHandleCallback cb, void *opaque, virFreeCallback ff); /* 更新已添加的回调函数感兴趣的事件 */ void virEventUpdateHandle(int watch, int events); /* 删除已添加的回调函数 */ int virEventRemoveHandle(int watch); /* 添加一个超时回调函数 * 当frequency参数的值为-1时表示仅仅注册,不会触发回调 * 当frequency参数的值为0时表示每次event loop触发回调 * 当frequency参数的值大于0时按照该频率触发回调 */ int virEventAddTimeout(int frequency, virEventTimeoutCallback cb, void *opaque, virFreeCallback ff); void virEventUpdateTimeout(int timer, int frequency); int virEventRemoveTimeout(int timer); #endif /* LIBVIRT_EVENT_H */
原理验证
- 我们通过集成Libvirt event loop实现一个demo,它的主要功能是订阅主机上虚机的生命周期事件并打印,同时还作为一个rpc server,监听一个本地端口响应客户段的rpc request。demo框架如下所示:
- 对于订阅事件,我们可以利用Libvirt现成的API实现,对于rpc server,这是我们新增功能,要求将监听端口的socket fd加入到event loop,并注册rpc server的连接回调以及请求回调。demo代码参考event monitor。
事件订阅
- 当用户程序对libvirt管理的虚机的某个事件感兴趣,可以订阅该事件,并注册该事件发生后的回调。订阅和取消虚机事件的API在libvirt-domain.h头文件中定义,如下:
/* Use VIR_DOMAIN_EVENT_CALLBACK() to cast the 'cb' parameter */
int virConnectDomainEventRegisterAny(virConnectPtr conn,
/* 感兴趣的虚机,可选 */
virDomainPtr dom, /* Optional, to filter */
/* 感兴趣的事件 */
int eventID,
/* 事件发生后触发的回调 */
virConnectDomainEventGenericCallback cb,
void *opaque,
virFreeCallback freecb);
int virConnectDomainEventDeregisterAny(virConnectPtr conn,
int callbackID);
- 事件订阅核心逻辑如下:
/* 实现生命周期事件回调逻辑,简单将事件打印出来 */
static int
gemDomainEventLifeCycleCallback(virConnectPtr conn,
virDomainPtr dom,
int event,
int detail,
void *opaque)
{
fprintf(stderr, "%s EVENT: Domain %s(%d) %s %s\n", __func__, virDomainGetName(dom),
virDomainGetID(dom), gemEventToString(event),
gemEventDetailToString(event, detail));
return 0;
}
struct domainEventData domainEvents[] = {
DOMAIN_EVENT(VIR_DOMAIN_EVENT_ID_LIFECYCLE, gemDomainEventLifeCycleCallback),
};
int
main(int argc, char **argv)
{
virConnectPtr dconn = NULL;
/* 注册Libvirt默认方法实现demo程序的事件循环 */
virEventRegisterDefaultImpl();
/* 连接libivrtd */
dconn = virConnectOpenAuth(argc > 1 ? argv[1] : NULL,
virConnectAuthPtrDefault,
0);
/* 订阅事件 */
/* register common domain callbacks */
for (i = 0; i < G_N_ELEMENTS(domainEvents); i++) {
struct domainEventData *event = domainEvents + i;
event->id = virConnectDomainEventRegisterAny(dconn, NULL,
event->event,
event->cb,
strdup(event->name),
gemFreeFunc);
}
/* 设置超时 */
virConnectSetKeepAlive(dconn, 5, 3);
/* 启动事件循环 */
while (run) {
if (virEventRunDefaultImpl() < 0) {
fprintf(stderr, "Failed to run event loop: %s\n",
virGetLastErrorMessage());
}
}
- 事件订阅的核心实现是向event loop注册一个timer的回调函数
virObjectEventTimer
,在virConnectDomainEventRegisterAny
函数逻辑中实现,但初始化该回调函数时frequency的输入为-1,表示不触发回调,直到有事件发生时才更新frequency为0。 - demo中注册了两个timer回调:
- 注册了一个server侧有IO事件的回调函数
virNetSocketEventHandle
,该回调函数会级连触发virNetClientIncomingEvent
,该函数会处理server发来的消息,识别是否有event的通知消息(VIR_NET_MESSAGE
)到达,如果有会则更新virObjectEventTimer
的频率为0,下一次事件循环时则会触发订阅事件的用户注册的回调。
- 初始化timer回调如下:
- 事件发生时更新timer频率如下:
- 我们可以看到,订阅libvirt事件demo和libvirt服务本身并不相关,当Libvirt服务事件发生时,是通过发送通知到客户端,客户端在自己的event loop中触发事件回调。
服务监听
- 事件订阅利用libvirt提供的API实现了事件循环,并在感兴趣的事件上注册了回调。服务监听更进一步,利用libvirt提供的接口实现对网络端口的监听并提供相应的RPC服务,实现用户定制的功能。RPC服务的注册逻辑如下:
static void
gemRpcServerRegister(jrpc_server_ptr server, int port)
{
/* 初始化rpc server,创建一个tcp socket监听端口 */
jrpc_server_init(server, port);
/* 注册helloworld方法 */
jrpc_register_procedure(server, helloWorld,
"helloworld", NULL);
}
int jrpc_server_init(jrpc_server_ptr server, int port_number) {
int sockfd;
struct addrinfo hints, *servinfo, *p;
struct sockaddr_in sockaddr;
unsigned int len;
int yes = 1;
int rv;
char PORT[6];
memset(server, 0, sizeof(jrpc_server));
server->port_number = port_number;
sprintf(PORT, "%d", server->port_number);
memset(&hints, 0, sizeof hints);
hints.ai_family = AF_UNSPEC;
hints.ai_socktype = SOCK_STREAM;
hints.ai_flags = AI_PASSIVE; // use my IP
if ((rv = getaddrinfo(NULL, PORT, &hints, &servinfo)) != 0) {
fprintf(stderr, "getaddrinfo: %s\n", gai_strerror(rv));
return 1;
}
// loop through all the results and bind to the first we can
for (p = servinfo; p != NULL; p = p->ai_next) {
/* 创建 tcp socket */
if ((sockfd = socket(p->ai_family, p->ai_socktype, p->ai_protocol))
== -1) {
perror("server: socket");
continue;
}
if (setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(int))
== -1) {
perror("setsockopt");
exit(1);
}
/* 绑定对应端口 */
if (bind(sockfd, p->ai_addr, p->ai_addrlen) == -1) {
close(sockfd);
perror("server: bind");
continue;
}
len = sizeof(sockaddr);
if (getsockname(sockfd, (struct sockaddr *) &sockaddr, &len) == -1) {
close(sockfd);
perror("server: getsockname");
continue;
}
server->port_number = ntohs( sockaddr.sin_port );
break;
}
freeaddrinfo(servinfo); // all done with this structure
/* 监听对应端口 */
if (listen(sockfd, 5) == -1) {
perror("listen");
exit(1);
}
if (server->debug_level)
printf("server: waiting for connections...\n");
server->fd = sockfd;
/* 将tcp socket fd和对应的回调实现注册到事件循环中,一旦fd上有读事件到达,event loop触发accept_cb回调 */
if ((server->watch = virEventAddHandle(server->fd,
VIR_EVENT_HANDLE_READABLE,
accept_cb,
server,
NULL)) < 0) {
fprintf(stderr, "failed to register accept connection callback\n");
return 3;
}
return 0;
}
/* RPC 客户端连接到达的回调函数 */
static void accept_cb(int watch, int fd, int events, void *opaque) {
jrpc_server_ptr rpc_server = opaque;
char s[INET6_ADDRSTRLEN];
jrpc_connection_ptr connection;
connection = malloc(sizeof(jrpc_connection));
struct sockaddr_storage their_addr; // connector's address information
socklen_t sin_size;
sin_size = sizeof their_addr;
/* 接受连接,得到RPC客户端的fd */
connection->fd = accept(fd, (struct sockaddr *) &their_addr,
&sin_size);
if (connection->fd == -1) {
perror("accept");
free(connection);
} else {
if (rpc_server->debug_level) {
inet_ntop(their_addr.ss_family,
get_in_addr((struct sockaddr *) &their_addr), s, sizeof s);
printf("server: got connection from %s\n", s);
}
//copy pointer to struct jrpc_server
connection->buffer_size = 1500;
connection->buffer = malloc(1500);
memset(connection->buffer, 0, 1500);
connection->pos = 0;
//copy debug_level, struct jrpc_connection has no pointer to struct jrpc_server
connection->debug_level = rpc_server->debug_level;
connection->server = rpc_server;
/* 注册RPC客户端消息到达的回调,该回调处理客户端发来的RPC请求 */
if ((connection->watch = virEventAddHandle(connection->fd,
VIR_EVENT_HANDLE_READABLE,
connection_cb,
connection,
NULL)) < 0) {
perror("failed to register rpc request callback");
}
}
}
验证流程
- 编译gvm-event-monitor服务
./gvm-conf.sh -a
- 拷贝二进制程序gvm-event-monitor和服务并启动
cp build/gvm-event-monitor /usr/bin/
cp gvm-event-monitor.service /usr/lib/systemd/system/
systemctl start gvm-event-monitor.service
- 验证生命周期事件订阅
- 在服务所在节点启动一个虚机,服务输出如下:
- 发起helloworld RPC调用