一、bufferevent filter简单说明
filter之类的东西,相信有php框架或java springboot经验的程序员应该很熟悉,就是加载在输入流之前或输出流之后的一个处理器,用来从业务处分离开来做一些额外的事情。
(1)读取(接收)的数据先进入过滤器中,在过滤器处理完成后再传递给用户进行读取,即 过滤在读取之前 ,例如压缩数据要先在 filter 中解压,然后在 underlying 读取解压后的原始;
(2)写入(发送)的数据由用户发起,再传给过滤器,过滤器处理完后再发送出去,即 过滤在写入之后 ,例如在 underlying 中发送的原始数据,会在 filter 中进行压缩后再发送。
二、bufferevent API
1、bufferevent_filter_new
struct bufferevent *
bufferevent_filter_new(struct bufferevent *underlying,
bufferevent_filter_cb input_filter,
bufferevent_filter_cb output_filter,
int options,
void (*free_context)(void *),
void *ctx);
回调函数 bufferevent_filter_cb:
/**
* @brief: 回调函数,用于实现 bufferevent 的过滤器
*
* @param src: 源事件缓存
* @param dst: 目的事件缓存
* @param dst_limit: 写入 dst 的字节上限,-1表示无限制
* @param mode: 数据刷新模式
* @param ctx: 用户传递的参数
*
* @return: 返回过滤器的处理结果,详见后面说明
*/
typedef enum bufferevent_filter_result (*bufferevent_filter_cb)(
struct evbuffer *src, struct evbuffer *dst,
ev_ssize_t dst_limit, enum bufferevent_flush_mode mode, void *ctx);
// filter 回调函数返回值
enum bufferevent_filter_result {
// 正常
BEV_OK = 0,
// 还需要更多的数据才能输出
BEV_NEED_MORE = 1,
// filter 发生错误,无法进一步处理数据
BEV_ERROR = 2
};
// filter 数据刷新模式
enum bufferevent_flush_mode {
// 正常处理数据
BEV_NORMAL = 0,
// 检查发送的所有数据
BEV_FLUSH = 1,
// 读完或写完数据后,会有 EOF 标志
BEV_FINISHED = 2
};
2、evbuffer 结构体
这个结构体就是在 filter bufferevent 和 underlying bufferevent 之间传递数据用的。
(1)evbuffer 只是一个数据缓冲区,使用链表实现,不带任何 I/O 操作。
(2)bufferevent 是一个事件缓冲 I/O,内部实现了基本 socket recv/send 操作。
3、evbuffer_add 和 evbuffer_remove
/**
* @breif: 添加数据到 evbuffer 的结尾处
*
* @param buf: 待添加数据的 evbuffer 对象
* @param data: 数据指针
* @param datlen: 数据长度,单位 byte
*
* @return: 成功返回0,失败返回-1
*/
int evbuffer_add(struct evbuffer *buf, const void *data, size_t datlen);
/**
* @brief: 从 evbuffer 的开始处读取指定长度的数据
* 若 evbuffer 中的数据不足指定长度,则尽可能多的读取数据
*
* @param buf: 待读取数据的 evbuffer 对象
* @param data: 数据指针
* @param datlen: 数据长度,单位 byte
*
* @return: 成功返回读取的字节数,失败返回-1
*/
int evbuffer_remove(struct evbuffer *buf, void *data, size_t datlen);
4、例子
#include <iostream>
#include <event2/event.h>
#include <event2/listener.h>
#include <event2/bufferevent.h>
#include <event2/buffer.h>
#include <thread>
#include <errno.h>
#include <string.h>
#ifndef _WIN32
#include <signal.h>
#else
#endif
using namespace std;
#define PORT 5001
bufferevent_filter_result filter_in(evbuffer *s, evbuffer *d, ev_ssize_t limit,
bufferevent_flush_mode mode, void *arg){
cout<<"filter_in"<<endl;
char data[1024] = {0};
//读取并清理原数据
int len = evbuffer_remove(s, data, sizeof(data) - 1);
//把所有字母转成大写
for(int i = 0; i < len; i++){
data[i] = toupper(data[i]);
}
evbuffer_add(d, data, len);
return BEV_OK;
}
bufferevent_filter_result filter_out(evbuffer *s, evbuffer *d, ev_ssize_t limit,
bufferevent_flush_mode mode, void *arg){
cout<<"filter_out"<<endl;
char data[1024] = {0};
//读取并清理原数据
int len = evbuffer_remove(s, data, sizeof(data) - 1);
//把所有字母转成大写
string str = "";
str += "============\n";
str += data;
str += "============\n";
evbuffer_add(d, str.c_str(), str.size());
return BEV_OK;
}
void read_cb(bufferevent *bev, void *arg){
cout<<"read_cb"<<endl;
char data[1024] = {0};
int len = bufferevent_read(bev, data, sizeof(data) - 1);
cout<<data<<endl;
//回复客户消息,经过输出过滤
bufferevent_write(bev, data, len);
}
void write_cb(bufferevent *bev, void *arg){
cout<<"write_cb"<<endl;
}
void event_cb(bufferevent *bev, short events, void *arg){
cout<<"event_cb"<<endl;
}
void listen_cb(evconnlistener *ev, evutil_socket_t s, sockaddr* sin, int slen, void *arg){
cout<<"listen_cb "<<endl;
//创建bufferevent 绑定bufferevent filter
event_base *base = (event_base *)arg;
bufferevent *bev = bufferevent_socket_new(base, s, BEV_OPT_CLOSE_ON_FREE);
//绑定bufferevent filter
bufferevent *bev_filter = bufferevent_filter_new(bev,
filter_in, //输入过滤函数
filter_out, //输出过滤函数
BEV_OPT_CLOSE_ON_FREE, //关闭filter时同时关闭bufferevent
0, //清理的回调函数
0 //传递给回调的参数
);
//设置bufferevent的回调
bufferevent_setcb(bev_filter, read_cb, write_cb, event_cb, NULL);
bufferevent_enable(bev_filter, EV_READ|EV_WRITE);
}
int main(int argc, char *argv[])
{
#ifdef _WIN32
WSADATA wsa;
int a = WSAStartup(MAKEWORD(2, 2), &wsa);
#else
if (signal(SIGPIPE, SIG_IGN) == SIG_ERR)
return 1;
#endif
event_base *base = event_base_new();
//创建网络服务器
//设置监听的端口和地址
sockaddr_in sin;
memset(&sin,0, sizeof(sin));
sin.sin_family = AF_INET;
sin.sin_port = htons(PORT);
evconnlistener *ev = evconnlistener_new_bind(base,
listen_cb, //回调函数
base, //回调函数的参数arg
LEV_OPT_REUSEABLE | LEV_OPT_CLOSE_ON_FREE,
10, //listen back
(sockaddr*)&sin,
sizeof(sin)
);
//进入事件主循环
event_base_dispatch(base);
//释放资源
evconnlistener_free(ev);
event_base_free(base);
return 0;
}
参考:
(1)libevent(十二)bufferevent filter zlib 压缩通信(二)
(2)libevent高并发网络编程 - 03_bufferevent filter过滤器
(3)Libevent 学习十一:bufferevent filter 过滤器和API