ACE框架学习

news2024/12/26 0:06:58

目录

ACE库编译

ACE Reactor框架

ACE_Time_Value类

 ACE_Event_Handler类

ACE定时器队列类

ACE_Reator类

 ACE Reactor实现

 ACE_Select_Reactor类

ACE_TP_Reactor类

ACE_WFMO_Reactor类


ACE库编译

首先去ACE官网下载安装包,通过vs2017或者2019进行编译,安装包下载地址:

Obtaining ACE, TAO, CIAO and DAnCE (vanderbilt.edu)

根据需求下载,下载ACE.zip HTTP即可。 

下载后在本地进行解压,ACE_vs2019_sln使用vs2019进行打开,打开后根据需求进行生成,生成后会在lib目录中生成需要的动态库文件。

头文件通常在ace文件夹中

配置环境变量后,配置好头文件目录和库目录后,将ACEd.dll相关文件拷贝到指定的bin目录下即可开始使用 。

ACE Reactor框架

ACE_Time_Value类

主要用于标准化时间,同时重载了operator运算符,可直接对对象进行操作

常见类方法:

#pragma once
#include "ace/config-all.h"
#include "ace/ACE.h"
#include "ace/Time_Value.h"

#include "ace/Numeric_Limits.h"
#include "ace/OS.h"

using namespace std;

//ACETimeVau
const ACE_Time_Value max_interval(60 * 60);//1 hour

int ACETimeValue()
{
	//获取当前天日期
	ACE_Time_Value ex = ACE_OS::gettimeofday();
	ACE_Time_Value interval;

	//初始化方法
	ACE_Time_Value tv1;
	ACE_Time_Value tv2(2);
	ACE_Time_Value tv3(100U);
	ACE_Time_Value tv4(1, 1000000);

	//获取时间单位
	time_t sec = tv4.sec(); //秒
	time_t usec = tv4.usec(); //毫秒
	time_t msec = tv4.msec(); //微秒

	//赋值 判断方法
	ACE_Time_Value first;
	ACE_Time_Value last(ACE_Time_Value::max_time);
	first = last;
	ACE_TEST_ASSERT(first == last);

	return 0;
}

 ACE_Event_Handler类

所有反应堆的基类,功能如下:

1、定义各类事件,如输入输出、定时器、异常等事件。

2、运行使用需要方法去扩展事件处理器子类,而不必改变框架。

当事件发生时,会触发相应的挂钩事件,方法主要如下:

当事件向反应器登记事件处理器时,必须指定事件处理的一个或多个事件类型,ACE通过ACE_Event_Handler 定义了以下枚举类型

 这组事件可以填充传给ACE_Reator::register_hander()方法中的ACE_Reator_Mask参数。(后续介绍)

hander_*()返回值

如果为0 则代表是正常的,会继续检测事件

如果大于0 也是正常的,数量有限的时候会放弃控制事件

如果为-1 代表不正常,会停止检测事件

事件管理器的常用方法:

 当返回值为-1时,会调用了ACE_Reacotor::remove_handler()事件时,还可能将以下枚举量返回给handler_close()函数。

常用方法首先需要继承 ACE_Event_Handler类,内部具备ACE_Event_Mask对象用于设置操作类型,其中READ_MASK对应handle_input()方法,WRITE_MASK对应handle_close()方法。

 每个方法都会对应hand_close()函数,当返回-1时就会进行处理

  通常服务器实现需要基于ACE_Event_Handler类,重写其事件,下面对日志服务器进行举例说明

 他们之间的类关系图如下所示

 logging_Acceptor定义主要如下

logging_Acceptor继承自ACE_Event_Handler,并定义了ACE_SOCK_Acceptor工厂的一个实例

class Logging_Acceptor : public ACE_Event_Handler
{
protected:
  // Factory that connects <ACE_SOCK_Stream>s passively.
  ACE_SOCK_Acceptor acceptor_;

protected:
  virtual ~Logging_Acceptor () {}; // No-op destructor.
}

 logging_Acceptor主要实现了open,hand_open()、hand_close()、get_handle()等方法

logging_Acceptor::open()方法初始化被动模式的接收器socket,以在local_addr处侦听链接。随后想反应器登记自身,以处理accept事件。

int Logging_Acceptor::open (const ACE_INET_Addr &local_addr) {
  if (acceptor_.open (local_addr) == -1) return -1;
  return reactor ()->register_handler
           (this, ACE_Event_Handler::ACCEPT_MASK);
}

因为 ACE_SOCK_Acceptor 中的被动模式的socket 会在可以接受新连接时变成活动状态,反应器将自动分派Logging_Acceptor::handle_input()方法。我们将在首先定义了下面的Logging_Event_Handler 类之后,后续继续讲述 Logging_Acccpior::handie_input()方法的实现:

class Logging_Event_Handler : public ACE_Event_Handler
{
protected:
  // File where log records are written.
  ACE_FILE_IO log_file_;

  // Connection to remote peer.
  Logging_Handler logging_handler_;
}

  logging_Event_Handler类也同样继承ACE_Event_Handler,主要实现了open,hand_open()、hand_close()、get_handle()等方法,负责对日志进行具体的处理操作,将日志持久化到文件中,logging_Event_Handler还含有一个ACE_FILE_IO对象,用于为每个客户保持一个独立的日志文件。

现在我们已经慨述了Logging_Event_Handier,我们将要实现 Logging_Acceptor:handle_input():每当可以接受新连接时,反应器就会对儿进行分派。这个工厂方法创建,连接并激活一个Logging_Event_Handler,如下所示:

int Logging_Acceptor::handle_input (ACE_HANDLE) {
  //用于处理相应连接
  Logging_Event_Handler *peer_handler = 0;
  ACE_NEW_RETURN (peer_handler,
                  Logging_Event_Handler (reactor ()),
                  -1);
  
  //监听数据
  if (acceptor_.accept (peer_handler->peer ()) == -1) {
    delete peer_handler;
    return -1;
  } else if (peer_handler->open () == -1) {
    peer_handler->handle_close ();
    return -1;
  }
  return 0;
}

 上述代码中,创建了一个logging_Event_Handler用于处理相应的连接,accept负责监听socket句柄,如果返回了-1,则会自动调用hand_close()函数进行回收处理。

int Logging_Acceptor::handle_close (ACE_HANDLE,
                                    ACE_Reactor_Mask) {
  acceptor_.close ();
  delete this;
  return 0;
}

 accept之后,执行open函数,该函数主要负责打开并存储日志文件,连接的客户的主机名作为日志文件名字,存在则打开,不存在就创建文件。

int Logging_Event_Handler::open () {
  //根据连接的主机名设置日志名字
  static const char LOGFILE_SUFFIX[] = ".log";
  char filename[MAXHOSTNAMELEN + sizeof (LOGFILE_SUFFIX)];
  ACE_INET_Addr logging_peer_addr;

  logging_handler_.peer ().get_remote_addr (logging_peer_addr);
  logging_peer_addr.get_host_name (filename, MAXHOSTNAMELEN);
  ACE_OS::strcat (filename, LOGFILE_SUFFIX);

  //创建或打开客户端日志文件
  ACE_FILE_Connector connector;
  connector.connect (log_file_,
                     ACE_FILE_Addr (filename),
                     0, // No timeout.
                     ACE_Addr::sap_any, // Ignored.
                     0, // Don't try to reuse the addr.
                     O_RDWR|O_CREAT|O_APPEND,
                     ACE_DEFAULT_FILE_PERMS);

  //使用ACE_Reactor:register_handler()方法来为READ事件
  //向Logging_Acceptor行的反应器登记 this 事件处理器。
  return reactor ()->register_handler
    (this, ACE_Event_Handler::READ_MASK);
}

最后登记READ事件,当有日志记录返回时,反应堆将自动派发logging_Event_Handler::hand_input()函数处理该事件

int Logging_Event_Handler::handle_input (ACE_HANDLE)
{ return logging_handler_.log_record (); }

  当recv()或read()返回0,-1时,代表这处理器随即确定关闭,关闭的情况可能有多种,网线拔了,主机奔溃等等,处理决策主要有:

1、等待,直到收到关闭连接信号才关闭,其他事件保持等待,但可能会导致等待事件过长。

2、发送心跳包数据。周期性发送心跳包数据,如果对端没有在应用时段内发送心跳消息,则单方面终止连接,则应用可以在后面再尝试打开连接。如果对端在某段时间内没有发送任何数据,连接 就被放弃,实现通常依赖于 ACE定时器队列类。

ACE定时器队列类

许多网络应用周期性地进行某种活动,或必须过了规定时间段之后收到通知。通常会遇到定时器数目有限、定时器到期会引发信号,导致不能实现跨平台等问题,避开上述问题,则可以通过对定时器进行管理,如下所示:

ACE定时器队列类同样继承于ACE_Event_Handle的时间驱动事件处理器,功能和方法如下:

 

#include "ace/Profile_Timer.h"
#include "ace/Timer_Queue.h"
#include "ace/Timer_List.h"
#include "ace/Timer_Heap.h"
#include "ace/Timer_Wheel.h"
#include "ace/Timer_Hash.h"
#include "ace/Timer_Queue.h"
#include "ace/Time_Policy.h"
#include "ace/Recursive_Thread_Mutex.h"
#include "ace/Null_Mutex.h"
#include "ace/OS_NS_unistd.h"
#include "ace/Containers_T.h"
#include "ace/Event_Handler.h"

//这个类继承了ACE_Event_Handler 重写了handle_close handle_timeout方法
class Example_Handler : public ACE_Event_Handler
{
public:
    Example_Handler() : close_count_(0) {}

    int handle_close(ACE_HANDLE, ACE_Reactor_Mask mask) override
    {
        ACE_TEST_ASSERT(mask == ACE_Event_Handler::TIMER_MASK);
        this->close_count_++;
        return 0;
    }

    int handle_timeout(const ACE_Time_Value&,
        const void* arg) override
    {
        int* act = (int*)arg;
        ACE_TEST_ASSERT(*act == 42 || *act == 007);
        int result = 0;

        if (*act == 007)
            result = -1; // This is the special value to trigger a handle_close

        delete act;
        return result;
    }

    /// Keeps track of the number of times that <handle_close> is called.
    int close_count_;
};

int ACETIMEQueue()
{
    ACE_Timer_Queue* tq;

    //ACE_Time_Value的schedule方法
    //设置一个定时器,以50毫秒为间隔启动。
    Example_Handler ih;
    ACE_Time_Value interval(0, 50 * 1000 /* number of usec in millisecond */);
    const unsigned NUM_INTERVAL_FIRINGS = 50;
    ACE_Time_Value loop_stop_time =
        tq->gettimeofday() + (NUM_INTERVAL_FIRINGS * interval);
    const unsigned EXPECTED_TRIP_COUNT =
        NUM_INTERVAL_FIRINGS + 1 /* for the first immediate firing */;

    //通过schedule指定定时器 到了时间点就轮询执行 handle_timeout函数
    long id = tq->schedule(&ih, 0 /* no act */, ACE_Time_Value::zero, interval);
    ACE_TEST_ASSERT(id != -1);

    //分配到小于或等于loop_stop_time的事件中
    do
    {
        tq->expire();
    } while (tq->gettimeofday() < loop_stop_time);

    //在指定事件执行handle_timeout函数
    Example_Handler eh;
    ACE_Time_Value earliest_time = tq->gettimeofday();

    const void* timer_act = 0;
    ACE_NEW(timer_act, int(1));
    timer_id = tq->schedule(&eh, timer_act, earliest_time);

    ACE_OS::sleep(ACE_Time_Value(0, 10));

    ACE_NEW(timer_act, int(1));
    timer_id2 = tq->schedule(&eh, timer_act, tq->gettimeofday());

    long result = tq->earliest_time() == earliest_time;
    ACE_TEST_ASSERT(result != 0);

    //取消事件 删除对象
    tq->cancel(timer_id, &timer_act);
    delete (int*)timer_act;
    tq->cancel(timer_id2, &timer_act);
    delete (int*)timer_act;
}

 上述文章中的logging_Accept和logging_event_hander虽然实现了日志服务器,但是长时间连接不发送数据的话,可以会导致资源的浪费,因此使用定时器进行管理,超出规定时间内执行以下事件:

 在logging_Accept和logging_event_hander的基础上,添加定时器队列,使用logging_Accept_Ex和logging_event_hander_Ex进行实现,其中,logging_event_hander_Ex的每个实例都登记了一个定时器。

logging_Accept_Ex改动较小,只是重新定义并修改了handle_input方法,

class Logging_Acceptor_Ex : public Logging_Acceptor
{
public:
  typedef ACE_INET_Addr PEER_ADDR;

  // Simple constructor to pass ACE_Reactor to base class.
  Logging_Acceptor_Ex (ACE_Reactor *r = ACE_Reactor::instance ())
    : Logging_Acceptor (r) {}

  int handle_input (ACE_HANDLE) {
    Logging_Event_Handler_Ex *peer_handler = 0;
    ACE_NEW_RETURN (peer_handler,
                    Logging_Event_Handler_Ex (reactor ()),
                    -1);
    if (acceptor_.accept (peer_handler->peer ()) == -1) {
      delete peer_handler;
      return -1;
    } else if (peer_handler->open () == -1) {
      peer_handler->handle_close ();
      return -1;
    }
    return 0;
  }
};

 logging_event_hander_Ex类中则更为复杂,定义了ACE_Time_Value时间类

class Logging_Event_Handler_Ex : public Logging_Event_Handler
{
private:
  // Time when a client last sent a log record.
  ACE_Time_Value time_of_last_log_record_;

  // Maximum time to wait for a client log record.
  const ACE_Time_Value max_client_timeout_;

public:
  typedef Logging_Event_Handler PARENT;

  // 3600 seconds == one hour.
  enum { MAX_CLIENT_TIMEOUT = 3600 };

  Logging_Event_Handler_Ex
    (ACE_Reactor *reactor,
     const ACE_Time_Value &max_client_timeout
       = ACE_Time_Value (MAX_CLIENT_TIMEOUT))
    : Logging_Event_Handler (reactor),
      time_of_last_log_record_ (0),
      max_client_timeout_ (max_client_timeout) {}

  virtual ~Logging_Event_Handler_Ex () {}

  virtual int open (); // Activate the event handler.

  // Called by a reactor when logging events arrive.
  virtual int handle_input (ACE_HANDLE);

  // Called when a timeout expires to check if the client has
  // been idle for an excessive amount of time.
  virtual int handle_timeout (const ACE_Time_Value &tv,
                              const void *act);

  // Called when this object is destroyed, e.g., when it's
  // removed from a reactor.
  virtual int handle_close (ACE_HANDLE = ACE_INVALID_HANDLE,
                            ACE_Reactor_Mask = 0);
};

 logging_event_hander_Ex中的handle_input函数记录相连客户端接收到日志的时间,从定时器队列中获取到时间,之后转到父类的handle_input方法进行处理。

int Logging_Event_Handler_Ex::handle_input (ACE_HANDLE h) {
  time_of_last_log_record_ =
    reactor ()->timer_queue ()->gettimeofday ();
  return PARENT::handle_input (h);
}

 open方法中也需要进行修改,用于定义最大的超时时间,4-8行则调用了ACE_Reator类的schedule_timer函数,用于检查是否有客户最近发送日志给它,初始化定时器,使其在max_client_timeout时间过期,也就是一小时,max_client_timeout/4检查一下,也就是15分钟

int Logging_Event_Handler_Ex::open () {
  int result = PARENT::open ();
  if (result != -1) {
    ACE_Time_Value reschedule (max_client_timeout_.sec () / 4);
    result =
      reactor ()->schedule_timer
        (this,
         0,
         max_client_timeout_,  // Initial timeout.
         reschedule);          // Subsequent timeouts.
  }
  return result;
}

 定时器到时时会执行handle_timeout()函数,函数如下:

int Logging_Event_Handler_Ex::handle_timeout
  (const ACE_Time_Value &now, const void *) {
  if (now - time_of_last_log_record_ >= max_client_timeout_)
    reactor ()->remove_handler (this, ACE_Event_Handler::READ_MASK);
  return 0;
}

 handle_timeout()该函数主要用于调用remove_handler,触发反应器,执行hand_close()方法移除事件的过滤器

int Logging_Event_Handler_Ex::handle_close (ACE_HANDLE,
                                            ACE_Reactor_Mask) {
  reactor ()->cancel_timer (this);
  return PARENT::handle_close ();
}

 该方法取消此处理器的过滤器,并调用父类的hand_close()方法,关闭日志文件及连接到客户的socket,并随即删除自身。

ACE_Reator类

ACE_Reator类实现门面模式,用于统一接口,定义应用于访问各种ACE_Reator框架特性接口,功能如下:

 其方法包括了多方面,主要方法如下:

1、初始化和析构方法 用于初始化和销毁

 2、事件处理器管理方法 用于登记或从中移除事件管理器

 其中register_handler()的常用方法如下:

3、事件循环管理方法 控制何时分配应用事件处理器,在登记的初始事件之后,应用下表方法来管理事件循环

 4、定时器管理方法 缺省情况下 调用ACE_Timer_Heap定时器队列来根据超时最后期限调度和分配事件管理器

使用这些方法,可以不需要与ACE定义器队列直接交互,而是复用其功能。

 5、通知方法 反应器拥有一种通知机制,应用可将其用于把事件和事件管理器插入反应器的分派引擎中,方法如下:

 6、实用方法 使得只有单个线程运行某种方法或采用单例模式,更好进行管理,防止死锁等问题。如ACE_Select_Reactor的hand_event()方法。

 下面将介绍一个新的日志服务器Reactive_Logging_Server_Ex。Reactive_Logging_Server_Ex主要对两种数据结构进行映射:

使用了ACE::Select()进行事件检测、多路分离和分派。但也存在以下缺点

而logging_Event_Handler则更具有优点:

 接下里介绍Reactor_Logging_Server.

#include "ace/ACE.h"
#include "ace/Reactor.h"

template <class ACCEPTOR>
class Reactor_Logging_Server : public ACCEPTOR
{
public:
  Reactor_Logging_Server (int argc, char *argv[],
                          ACE_Reactor *reactor);
};
template <class ACCEPTOR>
Reactor_Logging_Server<ACCEPTOR>::Reactor_Logging_Server
  (int argc, char *argv[], ACE_Reactor *reactor)
  : ACCEPTOR (reactor) {
  u_short logger_port = argc > 0 ? ACE_OS::atoi (argv[0]) : 0;//设置用于监听客户端连接的端口号
  typename ACCEPTOR::PEER_ADDR server_addr;
  int result;
   
  //设置本地服务器地址
  if (logger_port != 0)
    result = server_addr.set (logger_port,
                              (ACE_UINT32) INADDR_ANY);
  else
    result = server_addr.set ("ace_logger",
                              (ACE_UINT32) INADDR_ANY);

  //将地址传给ACCEPTOR::open,并登记this对象
  if (result != -1)
    result = ACCEPTOR::open (server_addr);
  if (result == -1) reactor->end_reactor_event_loop ();
}

 以日志服务器的main函数作为结束

//通过Logging_Acceptor_Ex来实例化 Reactor_Logging_Server模板
typedef Reactor_Logging_Server<Logging_Acceptor_Ex>
        Server_Logging_Daemon;

int ACE_TMAIN (int argc, ACE_TCHAR *argv[])
{
  ACE_Reactor reactor;
  Server_Logging_Daemon *server; //动态分配Server_Logging_Daemon 
  // Ignore argv[0]...
  --argc; ++argv;
  ACE_NEW_RETURN (server,
                  Server_Logging_Daemon (argc, argv, &reactor),
                  1);

  //使用ACE_Reactor的局部实例来驱动所有所有后续的连接和数据事件处理
  if (reactor.run_reactor_event_loop () == -1)
    ACE_ERROR_RETURN ((LM_ERROR, "%p\n",
                       "run_reactor_event_loop()"), 1);
  //执行到这里时,会调用Logging_Accept::handle_close()方法来动态删除Server_Logging_Daemon对象
  //同时调用Logging_Event_Handler_Ex::handle_close()来清除服务器
  return 0;
}

时序图如下:

 ACE Reactor实现

本节主要介绍以下几个实现

 ACE采用了桥接模式,使得这些类的接口保持不变,但能够适应不同的情景。

 ACE_Select_Reactor类

ACE_Select_Reactor是 ACE_Reactor 接口的一种实现,它使用 select()同步事件多路分离器兩数来检测 I/O 和定时器事件。除了支持ACE Reactor接口的所有特性外,ACE_Select_Reactor类还提供了以下能力:

 ACE_Select_Reactor是 ACE_Reactor 在除 Windows 而外的所有平台上的缺省实现:在 Windows上使用的是ACE_WFMO_Reactor。其主要实现架构如下

在缺省的情况下,ACE_Select_Reactor通过ACE_Pipe来实现通信,类似select,使用FD_SETSIZE来控制ACE_Select_Reactor可用的事件处理器个数,超过则需要进行更改,比较麻烦,需要重新编译ACE库,其通知机制如下:

 ACE_Select_Reactor通过ACE_Token来进行加锁对线程所有者进行控制。 任何时候,只有一个线程(称为所有者)可以调用ACESelect_Reactor::handle_evens()。在缺省情况下,ACE_Reactor的所有权属于对其进行初始化的线程。ACE_Select_Reactor::owner()方法可用于将 所有权变更到特定的线程id。

使用ACE_Select_Reactor的notify()函数可以关闭日志服务器,如下图所示,步骤如下:

#include "ace/streams.h"
#include "ace/Reactor.h"
#include "ace/Select_Reactor.h"
#include "ace/Thread_Manager.h"
#include "Reactor_Logging_Server_T.h"
#include "Logging_Acceptor_Ex.h"

//实例化Server_Logging_Daemon模板
typedef Reactor_Logging_Server<Logging_Acceptor_Ex>
        Server_Logging_Daemon;

int ACE_TMAIN (int argc, ACE_TCHAR *argv[])
{
  //将局部ACE_Reactor 设置为ACE_Select_Reactor 
  ACE_Select_Reactor select_reactor;
  ACE_Reactor reactor (&select_reactor);
  
  //创建Server_Logging_Daemon 实例
  Server_Logging_Daemon *server;
  // Ignore argv[0]...
  --argc; ++argv;
  ACE_NEW_RETURN (server,
                  Server_Logging_Daemon (argc, argv, &reactor),
                  1);
  
  //使用ACE_Thread_Manager 派生出一个线程,执行event_loop和controller方法
  ACE_Thread_Manager::instance ()->spawn (event_loop, &reactor);
  ACE_Thread_Manager::instance ()->spawn (controller, &reactor);

  //回收线程 避免资源浪费
  return ACE_Thread_Manager::instance ()->wait ();
  
  //全部执行完,同样也会调用handler_close()释放事件管理器资源
}

event_loop函数实现如下:

static ACE_THR_FUNC_RETURN event_loop (void *arg) {
  ACE_Reactor *reactor = static_cast<ACE_Reactor *> (arg);

  reactor->owner (ACE_OS::thr_self ());
  reactor->run_reactor_event_loop ();
  return 0;
}

controller函数实现如下:

static ACE_THR_FUNC_RETURN controller (void *arg) {

  //创建Quit_Handler
  //其hand_exception和hand_clsoe分别关闭ACE_Select_Reactor的
  //事件循环和删除事件管理器
  ACE_Reactor *reactor = static_cast<ACE_Reactor *> (arg);
  Quit_Handler *quit_handler = 0;

  ACE_NEW_RETURN (quit_handler, Quit_Handler (reactor), 0);
  
  //进入循环 等待输入quit指令
  //通过 反应器的notify方法将quit_handler传给反应器控制退出
  for (;;) {
    std::string user_input;
    std::getline (cin, user_input, '\n');
    if (user_input == "quit") {
      reactor->notify (quit_handler);
      break;
    }
  }
#endif

  return 0;
}

Quit_Handler的具体实现如下,主要负责关闭ACE_Select_Reactor的事件循环和删除事件管理器

class Quit_Handler : public ACE_Event_Handler {
public:
  Quit_Handler (ACE_Reactor *r) : ACE_Event_Handler (r) {}

  virtual int handle_exception (ACE_HANDLE) {
    reactor ()->end_reactor_event_loop ();
    return -1; // Trigger call to handle_close() method.
  }

  virtual int handle_close (ACE_HANDLE, ACE_Reactor_Mask)
  { delete this; return 0; }

protected:
  // Protected destructor ensures dynamic allocation.
  virtual ~Quit_Handler () {}
};

ACE_TP_Reactor类

尽管 ACE_Select_Reactor很灵活,但在多线程化应用中它还体在一定的同限,因为只有所冇者线程能够调用其 handle_events()方法。因此,ACE_Select_Reactor 在事件多路分离对处理进行序列化,这对于某些网络化应用米说可能会太过受限,并且不可伸缩。解决这一问题的-种途径足派生多个线程,并在各个线程中运行单独的 ACE Select_Reactor 实例的事件循环。但是,这一设计可能难以编程,因为它要求开发者实现一个代理,在各个反应器之间均划分事件处理器,从而在各个线程间平均分摊负载。

使用ACE_Reaclor框架的ACE_TP_Reactor类往往是更为有效的消除ACE_Select_Reactor 的局限的途径;在这里“TP”意指“线程池"(Thread Pool)。其功能主要如下:

 ACE_TP_Reactor派生自ACE_Select_Reactor类,并继承了其大部分实现。其日志服务器如图所示。

ACE_TP_Reactor与ACE_Select架构基本上是一致的,唯一的区别就是在hand_events采用了线程池的方法。

#include "ace/streams.h"
#include "ace/Reactor.h"
#include "ace/TP_Reactor.h"
#include "ace/Thread_Manager.h"
#include <memory>
#include "Reactor_Logging_Server_T.h"
#include "Logging_Acceptor_Ex.h"

//通过Logging_Acceptor_Ex来实例化Reactor_Logging_Server模板
//从而创建Server_Logging_Daemon类型
typedef Reactor_Logging_Server<Logging_Acceptor_Ex>
        Server_Logging_Daemon;

int ACE_TMAIN (int argc, ACE_TCHAR *argv[])
{
  const size_t N_THREADS = 4;

  //创建ACE_TP_Reactor实例,并通过ACE_Reactor实现
  ACE_TP_Reactor tp_reactor;
  ACE_Reactor reactor (&tp_reactor);
  std::unique_ptr<ACE_Reactor> delete_instance
    (ACE_Reactor::instance (&reactor));

  //动态分配一个Server_Logging_Daemon对象
  Server_Logging_Daemon *server;
  // Ignore argv[0]...
  --argc; ++argv;
  ACE_NEW_RETURN (server,
                  Server_Logging_Daemon (argc, argv,
                    ACE_Reactor::instance ()),
                  1);

  //创建N_THREADS个线程,每个线程都执行event_loop方法
  //新的单体反应器指针会传给event_loop
  //ACE_TP_Reactor 会忽略event_loop中的ower方法
  ACE_Thread_Manager::instance ()->spawn_n
    (N_THREADS, event_loop, ACE_Reactor::instance ());
  
  //派生一个线程来执行controller
  ACE_Thread_Manager::instance ()->spawn
    (controller, ACE_Reactor::instance ());

  //等待其他线程退出
  return ACE_Thread_Manager::instance ()->wait ();
}
//结束后同样调用hand_close来释放资源

ACE_WFMO_Reactor类

尽管在大多数操作系统上都可以使用select()函数,但它并非在所有平台上都是最为高效或最为强大的事件多路分离器。在 Windows上,select()只支持 socket 句柄的多路分离。Windows 定义了 WaitForMultipleObjects()系统函数术缓解这些问题,但该函数使用起来比较棘手,为了更好利用WaitForMultipleObjects()函数资源,ACE提供了ACE_WFMO_Reactor对其进行管理。

Windows WaitForMultipleObjects()事件多路分离器函数与select()类似。它阻塞在最多包含 64个句柄的句柄数组上,直至它们中的一个或多个变为活动的(在 Windows 术语中称为“被激发"),或是其超时参数所指定的时间过去为止。程序员可以对它进行编程,让它或是在任意的一个或多个旬柄变得活动,或是在所有句柄变得活动时返回到调用者。在这两种情况下,它都会返回在调用者所指定的句柄数组最前面的活动句柄的索引。

select()只能多路分离 IO句柄,与此不同, WaitForMultipleObjects()可以等待许多类型的 Windows 对象,包括线程、进程、同步体(例如,事件、信号量或是互斥体)、变更通知、控制台输入以及定时器。主要功能如下:

ACE_WFMO_Reactor类在windows下是继承缺省的ACE_Reactor来实现的。其分派事件的顺序与ACE_Select_Reactor类是一样的。

ACE_WFMO_Reactor类与ACE_Select_Reactor类和ACE_TP_Reactor类在以下几个方面显著不同。

有限的句柄数:不超过62个,如果超过,则使用多个ACE_WFMO_Reactor类对象

WRITE_MASK语义与select()不同:select会持续检测WRITE状态,而WaitForMultipleObjects只在第一次连接的时候进行检测,需要持续地写入直到关闭。

不同的通知机制:ACE_Select_Reactor是通过ACE_Pipe进行通知,而ACE_WFMO_Reactor类则利用ACE_Message_Queue类来进行通知

并发考虑事项:ACE_WFMO_Reactor允许多个线程并发地调用hand_events函数,是因为维护了三个处理器信息对象集

延迟的事件处理器清理:select当hand_*返回-1或者调用remove函数时会释放资源,而ACE_WFMO_Reactor延迟了事件处理器清理,只有当上述描述的登记发生更改才会释放。

到同一处理器的多线程分派:多线程化应用可以使用ACE_WFMO_Reactor::handle_events()来并发地多路分离和分派事件。

因此,当有多个线程执行同一ACE_WFMO_Reactor 对象上的 handle_events()事件循环时,事件处理器必须明确地针对竞争状态实施保护。通过实现一种内部协议,在分派某个句柄的事件处理器之前自动将此句柄挂起,ACE_TP_Reactor绕开了这些竞争状态。

先从Quit_Handler来讲述ACE_WFMO_Reactor类的实现。

class Quit_Handler : public ACE_Event_Handler {
private:
  ACE_Manual_Event quit_seen_;

public:
  //设置控制台 读取整行文本
  Quit_Handler (ACE_Reactor *r) : ACE_Event_Handler (r) {
    SetConsoleMode (ACE_STDIN, ENABLE_LINE_INPUT
                               | ENABLE_ECHO_INPUT
                               | ENABLE_PROCESSED_INPUT);
    if (reactor ()->register_handler
          (this, quit_seen_.handle ()) == -1 //登记一个事件处理器 直到收到quit
     
      //register_stdin_handler建立输入机制,
      //使得Quit_Handler的hand_input能够重复调用,直到返回-1
        || ACE_Event_Handler::register_stdin_handler
             (this, r, ACE_Thread_Manager::instance ()) == -1)

      //如果登记失败 则立即结束
      r->end_reactor_event_loop ();
  }

 Quit_Handler类的handle_input如下

  virtual int handle_input (ACE_HANDLE h) {
    CHAR user_input[BUFSIZ];
    DWORD count;
    if (!ReadFile (h, user_input, BUFSIZ, &count, 0))
      return -1;

    user_input[count] = '\0';
    if (ACE_OS::strncmp (user_input, "quit", 4) == 0)
      return -1;
    return 0;
  }

收到quit之后则返回-1,触发ACE_WFMO_Reactor分派 Quit_Handler类的handle_close

  virtual int handle_close (ACE_HANDLE, ACE_Reactor_Mask) {
    quit_seen_.signal ();
    return 0;
  }

当事件在close中被触发,会进行多路分离,并执行signal函数

  virtual int handle_signal (int, siginfo_t *, ucontext_t *) {
    reactor ()->end_reactor_event_loop ();
    return 0;
  }

这个挂钩方法调用了end_reactor_event_loop方法,从而让所有的事件处理线程停止。析构函数如下所示:

  ~Quit_Handler () {
    //取消register_stdin_handler带来的效果
    ACE_Event_Handler::remove_stdin_handler
      (reactor (), ACE_Thread_Manager::instance ());

    //从反应器中解除事件处理器的登记
    //DONT_CALL标志用于防止回调hand_close函数
    reactor ()->remove_handler (quit_seen_.handle (),
                                ACE_Event_Handler::DONT_CALL);
  }

ACE框架还提供了ACE_Manaual_Event类和ACE_Auto_Event类,,这两个类允许进程中的多个线程以一种线程安全的方式在事件上进行等待,或是通知其他线程发生了特定事件。在Windows上这两个类是对本地事件对象的包装,而在其他平台上ACE模拟了Windows事件对象设施。

在下面这样的意义上,事件与条件变量是类似的:线程可以使用它们来发出信号(Signal),通知说“应用定义的事件已发生",或是等待该事件发生。但是,与无状态的条件变量不同,已激发(Signaled)的事件会保持设置状态,直至类特有的动作发生为止。例如,ACE_Manual_Event会保持设置状态,直至它被显式地复位为止。ACE_Auto_Event会保持设置状态,直至有一个线程在其上等待为止。这两个类允许用户控制“激发操作所唤醒的线程的数目”,并且允许事件显示出状态迁移,即使在事件被激发时没有线程在等待。

由于 ACE_WFMO_Reactor 和基于select0的反应器之间的并发差异,派生了 Logging_Event _Handler_WFMO 类来增加针对竞争状态的保护。只需重新定义 Logging_Event_Handler_Ex的 handle_input()挂钩方法,并增加一个互斥体来显式地序列化对“客户日志 daemon连接线程池中的线程”的访问。

class Logging_Event_Handler_WFMO : public Logging_Event_Handler_Ex
{
public:
  Logging_Event_Handler_WFMO (ACE_Reactor *r)
    : Logging_Event_Handler_Ex (r) {}

protected:
  int handle_input (ACE_HANDLE) {
    ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, monitor, lock_, -1);
    return logging_handler_.log_record ();
  }

  ACE_Thread_Mutex lock_; // Serialize threads in thread pool.
};

因为 Logging_Acceptor_Ex(会为每个新的客户连接实例化一个新的Logging_Event_Handler_Ex对象,对不同的事件处理器类的使用也要求使用新的接受器类。当新客户连接到达时,下面的Logging_Acceptor_Ex的子类会实例化正确的事件处理器类型:

class Logging_Acceptor_WFMO : public Logging_Acceptor_Ex
{
public:
  // Simple constructor to pass ACE_Reactor to base class.
  Logging_Acceptor_WFMO (ACE_Reactor *r = ACE_Reactor::instance ())
    : Logging_Acceptor_Ex (r) {};

protected:
  virtual int handle_input (ACE_HANDLE) {
    Logging_Event_Handler_WFMO *peer_handler = 0;
    ACE_NEW_RETURN (peer_handler,
                    Logging_Event_Handler_WFMO (reactor ()),
                    -1);

    if (acceptor_.accept (peer_handler->peer ()) == -1) {
      delete peer_handler;
      return -1;
    } else if (peer_handler->open () == -1) {
      peer_handler->handle_close ();
      return -1;
    }
    return 0;
  }
};

handle_input()方法不需要针对竞争状态实施保护,因为它只在方法的局部对象上进行操作。事实上,除了为每个新连接实例化的事件处理器类型,Logging_Acceptor_WFMO与Logging_Acceptor_Ex是完全一样的。

ACE_WFMO_Reactor的main函数实现如下所示:

int ACE_TMAIN (int argc, ACE_TCHAR *argv[])
{
  const size_t N_THREADS = 4;

  //创建ACE_WFMO_Reactor 并实例化给ACE_Reactor 
  ACE_WFMO_Reactor wfmo_reactor;
  ACE_Reactor reactor (&wfmo_reactor);

  //初始化Server_Logging_Daemon 对象
  Server_Logging_Daemon *server;
  // Ignore argv[0]...
  --argc; ++argv;
  ACE_NEW_RETURN (server,
                  Server_Logging_Daemon (argc, argv, &reactor),
                  1);

  //创建退出事件
  Quit_Handler quit_handler (&reactor);

  //初始化N_THREADS线程执行event_loop
  ACE_Thread_Manager::instance ()->spawn_n
    (N_THREADS, event_loop, &reactor);
  return ACE_Thread_Manager::instance ()->wait ();
}

与ACE_TP_Reactor类的main函数比较主要有以下不同

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1605707.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

改进下记录学习的小网站

Strong改进 结束&#xff1a;2024-4-14 打算投入&#xff1a;10h 实际消耗&#xff1a;12h 3m 学习总是不在状态。 我的时间花得很零散&#xff0c;也有点茫然。所以想尝试一下集中式地、一块一块地花&#xff0c;比如投入30个小时&#xff0c;去干一件事&#xff0c;这样就可…

vue3数字滚动组件

效果图 一、安装插件 npm i vue3-count-to 二、components文件夹下新建BaseCountTo.vue文件 <template><BaseCountTo :endVal"endVal" :decimals"decimals" /> </template> <script setup > import { defineComponent, watch, r…

游游的you矩阵

题目&#xff1a; 游游拿到了一个字符矩阵&#xff0c;她想知道有多少个三角形满足以下条件&#xff1a; 三角形的三个顶点分别是 y、o、u 字符。三角形为直角三角形&#xff0c;且两个直角边一个为水平、另一个为垂直。 输入描述&#xff1a; 第一行输入两个正整数n,m&#…

[生活][杂项] 如何正确打开编织袋

编织袋打开的正确姿势 面对单线分离右边的线头&#xff0c;然后依次拉开即可

Python(九十四)变量的作用域

❤️ 专栏简介&#xff1a;本专栏记录了我个人从零开始学习Python编程的过程。在这个专栏中&#xff0c;我将分享我在学习Python的过程中的学习笔记、学习路线以及各个知识点。 ☀️ 专栏适用人群 &#xff1a;本专栏适用于希望学习Python编程的初学者和有一定编程基础的人。无…

G2D图像处理硬件调用和测试-基于米尔-全志T113-i开发板

本篇测评由电子工程世界的优秀测评者“jf_99374259”提供。 本文将介绍基于米尔电子MYD-YT113i开发板的G2D图像处理硬件调用和测试。 MYC-YT113i核心板及开发板 真正的国产核心板&#xff0c;100%国产物料认证 国产T113-i处理器配备2*Cortex-A71.2GHz &#xff0c;RISC-V 外置…

UE4 拍摄、保存并浏览相册

效果&#xff1a; 1.新建CameraActor类 2.修改截图保存路径 3.编写BP_Camera蓝图 注意路径 Save Image函数要在执行拍照和BeginPlay事件执行一次 按钮执行拍摄事件 3.编写UMG蓝图 技巧&#xff1a;让Index加1、减1循环赋值 4.把BP_Camera挂在玩家上

renren-fast-vue-master常见报错和解决

前言&#xff1a; 因为最近博主的实习&#xff0c;所以在小破站写那个分布式微服务电商的项目&#xff0c;什么什么商城就不说了&#xff0c;大家都明白&#xff0c;相信大家像我一样&#xff0c;在使用renren-fast-vue-master的时候都很是头痛&#xff0c;项目还没开始就结束了…

2024年第十六届“华中杯”(B题)大学生数学建模挑战赛| 时间序列,滑动窗口 | 数学建模完整代码+建模过程全解全析

当大家面临着复杂的数学建模问题时&#xff0c;你是否曾经感到茫然无措&#xff1f;作为2022年美国大学生数学建模比赛的O奖得主&#xff0c;我为大家提供了一套优秀的解题思路&#xff0c;让你轻松应对各种难题。 让我们来看看华中杯 (B题&#xff09;&#xff01; CS团队倾…

C语言链表讲解

链表的概念与结构 链表是一种物理存储非连续&#xff0c;非顺序的存储结构&#xff0c;数据元素的逻辑顺序是通过链表中的指针链接次序实现的。 如图所示: 链表通过指针域把一个一个节点链接起来,而最后一个节点的指针域指向NULL,表示到头了。 链表与顺序表的对比 链表是一种…

[疑难杂症2024-003]如何判断一张没有头信息的dcm图像,是否是压缩图像?

本文由Markdown语法编辑器编辑完成&#xff0e; 1. 前言: DCM格式&#xff0c;是医学图像领域里面的通用格式&#xff0e;DCM图像一般分为两大部分&#xff0c;一部分是TAG信息&#xff0c;一部分是像素. 而TAG信息&#xff0c;一般又会分为两部分&#xff0c;如下图所示, 是…

C++:STL-stack,queue,deque

栈和队列 1.栈和队列文档理解2.为什么没有迭代器3.Container到底是什么4.模拟实现源码--使用适配器模式5.deque5.1定义5.2底层结构头插头删随机访问扩容 5.3缺陷5.4为什么选择deque作为stack和queue的底层容器 1.栈和队列文档理解 我们通过其上的介绍发现了几个点&#xff1a; …

Python分析之3 种空间插值方法

插值是一个非常常见的数学概念,不仅数据科学家使用它,而且各个领域的人们也使用它。然而,在处理地理空间数据时,插值变得更加复杂,因为您需要基于几个通常稀疏的观测值创建代表性网格。 在深入研究地理空间部分之前,让我们简要回顾一下线性插值。 为了演示的目的,我将使…

单链表的基本操作实现:初始化、尾插法、头插法、输出单链表、求表长、按序号查找、按值查找、插入结点、删除结点。

1.参考学习博文&#xff08;写的相当好的文章&#xff09;&#xff1a; http://t.csdnimg.cn/AipNl 2.关于我的总结&#xff1a; 定义单链表&#xff1a; typedef struct LNode {Elemtype data;struct LNode* next; }LNode; data用来存放元素值&#xff0c;next用来指向后…

Vue 项目build打包发布到giteepages ,首页正常显示,其他路由页面报错404的解决方法

直接上解决方法&#xff1a; 打包之后dist上传之后&#xff0c;还要新创一个.spa文件&#xff0c;注意&#xff01;是 .spa 有个. 点&#xff0c;如下图 一般这样就可以开始部署了&#xff0c;然后开启giteepages服务。如果出现了首页正常显示&#xff0c;其他页面显示…

CTFHUB-技能树-Web前置技能-文件上传(前端验证—文件头检查)

CTFHUB-技能树-Web前置技能-文件上传&#xff08;前端验证—文件头检查&#xff09; 文章目录 CTFHUB-技能树-Web前置技能-文件上传&#xff08;前端验证—文件头检查&#xff09;前端验证—文件头检查题目解析 各种文件头标志 前端验证—文件头检查 题目考的是&#xff1a;pn…

第二部分 Python提高—GUI图形用户界面编程(三)

简单组件学习 Radiobutton 单选按钮、Checkbutton 复选按钮和canvas 画布 文章目录 Radiobutton 单选按钮Checkbutton 复选按钮canvas 画布 Radiobutton 单选按钮 Radiobutton 控件用于选择同一组单选按钮中的一个。Radiobutton 可以显示文本&#xff0c;也可以显示图像。 f…

基于XML配置bean(二)

文章目录 1.工厂中获取bean1.静态工厂1.MyStaticFactory.java2.beans.xml3.测试 2.实例工厂1.MyInstanceFactory.java2.beans.xml3.测试 3.FactoryBean&#xff08;重点&#xff09;1.MyFactoryBean.java2.beans.xml3.测试 2.bean配置信息重用继承抽象bean1.beans.xml2.测试 3.…

《系统分析与设计》实验-----在线书店系统 需求规格说明书 哈尔滨理工大学PLUS完善版

文章目录 需求规格说明书1&#xff0e;引言1.1编写目的1.2项目背景1.3定义1.4参考资料 2&#xff0e;任务概述2.1目标2.2运行环境2.3条件与限制 3&#xff0e;数据描述3.1静态数据3.2动态数据3.3数据库介绍3.4数据词典3.5数据采集 4&#xff0e;功能需求4.1功能划分4.2功能描述…

transformer架构详细详解

一、transformer的贡献 transformer架构的贡献&#xff1a;该架构只使用自注意力机制&#xff0c;没有使用RNN或卷积网络。且可以实现并行计算&#xff0c;加快模型训练速度。 &#xff08;将所有的循环层全部换成&#xff1a;multi-headed self-attention&#xff09; 二、t…