目录
ACE库编译
ACE Reactor框架
ACE_Time_Value类
ACE_Event_Handler类
ACE定时器队列类
ACE_Reator类
ACE Reactor实现
ACE_Select_Reactor类
ACE_TP_Reactor类
ACE_WFMO_Reactor类
ACE库编译
首先去ACE官网下载安装包,通过vs2017或者2019进行编译,安装包下载地址:
Obtaining ACE, TAO, CIAO and DAnCE (vanderbilt.edu)
根据需求下载,下载ACE.zip HTTP即可。
下载后在本地进行解压,ACE_vs2019_sln使用vs2019进行打开,打开后根据需求进行生成,生成后会在lib目录中生成需要的动态库文件。
头文件通常在ace文件夹中
配置环境变量后,配置好头文件目录和库目录后,将ACEd.dll相关文件拷贝到指定的bin目录下即可开始使用 。
ACE Reactor框架
ACE_Time_Value类
主要用于标准化时间,同时重载了operator运算符,可直接对对象进行操作
常见类方法:
#pragma once
#include "ace/config-all.h"
#include "ace/ACE.h"
#include "ace/Time_Value.h"
#include "ace/Numeric_Limits.h"
#include "ace/OS.h"
using namespace std;
//ACETimeVau
const ACE_Time_Value max_interval(60 * 60);//1 hour
int ACETimeValue()
{
//获取当前天日期
ACE_Time_Value ex = ACE_OS::gettimeofday();
ACE_Time_Value interval;
//初始化方法
ACE_Time_Value tv1;
ACE_Time_Value tv2(2);
ACE_Time_Value tv3(100U);
ACE_Time_Value tv4(1, 1000000);
//获取时间单位
time_t sec = tv4.sec(); //秒
time_t usec = tv4.usec(); //毫秒
time_t msec = tv4.msec(); //微秒
//赋值 判断方法
ACE_Time_Value first;
ACE_Time_Value last(ACE_Time_Value::max_time);
first = last;
ACE_TEST_ASSERT(first == last);
return 0;
}
ACE_Event_Handler类
所有反应堆的基类,功能如下:
1、定义各类事件,如输入输出、定时器、异常等事件。
2、运行使用需要方法去扩展事件处理器子类,而不必改变框架。
当事件发生时,会触发相应的挂钩事件,方法主要如下:
当事件向反应器登记事件处理器时,必须指定事件处理的一个或多个事件类型,ACE通过ACE_Event_Handler 定义了以下枚举类型
这组事件可以填充传给ACE_Reator::register_hander()方法中的ACE_Reator_Mask参数。(后续介绍)
hander_*()返回值
如果为0 则代表是正常的,会继续检测事件
如果大于0 也是正常的,数量有限的时候会放弃控制事件
如果为-1 代表不正常,会停止检测事件
事件管理器的常用方法:
当返回值为-1时,会调用了ACE_Reacotor::remove_handler()事件时,还可能将以下枚举量返回给handler_close()函数。
常用方法首先需要继承 ACE_Event_Handler类,内部具备ACE_Event_Mask对象用于设置操作类型,其中READ_MASK对应handle_input()方法,WRITE_MASK对应handle_close()方法。
每个方法都会对应hand_close()函数,当返回-1时就会进行处理
通常服务器实现需要基于ACE_Event_Handler类,重写其事件,下面对日志服务器进行举例说明
他们之间的类关系图如下所示
logging_Acceptor定义主要如下
logging_Acceptor继承自ACE_Event_Handler,并定义了ACE_SOCK_Acceptor工厂的一个实例
class Logging_Acceptor : public ACE_Event_Handler
{
protected:
// Factory that connects <ACE_SOCK_Stream>s passively.
ACE_SOCK_Acceptor acceptor_;
protected:
virtual ~Logging_Acceptor () {}; // No-op destructor.
}
logging_Acceptor主要实现了open,hand_open()、hand_close()、get_handle()等方法
logging_Acceptor::open()方法初始化被动模式的接收器socket,以在local_addr处侦听链接。随后想反应器登记自身,以处理accept事件。
int Logging_Acceptor::open (const ACE_INET_Addr &local_addr) {
if (acceptor_.open (local_addr) == -1) return -1;
return reactor ()->register_handler
(this, ACE_Event_Handler::ACCEPT_MASK);
}
因为 ACE_SOCK_Acceptor 中的被动模式的socket 会在可以接受新连接时变成活动状态,反应器将自动分派Logging_Acceptor::handle_input()方法。我们将在首先定义了下面的Logging_Event_Handler 类之后,后续继续讲述 Logging_Acccpior::handie_input()方法的实现:
class Logging_Event_Handler : public ACE_Event_Handler
{
protected:
// File where log records are written.
ACE_FILE_IO log_file_;
// Connection to remote peer.
Logging_Handler logging_handler_;
}
logging_Event_Handler类也同样继承ACE_Event_Handler,主要实现了open,hand_open()、hand_close()、get_handle()等方法,负责对日志进行具体的处理操作,将日志持久化到文件中,logging_Event_Handler还含有一个ACE_FILE_IO对象,用于为每个客户保持一个独立的日志文件。
现在我们已经慨述了Logging_Event_Handier,我们将要实现 Logging_Acceptor:handle_input():每当可以接受新连接时,反应器就会对儿进行分派。这个工厂方法创建,连接并激活一个Logging_Event_Handler,如下所示:
int Logging_Acceptor::handle_input (ACE_HANDLE) {
//用于处理相应连接
Logging_Event_Handler *peer_handler = 0;
ACE_NEW_RETURN (peer_handler,
Logging_Event_Handler (reactor ()),
-1);
//监听数据
if (acceptor_.accept (peer_handler->peer ()) == -1) {
delete peer_handler;
return -1;
} else if (peer_handler->open () == -1) {
peer_handler->handle_close ();
return -1;
}
return 0;
}
上述代码中,创建了一个logging_Event_Handler用于处理相应的连接,accept负责监听socket句柄,如果返回了-1,则会自动调用hand_close()函数进行回收处理。
int Logging_Acceptor::handle_close (ACE_HANDLE,
ACE_Reactor_Mask) {
acceptor_.close ();
delete this;
return 0;
}
accept之后,执行open函数,该函数主要负责打开并存储日志文件,连接的客户的主机名作为日志文件名字,存在则打开,不存在就创建文件。
int Logging_Event_Handler::open () {
//根据连接的主机名设置日志名字
static const char LOGFILE_SUFFIX[] = ".log";
char filename[MAXHOSTNAMELEN + sizeof (LOGFILE_SUFFIX)];
ACE_INET_Addr logging_peer_addr;
logging_handler_.peer ().get_remote_addr (logging_peer_addr);
logging_peer_addr.get_host_name (filename, MAXHOSTNAMELEN);
ACE_OS::strcat (filename, LOGFILE_SUFFIX);
//创建或打开客户端日志文件
ACE_FILE_Connector connector;
connector.connect (log_file_,
ACE_FILE_Addr (filename),
0, // No timeout.
ACE_Addr::sap_any, // Ignored.
0, // Don't try to reuse the addr.
O_RDWR|O_CREAT|O_APPEND,
ACE_DEFAULT_FILE_PERMS);
//使用ACE_Reactor:register_handler()方法来为READ事件
//向Logging_Acceptor行的反应器登记 this 事件处理器。
return reactor ()->register_handler
(this, ACE_Event_Handler::READ_MASK);
}
最后登记READ事件,当有日志记录返回时,反应堆将自动派发logging_Event_Handler::hand_input()函数处理该事件
int Logging_Event_Handler::handle_input (ACE_HANDLE)
{ return logging_handler_.log_record (); }
当recv()或read()返回0,-1时,代表这处理器随即确定关闭,关闭的情况可能有多种,网线拔了,主机奔溃等等,处理决策主要有:
1、等待,直到收到关闭连接信号才关闭,其他事件保持等待,但可能会导致等待事件过长。
2、发送心跳包数据。周期性发送心跳包数据,如果对端没有在应用时段内发送心跳消息,则单方面终止连接,则应用可以在后面再尝试打开连接。如果对端在某段时间内没有发送任何数据,连接 就被放弃,实现通常依赖于 ACE定时器队列类。
ACE定时器队列类
许多网络应用周期性地进行某种活动,或必须过了规定时间段之后收到通知。通常会遇到定时器数目有限、定时器到期会引发信号,导致不能实现跨平台等问题,避开上述问题,则可以通过对定时器进行管理,如下所示:
ACE定时器队列类同样继承于ACE_Event_Handle的时间驱动事件处理器,功能和方法如下:
#include "ace/Profile_Timer.h"
#include "ace/Timer_Queue.h"
#include "ace/Timer_List.h"
#include "ace/Timer_Heap.h"
#include "ace/Timer_Wheel.h"
#include "ace/Timer_Hash.h"
#include "ace/Timer_Queue.h"
#include "ace/Time_Policy.h"
#include "ace/Recursive_Thread_Mutex.h"
#include "ace/Null_Mutex.h"
#include "ace/OS_NS_unistd.h"
#include "ace/Containers_T.h"
#include "ace/Event_Handler.h"
//这个类继承了ACE_Event_Handler 重写了handle_close handle_timeout方法
class Example_Handler : public ACE_Event_Handler
{
public:
Example_Handler() : close_count_(0) {}
int handle_close(ACE_HANDLE, ACE_Reactor_Mask mask) override
{
ACE_TEST_ASSERT(mask == ACE_Event_Handler::TIMER_MASK);
this->close_count_++;
return 0;
}
int handle_timeout(const ACE_Time_Value&,
const void* arg) override
{
int* act = (int*)arg;
ACE_TEST_ASSERT(*act == 42 || *act == 007);
int result = 0;
if (*act == 007)
result = -1; // This is the special value to trigger a handle_close
delete act;
return result;
}
/// Keeps track of the number of times that <handle_close> is called.
int close_count_;
};
int ACETIMEQueue()
{
ACE_Timer_Queue* tq;
//ACE_Time_Value的schedule方法
//设置一个定时器,以50毫秒为间隔启动。
Example_Handler ih;
ACE_Time_Value interval(0, 50 * 1000 /* number of usec in millisecond */);
const unsigned NUM_INTERVAL_FIRINGS = 50;
ACE_Time_Value loop_stop_time =
tq->gettimeofday() + (NUM_INTERVAL_FIRINGS * interval);
const unsigned EXPECTED_TRIP_COUNT =
NUM_INTERVAL_FIRINGS + 1 /* for the first immediate firing */;
//通过schedule指定定时器 到了时间点就轮询执行 handle_timeout函数
long id = tq->schedule(&ih, 0 /* no act */, ACE_Time_Value::zero, interval);
ACE_TEST_ASSERT(id != -1);
//分配到小于或等于loop_stop_time的事件中
do
{
tq->expire();
} while (tq->gettimeofday() < loop_stop_time);
//在指定事件执行handle_timeout函数
Example_Handler eh;
ACE_Time_Value earliest_time = tq->gettimeofday();
const void* timer_act = 0;
ACE_NEW(timer_act, int(1));
timer_id = tq->schedule(&eh, timer_act, earliest_time);
ACE_OS::sleep(ACE_Time_Value(0, 10));
ACE_NEW(timer_act, int(1));
timer_id2 = tq->schedule(&eh, timer_act, tq->gettimeofday());
long result = tq->earliest_time() == earliest_time;
ACE_TEST_ASSERT(result != 0);
//取消事件 删除对象
tq->cancel(timer_id, &timer_act);
delete (int*)timer_act;
tq->cancel(timer_id2, &timer_act);
delete (int*)timer_act;
}
上述文章中的logging_Accept和logging_event_hander虽然实现了日志服务器,但是长时间连接不发送数据的话,可以会导致资源的浪费,因此使用定时器进行管理,超出规定时间内执行以下事件:
在logging_Accept和logging_event_hander的基础上,添加定时器队列,使用logging_Accept_Ex和logging_event_hander_Ex进行实现,其中,logging_event_hander_Ex的每个实例都登记了一个定时器。
logging_Accept_Ex改动较小,只是重新定义并修改了handle_input方法,
class Logging_Acceptor_Ex : public Logging_Acceptor
{
public:
typedef ACE_INET_Addr PEER_ADDR;
// Simple constructor to pass ACE_Reactor to base class.
Logging_Acceptor_Ex (ACE_Reactor *r = ACE_Reactor::instance ())
: Logging_Acceptor (r) {}
int handle_input (ACE_HANDLE) {
Logging_Event_Handler_Ex *peer_handler = 0;
ACE_NEW_RETURN (peer_handler,
Logging_Event_Handler_Ex (reactor ()),
-1);
if (acceptor_.accept (peer_handler->peer ()) == -1) {
delete peer_handler;
return -1;
} else if (peer_handler->open () == -1) {
peer_handler->handle_close ();
return -1;
}
return 0;
}
};
logging_event_hander_Ex类中则更为复杂,定义了ACE_Time_Value时间类
class Logging_Event_Handler_Ex : public Logging_Event_Handler
{
private:
// Time when a client last sent a log record.
ACE_Time_Value time_of_last_log_record_;
// Maximum time to wait for a client log record.
const ACE_Time_Value max_client_timeout_;
public:
typedef Logging_Event_Handler PARENT;
// 3600 seconds == one hour.
enum { MAX_CLIENT_TIMEOUT = 3600 };
Logging_Event_Handler_Ex
(ACE_Reactor *reactor,
const ACE_Time_Value &max_client_timeout
= ACE_Time_Value (MAX_CLIENT_TIMEOUT))
: Logging_Event_Handler (reactor),
time_of_last_log_record_ (0),
max_client_timeout_ (max_client_timeout) {}
virtual ~Logging_Event_Handler_Ex () {}
virtual int open (); // Activate the event handler.
// Called by a reactor when logging events arrive.
virtual int handle_input (ACE_HANDLE);
// Called when a timeout expires to check if the client has
// been idle for an excessive amount of time.
virtual int handle_timeout (const ACE_Time_Value &tv,
const void *act);
// Called when this object is destroyed, e.g., when it's
// removed from a reactor.
virtual int handle_close (ACE_HANDLE = ACE_INVALID_HANDLE,
ACE_Reactor_Mask = 0);
};
logging_event_hander_Ex中的handle_input函数记录相连客户端接收到日志的时间,从定时器队列中获取到时间,之后转到父类的handle_input方法进行处理。
int Logging_Event_Handler_Ex::handle_input (ACE_HANDLE h) {
time_of_last_log_record_ =
reactor ()->timer_queue ()->gettimeofday ();
return PARENT::handle_input (h);
}
open方法中也需要进行修改,用于定义最大的超时时间,4-8行则调用了ACE_Reator类的schedule_timer函数,用于检查是否有客户最近发送日志给它,初始化定时器,使其在max_client_timeout时间过期,也就是一小时,max_client_timeout/4检查一下,也就是15分钟
int Logging_Event_Handler_Ex::open () {
int result = PARENT::open ();
if (result != -1) {
ACE_Time_Value reschedule (max_client_timeout_.sec () / 4);
result =
reactor ()->schedule_timer
(this,
0,
max_client_timeout_, // Initial timeout.
reschedule); // Subsequent timeouts.
}
return result;
}
定时器到时时会执行handle_timeout()函数,函数如下:
int Logging_Event_Handler_Ex::handle_timeout
(const ACE_Time_Value &now, const void *) {
if (now - time_of_last_log_record_ >= max_client_timeout_)
reactor ()->remove_handler (this, ACE_Event_Handler::READ_MASK);
return 0;
}
handle_timeout()该函数主要用于调用remove_handler,触发反应器,执行hand_close()方法移除事件的过滤器
int Logging_Event_Handler_Ex::handle_close (ACE_HANDLE,
ACE_Reactor_Mask) {
reactor ()->cancel_timer (this);
return PARENT::handle_close ();
}
该方法取消此处理器的过滤器,并调用父类的hand_close()方法,关闭日志文件及连接到客户的socket,并随即删除自身。
ACE_Reator类
ACE_Reator类实现门面模式,用于统一接口,定义应用于访问各种ACE_Reator框架特性接口,功能如下:
其方法包括了多方面,主要方法如下:
1、初始化和析构方法 用于初始化和销毁
2、事件处理器管理方法 用于登记或从中移除事件管理器
其中register_handler()的常用方法如下:
3、事件循环管理方法 控制何时分配应用事件处理器,在登记的初始事件之后,应用下表方法来管理事件循环
4、定时器管理方法 缺省情况下 调用ACE_Timer_Heap定时器队列来根据超时最后期限调度和分配事件管理器
使用这些方法,可以不需要与ACE定义器队列直接交互,而是复用其功能。
5、通知方法 反应器拥有一种通知机制,应用可将其用于把事件和事件管理器插入反应器的分派引擎中,方法如下:
6、实用方法 使得只有单个线程运行某种方法或采用单例模式,更好进行管理,防止死锁等问题。如ACE_Select_Reactor的hand_event()方法。
下面将介绍一个新的日志服务器Reactive_Logging_Server_Ex。Reactive_Logging_Server_Ex主要对两种数据结构进行映射:
使用了ACE::Select()进行事件检测、多路分离和分派。但也存在以下缺点
而logging_Event_Handler则更具有优点:
接下里介绍Reactor_Logging_Server.
#include "ace/ACE.h"
#include "ace/Reactor.h"
template <class ACCEPTOR>
class Reactor_Logging_Server : public ACCEPTOR
{
public:
Reactor_Logging_Server (int argc, char *argv[],
ACE_Reactor *reactor);
};
template <class ACCEPTOR>
Reactor_Logging_Server<ACCEPTOR>::Reactor_Logging_Server
(int argc, char *argv[], ACE_Reactor *reactor)
: ACCEPTOR (reactor) {
u_short logger_port = argc > 0 ? ACE_OS::atoi (argv[0]) : 0;//设置用于监听客户端连接的端口号
typename ACCEPTOR::PEER_ADDR server_addr;
int result;
//设置本地服务器地址
if (logger_port != 0)
result = server_addr.set (logger_port,
(ACE_UINT32) INADDR_ANY);
else
result = server_addr.set ("ace_logger",
(ACE_UINT32) INADDR_ANY);
//将地址传给ACCEPTOR::open,并登记this对象
if (result != -1)
result = ACCEPTOR::open (server_addr);
if (result == -1) reactor->end_reactor_event_loop ();
}
以日志服务器的main函数作为结束
//通过Logging_Acceptor_Ex来实例化 Reactor_Logging_Server模板
typedef Reactor_Logging_Server<Logging_Acceptor_Ex>
Server_Logging_Daemon;
int ACE_TMAIN (int argc, ACE_TCHAR *argv[])
{
ACE_Reactor reactor;
Server_Logging_Daemon *server; //动态分配Server_Logging_Daemon
// Ignore argv[0]...
--argc; ++argv;
ACE_NEW_RETURN (server,
Server_Logging_Daemon (argc, argv, &reactor),
1);
//使用ACE_Reactor的局部实例来驱动所有所有后续的连接和数据事件处理
if (reactor.run_reactor_event_loop () == -1)
ACE_ERROR_RETURN ((LM_ERROR, "%p\n",
"run_reactor_event_loop()"), 1);
//执行到这里时,会调用Logging_Accept::handle_close()方法来动态删除Server_Logging_Daemon对象
//同时调用Logging_Event_Handler_Ex::handle_close()来清除服务器
return 0;
}
时序图如下:
ACE Reactor实现
本节主要介绍以下几个实现
ACE采用了桥接模式,使得这些类的接口保持不变,但能够适应不同的情景。
ACE_Select_Reactor类
ACE_Select_Reactor是 ACE_Reactor 接口的一种实现,它使用 select()同步事件多路分离器兩数来检测 I/O 和定时器事件。除了支持ACE Reactor接口的所有特性外,ACE_Select_Reactor类还提供了以下能力:
ACE_Select_Reactor是 ACE_Reactor 在除 Windows 而外的所有平台上的缺省实现:在 Windows上使用的是ACE_WFMO_Reactor。其主要实现架构如下
在缺省的情况下,ACE_Select_Reactor通过ACE_Pipe来实现通信,类似select,使用FD_SETSIZE来控制ACE_Select_Reactor可用的事件处理器个数,超过则需要进行更改,比较麻烦,需要重新编译ACE库,其通知机制如下:
ACE_Select_Reactor通过ACE_Token来进行加锁对线程所有者进行控制。 任何时候,只有一个线程(称为所有者)可以调用ACESelect_Reactor::handle_evens()。在缺省情况下,ACE_Reactor的所有权属于对其进行初始化的线程。ACE_Select_Reactor::owner()方法可用于将 所有权变更到特定的线程id。
使用ACE_Select_Reactor的notify()函数可以关闭日志服务器,如下图所示,步骤如下:
#include "ace/streams.h"
#include "ace/Reactor.h"
#include "ace/Select_Reactor.h"
#include "ace/Thread_Manager.h"
#include "Reactor_Logging_Server_T.h"
#include "Logging_Acceptor_Ex.h"
//实例化Server_Logging_Daemon模板
typedef Reactor_Logging_Server<Logging_Acceptor_Ex>
Server_Logging_Daemon;
int ACE_TMAIN (int argc, ACE_TCHAR *argv[])
{
//将局部ACE_Reactor 设置为ACE_Select_Reactor
ACE_Select_Reactor select_reactor;
ACE_Reactor reactor (&select_reactor);
//创建Server_Logging_Daemon 实例
Server_Logging_Daemon *server;
// Ignore argv[0]...
--argc; ++argv;
ACE_NEW_RETURN (server,
Server_Logging_Daemon (argc, argv, &reactor),
1);
//使用ACE_Thread_Manager 派生出一个线程,执行event_loop和controller方法
ACE_Thread_Manager::instance ()->spawn (event_loop, &reactor);
ACE_Thread_Manager::instance ()->spawn (controller, &reactor);
//回收线程 避免资源浪费
return ACE_Thread_Manager::instance ()->wait ();
//全部执行完,同样也会调用handler_close()释放事件管理器资源
}
event_loop函数实现如下:
static ACE_THR_FUNC_RETURN event_loop (void *arg) {
ACE_Reactor *reactor = static_cast<ACE_Reactor *> (arg);
reactor->owner (ACE_OS::thr_self ());
reactor->run_reactor_event_loop ();
return 0;
}
controller函数实现如下:
static ACE_THR_FUNC_RETURN controller (void *arg) {
//创建Quit_Handler
//其hand_exception和hand_clsoe分别关闭ACE_Select_Reactor的
//事件循环和删除事件管理器
ACE_Reactor *reactor = static_cast<ACE_Reactor *> (arg);
Quit_Handler *quit_handler = 0;
ACE_NEW_RETURN (quit_handler, Quit_Handler (reactor), 0);
//进入循环 等待输入quit指令
//通过 反应器的notify方法将quit_handler传给反应器控制退出
for (;;) {
std::string user_input;
std::getline (cin, user_input, '\n');
if (user_input == "quit") {
reactor->notify (quit_handler);
break;
}
}
#endif
return 0;
}
Quit_Handler的具体实现如下,主要负责关闭ACE_Select_Reactor的事件循环和删除事件管理器
class Quit_Handler : public ACE_Event_Handler {
public:
Quit_Handler (ACE_Reactor *r) : ACE_Event_Handler (r) {}
virtual int handle_exception (ACE_HANDLE) {
reactor ()->end_reactor_event_loop ();
return -1; // Trigger call to handle_close() method.
}
virtual int handle_close (ACE_HANDLE, ACE_Reactor_Mask)
{ delete this; return 0; }
protected:
// Protected destructor ensures dynamic allocation.
virtual ~Quit_Handler () {}
};
ACE_TP_Reactor类
尽管 ACE_Select_Reactor很灵活,但在多线程化应用中它还体在一定的同限,因为只有所冇者线程能够调用其 handle_events()方法。因此,ACE_Select_Reactor 在事件多路分离对处理进行序列化,这对于某些网络化应用米说可能会太过受限,并且不可伸缩。解决这一问题的-种途径足派生多个线程,并在各个线程中运行单独的 ACE Select_Reactor 实例的事件循环。但是,这一设计可能难以编程,因为它要求开发者实现一个代理,在各个反应器之间均划分事件处理器,从而在各个线程间平均分摊负载。
使用ACE_Reaclor框架的ACE_TP_Reactor类往往是更为有效的消除ACE_Select_Reactor 的局限的途径;在这里“TP”意指“线程池"(Thread Pool)。其功能主要如下:
ACE_TP_Reactor派生自ACE_Select_Reactor类,并继承了其大部分实现。其日志服务器如图所示。
ACE_TP_Reactor与ACE_Select架构基本上是一致的,唯一的区别就是在hand_events采用了线程池的方法。
#include "ace/streams.h"
#include "ace/Reactor.h"
#include "ace/TP_Reactor.h"
#include "ace/Thread_Manager.h"
#include <memory>
#include "Reactor_Logging_Server_T.h"
#include "Logging_Acceptor_Ex.h"
//通过Logging_Acceptor_Ex来实例化Reactor_Logging_Server模板
//从而创建Server_Logging_Daemon类型
typedef Reactor_Logging_Server<Logging_Acceptor_Ex>
Server_Logging_Daemon;
int ACE_TMAIN (int argc, ACE_TCHAR *argv[])
{
const size_t N_THREADS = 4;
//创建ACE_TP_Reactor实例,并通过ACE_Reactor实现
ACE_TP_Reactor tp_reactor;
ACE_Reactor reactor (&tp_reactor);
std::unique_ptr<ACE_Reactor> delete_instance
(ACE_Reactor::instance (&reactor));
//动态分配一个Server_Logging_Daemon对象
Server_Logging_Daemon *server;
// Ignore argv[0]...
--argc; ++argv;
ACE_NEW_RETURN (server,
Server_Logging_Daemon (argc, argv,
ACE_Reactor::instance ()),
1);
//创建N_THREADS个线程,每个线程都执行event_loop方法
//新的单体反应器指针会传给event_loop
//ACE_TP_Reactor 会忽略event_loop中的ower方法
ACE_Thread_Manager::instance ()->spawn_n
(N_THREADS, event_loop, ACE_Reactor::instance ());
//派生一个线程来执行controller
ACE_Thread_Manager::instance ()->spawn
(controller, ACE_Reactor::instance ());
//等待其他线程退出
return ACE_Thread_Manager::instance ()->wait ();
}
//结束后同样调用hand_close来释放资源
ACE_WFMO_Reactor类
尽管在大多数操作系统上都可以使用select()函数,但它并非在所有平台上都是最为高效或最为强大的事件多路分离器。在 Windows上,select()只支持 socket 句柄的多路分离。Windows 定义了 WaitForMultipleObjects()系统函数术缓解这些问题,但该函数使用起来比较棘手,为了更好利用WaitForMultipleObjects()函数资源,ACE提供了ACE_WFMO_Reactor对其进行管理。
Windows WaitForMultipleObjects()事件多路分离器函数与select()类似。它阻塞在最多包含 64个句柄的句柄数组上,直至它们中的一个或多个变为活动的(在 Windows 术语中称为“被激发"),或是其超时参数所指定的时间过去为止。程序员可以对它进行编程,让它或是在任意的一个或多个旬柄变得活动,或是在所有句柄变得活动时返回到调用者。在这两种情况下,它都会返回在调用者所指定的句柄数组最前面的活动句柄的索引。
select()只能多路分离 IO句柄,与此不同, WaitForMultipleObjects()可以等待许多类型的 Windows 对象,包括线程、进程、同步体(例如,事件、信号量或是互斥体)、变更通知、控制台输入以及定时器。主要功能如下:
ACE_WFMO_Reactor类在windows下是继承缺省的ACE_Reactor来实现的。其分派事件的顺序与ACE_Select_Reactor类是一样的。
ACE_WFMO_Reactor类与ACE_Select_Reactor类和ACE_TP_Reactor类在以下几个方面显著不同。
有限的句柄数:不超过62个,如果超过,则使用多个ACE_WFMO_Reactor类对象
WRITE_MASK语义与select()不同:select会持续检测WRITE状态,而WaitForMultipleObjects只在第一次连接的时候进行检测,需要持续地写入直到关闭。
不同的通知机制:ACE_Select_Reactor是通过ACE_Pipe进行通知,而ACE_WFMO_Reactor类则利用ACE_Message_Queue类来进行通知
并发考虑事项:ACE_WFMO_Reactor允许多个线程并发地调用hand_events函数,是因为维护了三个处理器信息对象集
延迟的事件处理器清理:select当hand_*返回-1或者调用remove函数时会释放资源,而ACE_WFMO_Reactor延迟了事件处理器清理,只有当上述描述的登记发生更改才会释放。
到同一处理器的多线程分派:多线程化应用可以使用ACE_WFMO_Reactor::handle_events()来并发地多路分离和分派事件。
因此,当有多个线程执行同一ACE_WFMO_Reactor 对象上的 handle_events()事件循环时,事件处理器必须明确地针对竞争状态实施保护。通过实现一种内部协议,在分派某个句柄的事件处理器之前自动将此句柄挂起,ACE_TP_Reactor绕开了这些竞争状态。
先从Quit_Handler来讲述ACE_WFMO_Reactor类的实现。
class Quit_Handler : public ACE_Event_Handler {
private:
ACE_Manual_Event quit_seen_;
public:
//设置控制台 读取整行文本
Quit_Handler (ACE_Reactor *r) : ACE_Event_Handler (r) {
SetConsoleMode (ACE_STDIN, ENABLE_LINE_INPUT
| ENABLE_ECHO_INPUT
| ENABLE_PROCESSED_INPUT);
if (reactor ()->register_handler
(this, quit_seen_.handle ()) == -1 //登记一个事件处理器 直到收到quit
//register_stdin_handler建立输入机制,
//使得Quit_Handler的hand_input能够重复调用,直到返回-1
|| ACE_Event_Handler::register_stdin_handler
(this, r, ACE_Thread_Manager::instance ()) == -1)
//如果登记失败 则立即结束
r->end_reactor_event_loop ();
}
Quit_Handler类的handle_input如下
virtual int handle_input (ACE_HANDLE h) {
CHAR user_input[BUFSIZ];
DWORD count;
if (!ReadFile (h, user_input, BUFSIZ, &count, 0))
return -1;
user_input[count] = '\0';
if (ACE_OS::strncmp (user_input, "quit", 4) == 0)
return -1;
return 0;
}
收到quit之后则返回-1,触发ACE_WFMO_Reactor分派 Quit_Handler类的handle_close
virtual int handle_close (ACE_HANDLE, ACE_Reactor_Mask) {
quit_seen_.signal ();
return 0;
}
当事件在close中被触发,会进行多路分离,并执行signal函数
virtual int handle_signal (int, siginfo_t *, ucontext_t *) {
reactor ()->end_reactor_event_loop ();
return 0;
}
这个挂钩方法调用了end_reactor_event_loop方法,从而让所有的事件处理线程停止。析构函数如下所示:
~Quit_Handler () {
//取消register_stdin_handler带来的效果
ACE_Event_Handler::remove_stdin_handler
(reactor (), ACE_Thread_Manager::instance ());
//从反应器中解除事件处理器的登记
//DONT_CALL标志用于防止回调hand_close函数
reactor ()->remove_handler (quit_seen_.handle (),
ACE_Event_Handler::DONT_CALL);
}
ACE框架还提供了ACE_Manaual_Event类和ACE_Auto_Event类,,这两个类允许进程中的多个线程以一种线程安全的方式在事件上进行等待,或是通知其他线程发生了特定事件。在Windows上这两个类是对本地事件对象的包装,而在其他平台上ACE模拟了Windows事件对象设施。
在下面这样的意义上,事件与条件变量是类似的:线程可以使用它们来发出信号(Signal),通知说“应用定义的事件已发生",或是等待该事件发生。但是,与无状态的条件变量不同,已激发(Signaled)的事件会保持设置状态,直至类特有的动作发生为止。例如,ACE_Manual_Event会保持设置状态,直至它被显式地复位为止。ACE_Auto_Event会保持设置状态,直至有一个线程在其上等待为止。这两个类允许用户控制“激发操作所唤醒的线程的数目”,并且允许事件显示出状态迁移,即使在事件被激发时没有线程在等待。
由于 ACE_WFMO_Reactor 和基于select0的反应器之间的并发差异,派生了 Logging_Event _Handler_WFMO 类来增加针对竞争状态的保护。只需重新定义 Logging_Event_Handler_Ex的 handle_input()挂钩方法,并增加一个互斥体来显式地序列化对“客户日志 daemon连接线程池中的线程”的访问。
class Logging_Event_Handler_WFMO : public Logging_Event_Handler_Ex
{
public:
Logging_Event_Handler_WFMO (ACE_Reactor *r)
: Logging_Event_Handler_Ex (r) {}
protected:
int handle_input (ACE_HANDLE) {
ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, monitor, lock_, -1);
return logging_handler_.log_record ();
}
ACE_Thread_Mutex lock_; // Serialize threads in thread pool.
};
因为 Logging_Acceptor_Ex(会为每个新的客户连接实例化一个新的Logging_Event_Handler_Ex对象,对不同的事件处理器类的使用也要求使用新的接受器类。当新客户连接到达时,下面的Logging_Acceptor_Ex的子类会实例化正确的事件处理器类型:
class Logging_Acceptor_WFMO : public Logging_Acceptor_Ex
{
public:
// Simple constructor to pass ACE_Reactor to base class.
Logging_Acceptor_WFMO (ACE_Reactor *r = ACE_Reactor::instance ())
: Logging_Acceptor_Ex (r) {};
protected:
virtual int handle_input (ACE_HANDLE) {
Logging_Event_Handler_WFMO *peer_handler = 0;
ACE_NEW_RETURN (peer_handler,
Logging_Event_Handler_WFMO (reactor ()),
-1);
if (acceptor_.accept (peer_handler->peer ()) == -1) {
delete peer_handler;
return -1;
} else if (peer_handler->open () == -1) {
peer_handler->handle_close ();
return -1;
}
return 0;
}
};
handle_input()方法不需要针对竞争状态实施保护,因为它只在方法的局部对象上进行操作。事实上,除了为每个新连接实例化的事件处理器类型,Logging_Acceptor_WFMO与Logging_Acceptor_Ex是完全一样的。
ACE_WFMO_Reactor的main函数实现如下所示:
int ACE_TMAIN (int argc, ACE_TCHAR *argv[])
{
const size_t N_THREADS = 4;
//创建ACE_WFMO_Reactor 并实例化给ACE_Reactor
ACE_WFMO_Reactor wfmo_reactor;
ACE_Reactor reactor (&wfmo_reactor);
//初始化Server_Logging_Daemon 对象
Server_Logging_Daemon *server;
// Ignore argv[0]...
--argc; ++argv;
ACE_NEW_RETURN (server,
Server_Logging_Daemon (argc, argv, &reactor),
1);
//创建退出事件
Quit_Handler quit_handler (&reactor);
//初始化N_THREADS线程执行event_loop
ACE_Thread_Manager::instance ()->spawn_n
(N_THREADS, event_loop, &reactor);
return ACE_Thread_Manager::instance ()->wait ();
}
与ACE_TP_Reactor类的main函数比较主要有以下不同