ACE Reactor for Windows模型源码研究

最近研究了下ACE的Reactor模型的源码。相比之前自己写的ACE Select模型,复杂了不少。ACE的Reactor框架,用户通过继承ACE_Event_Handler事件处理类。关联ACE_Reactor反应器,将无阻塞的IO隐蔽在ACE_Reactor对象的底层实现,这样减少了开发的事件和风险,提高了效率。

照例,首先叙述顶层的例子。这里,我首先定义一个ACE_Event_Handler的派生类HandleAccept(故名思议也知道是干什么的),负责输入描述符的处理:

class HandleAccept : public ACE_Event_Handler
{
	private:
		ACE_SOCK_Acceptor acceptor_;
		ACE_INET_Addr inet_address_;
		ACE_Reactor_Mask mask_;

		u_short mPort;
		
		HandleData *handle_data_;
		
	public:
		HandleAccept( ACE_Reactor *reactor ) : ACE_Event_Handler(reactor) {}
		
		int open(u_short nPort);
		virtual int handle_input(ACE_HANDLE handle = ACE_INVALID_HANDLE);
		virtual int handle_output(ACE_HANDLE handle = ACE_INVALID_HANDLE){}
		virtual int handle_close( ACE_HANDLE handle = ACE_INVALID_HANDLE,ACE_Reactor_Mask mask_ = 0);
		virtual ACE_HANDLE get_handle (void) const;
};

再定义一个ACE_Event_Handler的派生类HandleData,负责数据的处理:

class HandleData : public ACE_Event_Handler
{
	private:

		ACE_SOCK_Stream peer_;
		ACE_Message_Block *head_;
		ACE_Message_Block *data_;

	public:
		HandleData(ACE_Reactor *reactor) : ACE_Event_Handler(reactor) {}
		int open();
		virtual int handle_input(ACE_HANDLE handle = ACE_INVALID_HANDLE);
		virtual int handle_output(ACE_HANDLE handle = ACE_INVALID_HANDLE){}
		virtual int handle_close(ACE_HANDLE handle = ACE_INVALID_HANDLE,ACE_Reactor_Mask mask_ = 0);
		virtual ACE_HANDLE get_handle(void) const;
		
		ACE_SOCK_Stream &peer() {return peer_;}
		int recv_data(ACE_SOCK_Stream strem);
};

这里着重介绍几个重要的接口,首先是open,在open中需要先初始化一个acceptor的socket,并且注册相关事件的mask到ACE_Reactor的反应器中(其实这里主要是ACCEPT_MASK)。
int HandleAccept::open(u_short nPort)
{
	inet_address_.set(nPort);
	if ( acceptor_.open(inet_address_,1) < 0 ) return -1;

	ACE_SET_BITS(mask_,ACE_Event_Handler::READ_MASK | ACE_Event_Handler::WRITE_MASK | ACE_Event_Handler::ACCEPT_MASK);
	
	std::cout<<"HandleAccept::open()"<<std::endl;
	return reactor()->register_handler(this,mask_);
}
当有输入事件的时候,handle_input回调将会被调用,在这里我们先new一个HandleData,这也是一个ACE_Event_Handler的派生类,负责数据的处理。然后调用ACE_SOCK_Acceptor的accept方法,等待输入的描述符。
int HandleAccept::handle_input(ACE_HANDLE handle)
{
	handle_data_ = new HandleData(reactor()); 
	ACE_INET_Addr remote_addr_;
	if ( acceptor_.accept(handle_data_->peer(),&remote_addr_) < 0 ) 
	{
		std::cout<<"*ERROR* Fail to Accept connection"<<std::endl;
		return -1;
	}
	std::cout<<"connect from "<<remote_addr_.get_host_addr()<<std::endl;
	
	handle_data_->open();
	
	return 0;
}

随后,在main的主程序中:

int main(int argc,char *argv[])
{	
	HandleAccept yankee(ACE_Reactor::instance());
	
	yankee.open(1234);
	
	ACE_Reactor::instance()->run_event_loop();
	
	return 0;
}

如果使用传统的网络模型来实现,开发者将不得不面对以下问题:

  1. 设置和清楚fd_sets
  2. 检测事件,并对信号中断做出响应
  3. 管理内部锁
  4. 将事件多路分离给相关联的事件处理器
  5. 分派对I/O,信号和定时器事件的处理函数

ACE_Reactor框架解决了这一切,开发者只需关心上层的内容即可。首先从HandleAccep::open进入,层层转进到ACE_WFMO_Reactor::register_handler_i(至于为什么走到ACE_WFMO_Reactor,稍后就会提到):

int
ACE_WFMO_Reactor::register_handler_i (ACE_HANDLE event_handle,ACE_HANDLE io_handle,ACE_Event_Handler *event_handler,ACE_Reactor_Mask new_masks=READ_MASK)
{
    // .......................
    int found = this->handler_rep_.modify_network_events_i (io_handle,new_masks,old_masks,new_network_events,event_handle,delete_event,ACE_Reactor::ADD_MASK);
    // .......................
    int result = ::WSAEventSelect ((SOCKET) io_handle,new_network_events);
    if (found)
        return result;
    else if (result != SOCKET_ERROR &&
        this->handler_rep_.bind_i (1,event_handler,io_handle,delete_event) != -1)
    // .......................
}

首先调用 this->handler_rep_.modify_network_events_i(),在这里old_masks将获取event_handler感兴趣的事件集所对应的MASK,并增加new_masks对应的事件集FD,modify_network_events_i返回是否在active handles,suspended handles或者records to be added中找到io_handle,如果找到event_handle将指向这个io_handle。WSAEventSelect 将io_handle和event_handle绑定在一起。如果modify_network_events_i没有找到io_handle在记录中,则随后调用this->handler_rep_.bind_i负责插入一个新的Event_Handler入口到to_be_added_info_[]中。随后,to_be_added_info_[]的数据会被加入到current_handles_[]中,这一步将在ACE_WFMO_Reactor_Handler_Repository::make_changes_in_to_be_added_infos()中完成。


从ACE_Reactor::run_event_loop()进入,逐步分析。run_event_loop()走到ACE_Reactor::run_reactor_event_loop。程序进入一个while循环:

int
ACE_Reactor::run_reactor_event_loop (REACTOR_EVENT_HOOK eh)//typedef int (*REACTOR_EVENT_HOOK)(ACE_Reactor *);
{
  // .............................
  while (1)
    {
      int const result = this->implementation_->handle_events ();

      // .............................
}

this->implementation_是在ACE_Reactor::ACE_Reactor()中分配的,这是一种代理模式的用法,为的是确保windows和linux之间的移植性,具体的实现根据不同的平台而异,具体底层的各种操作,都是由这个this->implementation_做的。在windows下,this->implementation_由ACE_WFMO_Reactor在ACE_Reactor的构造中实现:

ACE_Reactor::ACE_Reactor (ACE_Reactor_Impl *impl,bool delete_implementation)
{
      // .....................
      ACE_NEW (impl,ACE_WFMO_Reactor);
      //.........................

      this->implementation (impl);
      // .........................
    }
}
接着前面的this->implementation_->handle_events ()调用,由这里进入,调用到ACE_WFMO_Reactor::event_handling,这里会调用wait_for_multiple_events和safe_dispatch,前者负责监听输入,后者负责分发事件。首先分析wait_for_multiple_events:
DWORD
ACE_WFMO_Reactor::wait_for_multiple_events (int timeout,int alertable)
{
  // ................................
#else
  return ::WaitForMultipleObjectsEx (this->handler_rep_.max_handlep1 (),this->handler_rep_.handles (),FALSE,timeout,alertable);
#endif /* ACE_HAS_PHARLAP */  //noblock
}
这里,调用WaitForMultipleObjectsEx非阻塞地去等待相关事件,第一个参数是等待对象的数组的size(对象句柄的最大数量是64。此参数不能是零),第二个参数制定一个等待处理的对象数组。this->handler_rep_是ACE_WFMO_Reactor_Handler_Repository对象,ACE_Reactor将所有要管理的handle都存储在这个对象中this->handler_rep_.handles()将返回ACE_WFMO_Reactor_Handler_Repository::current_handles_,这保存了当前记录的相关句柄。这里将会向上层返回一个有输入的对象数组下标。

再来分析数据分发的safe_dispatch,这个方法经过层层调用,执行到ACE_WFMO_Reactor::complex_dispatch_handler,这里通过之前获取的数组下标,获取相关handle:

int
ACE_WFMO_Reactor::complex_dispatch_handler (DWORD slot,ACE_HANDLE event_handle)
{
  // This dispatch is used for I/O entires.

  ACE_WFMO_Reactor_Handler_Repository::Current_Info ¤t_info =
    this->handler_rep_.current_info ()[slot];
 // .......................................
然后调用upcall:
int
ACE_WFMO_Reactor::complex_dispatch_handler (DWORD slot,ACE_HANDLE event_handle)
{
	// ..............
    problems |= this->upcall (current_info.event_handler_,current_info.io_handle_,events);
    // ..............
}
在upcall中,通过匹配事件描述符,决定执行的顶层HandleAccept::handle_input()方法,假设此时有个外部链接connect进来,则会造成一个FD_ACCEPT事件:
ACE_Reactor_Mask
ACE_WFMO_Reactor::upcall (ACE_Event_Handler *event_handler,WSANETWORKEVENTS &events)
{
       // .................
      if (ACE_BIT_ENABLED (actual_events,FD_ACCEPT))
      {
            action = event_handler->handle_input (io_handle);    //这里执行最顶层HandleAccept::handle_input方法
            if (action <= 0)
            {
                    ACE_CLR_BITS (actual_events,FD_ACCEPT);
                    if (action == -1)
                            ACE_SET_BITS (problems,ACE_Event_Handler::ACCEPT_MASK);
            }
      }
      // ..................
}

如果HandleAccept::handle_input()方法返回错误值-1,那么程序会将ACCEPT_MASK加入problems,这个值将导致后面执行handle_close()

int
ACE_WFMO_Reactor_Handler_Repository::remove_handler_i (size_t slot,ACE_Reactor_Mask to_be_removed_masks)
{
         // ......................
        else if (ACE_BIT_ENABLED (to_be_removed_masks,ACE_Event_Handler::DONT_CALL) == 0)
     {
          ACE_HANDLE handle = this->current_info_[slot].io_handle_;
          this->current_info_[slot].event_handler_->handle_close (handle,to_be_removed_masks);
     }

 return 0;
}
总之,ACE的Reactor框架充分利用了C++的封装性以及多态性,并通过代理,实现了跨平台底层的透明,对快速开发应用有重要的意义。

相关文章

react 中的高阶组件主要是对于 hooks 之前的类组件来说的,如...
我们上一节了解了组件的更新机制,但是只是停留在表层上,例...
我们上一节了解了 react 的虚拟 dom 的格式,如何把虚拟 dom...
react 本身提供了克隆组件的方法,但是平时开发中可能很少使...
mobx 是一个简单可扩展的状态管理库,中文官网链接。小编在接...
我们在平常的开发中不可避免的会有很多列表渲染逻辑,在 pc ...