epoll实现Reactor模式

转自:http://blog.csdn.net/analogous_love/article/details/53319815

最近一直在看游双的《高性能Linux服务器编程》一书,下载链接http://download.csdn.net/detail/analogous_love/9673008

书上是这么介绍Reactor模式的:




按照这个思路,我写个简单的练习:

  1. /**
  2. *@desc:用reactor模式练习服务器程序,main.cpp
  3. *@author:zhangyl
  4. *@date:2016.11.23
  5. */
  6. #include<iostream>
  7. #include<string.h>
  8. #include<sys/types.h>
  9. #include<sys/socket.h>
  10. #include<netinet/in.h>
  11. #include<arpa/inet.h>//forhtonl()andhtons()
  12. #include<unistd.h>
  13. #include<fcntl.h>
  14. #include<sys/epoll.h>
  15. #include<signal.h>//forsignal()
  16. #include<pthread.h>
  17. #include<semaphore.h>
  18. #include<list>
  19. #include<errno.h>
  20. #include<time.h>
  21. #include<sstream>
  22. #include<iomanip>//forstd::setw()/setfill()
  23. #include<stdlib.h>
  24. #defineWORKER_THREAD_NUM5
  25. #definemin(a,b)((a<=b)?(a):(b))
  26. intg_epollfd=0;
  27. boolg_bStop=false;
  28. intg_listenfd=0;
  29. pthread_tg_acceptthreadid=0;
  30. pthread_tg_threadid[WORKER_THREAD_NUM]={0};
  31. pthread_cond_tg_acceptcond;
  32. pthread_mutex_tg_acceptmutex;
  33. pthread_cond_tg_cond/*=PTHREAD_COND_INITIALIZER*/;
  34. pthread_mutex_tg_mutex/*=PTHREAD_MUTEX_INITIALIZER*/;
  35. pthread_mutex_tg_clientmutex;
  36. std::list<int>g_listClients;
  37. voidprog_exit(intsigno)
  38. {
  39. ::signal(SIGINT,SIG_IGN);
  40. ::signal(SIGKILL,SIG_IGN);
  41. ::signal(SIGTERM,248); line-height:18px; margin:0px!important; padding:0px 3px 0px 10px!important; list-style-position:outside!important"> std::cout<<"programrecvsignal"<<signo<<"toexit."<<std::endl;
  42. g_bStop=true;
  43. ::epoll_ctl(g_epollfd,EPOLL_CTL_DEL,g_listenfd,NULL);
  44. //Todo:是否需要先调用shutdown()一下?
  45. ::shutdown(g_listenfd,SHUT_RDWR);
  46. ::close(g_listenfd);
  47. ::close(g_epollfd);
  48. ::pthread_cond_destroy(&g_acceptcond);
  49. ::pthread_mutex_destroy(&g_acceptmutex);
  50. ::pthread_cond_destroy(&g_cond);
  51. ::pthread_mutex_destroy(&g_mutex);
  52. ::pthread_mutex_destroy(&g_clientmutex);
  53. }
  54. boolcreate_server_listener(constchar*ip,shortport)
  55. g_listenfd=::socket(AF_INET,SOCK_STREAM|SOCK_NONBLOCK,0);
  56. if(g_listenfd==-1)
  57. returninton=1;
  58. ::setsockopt(g_listenfd,SOL_SOCKET,SO_REUSEADDR,(char*)&on,sizeof(on));
  59. ::setsockopt(g_listenfd,SO_REUSEPORT,153); background-color:inherit; font-weight:bold">sizeof(on));
  60. structsockaddr_inservaddr;
  61. memset(&servaddr,sizeof(servaddr));
  62. servaddr.sin_family=AF_INET;
  63. servaddr.sin_addr.s_addr=inet_addr(ip);
  64. servaddr.sin_port=htons(port);
  65. if(::bind(g_listenfd,(sockaddr*)&servaddr,153); background-color:inherit; font-weight:bold">sizeof(servaddr))==-1)
  66. if(::listen(g_listenfd,50)==-1)
  67. false;
  68. g_epollfd=::epoll_create(1);
  69. if(g_epollfd==-1)
  70. structepoll_evente;
  71. memset(&e,153); background-color:inherit; font-weight:bold">sizeof(e));
  72. e.events=EPOLLIN|EPOLLRDHUP;
  73. e.data.fd=g_listenfd;
  74. if(::epoll_ctl(g_epollfd,EPOLL_CTL_ADD,&e)==-1)
  75. }
  76. voidrelease_client(intclientfd)
  77. {
  78. ottom:none; border-left:3px solid rgb(108,clientfd,NULL)==-1)
  79. std::cout<<"releaseclientsocketFailedascallepoll_ctlFailed"<<std::endl;
  80. ::close(clientfd);
  81. void*accept_thread_func(void*arg)
  82. while(!g_bStop)
  83. ::pthread_mutex_lock(&g_acceptmutex);
  84. ::pthread_cond_wait(&g_acceptcond,&g_acceptmutex);
  85. //::pthread_mutex_lock(&g_acceptmutex);
  86. //std::cout<<"runloopinaccept_thread_func"<<std::endl;
  87. structsockaddr_inclientaddr;
  88. socklen_taddrlen;
  89. intnewfd=::accept(g_listenfd,(structsockaddr*)&clientaddr,&addrlen);
  90. ::pthread_mutex_unlock(&g_acceptmutex);
  91. if(newfd==-1)
  92. continue;
  93. std::cout<<"newclientconnected:"<<::inet_ntoa(clientaddr.sin_addr)<<":"<<::ntohs(clientaddr.sin_port)<<std::endl;
  94. //将新socket设置为non-blocking
  95. intoldflag=::fcntl(newfd,F_GETFL,0);
  96. intnewflag=oldflag|O_NONBLOCK;
  97. if(::fcntl(newfd,F_SETFL,newflag)==-1)
  98. std::cout<<"fcntlerror,oldflag="<<oldflag<<",newflag="<<newflag<<std::endl;
  99. e.events=EPOLLIN|EPOLLRDHUP|EPOLLET;
  100. e.data.fd=newfd;
  101. std::cout<<"epoll_ctlerror,fd="<<newfd<<std::endl;
  102. returnNULL;
  103. void*worker_thread_func(intclientfd;
  104. ::pthread_mutex_lock(&g_clientmutex);
  105. while(g_listClients.empty())
  106. ::pthread_cond_wait(&g_cond,&g_clientmutex);
  107. clientfd=g_listClients.front();
  108. g_listClients.pop_front();
  109. pthread_mutex_unlock(&g_clientmutex);
  110. //gdb调试时不能实时刷新标准输出,用这个函数刷新标准输出,使信息在屏幕上实时显示出来
  111. std::cout<<std::endl;
  112. std::stringstrclientmsg;
  113. charbuff[256];
  114. boolbError=while(true)
  115. memset(buff,153); background-color:inherit; font-weight:bold">sizeof(buff));
  116. intnRecv=::recv(clientfd,buff,256,0);
  117. if(nRecv==-1)
  118. if(errno==EWOULDBLOCK)
  119. break;
  120. else
  121. std::cout<<"recverror,clientdisconnected,fd="<<clientfd<<std::endl;
  122. release_client(clientfd);
  123. bError=true;
  124. //对端关闭了socket,这端也关闭
  125. elseif(nRecv==0)
  126. std::cout<<"peerclosed,108); list-style-type:decimal-leading-zero; color:inherit; line-height:18px; margin:0px!important; padding:0px 3px 0px 10px!important; list-style-position:outside!important"> strclientmsg+=buff;
  127. //出错了,就不要再继续往下执行了
  128. if(bError)
  129. std::cout<<"clientmsg:"<<strclientmsg;
  130. //将消息加上时间标签后发回
  131. time_tNow=time(NULL);
  132. structtm*Nowstr=localtime(&Now);
  133. std::ostringstreamostimestr;
  134. ostimestr<<"["<<Nowstr->tm_year+1900<<"-"
  135. <<std::setw(2)<<std::setfill('0')<<Nowstr->tm_mon+1<<"-"
  136. <<std::setw(2)<<std::setfill('0')<<Nowstr->tm_mday<<""
  137. <<std::setw(2)<<std::setfill('0')<<Nowstr->tm_hour<<":"
  138. <<std::setw(2)<<std::setfill('0')<<Nowstr->tm_min<<":"
  139. <<std::setw(2)<<std::setfill('0')<<Nowstr->tm_sec<<"]serverreply:";
  140. strclientmsg.insert(0,ostimestr.str());
  141. intnSent=::send(clientfd,strclientmsg.c_str(),strclientmsg.length(),153); background-color:inherit; font-weight:bold">if(nSent==-1)
  142. if(errno==EWOULDBLOCK)
  143. ::sleep(10);
  144. continue;
  145. std::cout<<"senderror,153); background-color:inherit; font-weight:bold">break;
  146. std::cout<<"send:"<<strclientmsg;
  147. strclientmsg.erase(0,nSent);
  148. if(strclientmsg.empty())
  149. returnNULL;
  150. voiddaemon_run()
  151. intpid;
  152. signal(SIGCHLD,0); background-color:inherit">//1)在父进程中,fork返回新创建子进程的进程ID;
  153. //2)在子进程中,fork返回0;
  154. //3)如果出现错误,fork返回一个负值;
  155. pid=fork();
  156. if(pid<0)
  157. std::cout<<"forkerror"<<std::endl;
  158. exit(-1);
  159. //父进程退出,子进程独立运行
  160. if(pid>0){
  161. exit(0);
  162. //之前parent和child运行在同一个session里,parent是会话(session)的领头进程,
  163. //parent进程作为会话的领头进程,如果exit结束执行的话,那么子进程会成为孤儿进程,并被init收养。
  164. //执行setsid()之后,child将重新获得一个新的会话(session)id。
  165. //这时parent退出之后,将不会影响到child了。
  166. setsid();
  167. intfd;
  168. fd=open("/dev/null",O_RDWR,153); background-color:inherit; font-weight:bold">if(fd!=-1)
  169. dup2(fd,STDIN_FILENO);
  170. dup2(fd,STDOUT_FILENO);
  171. ottom:none; border-left:3px solid rgb(108,STDERR_FILENO);
  172. if(fd>2)
  173. close(fd);
  174. intmain(intargc,87); background-color:inherit; font-weight:bold">char*argv[])
  175. shortport=0;
  176. intch;
  177. boolbdaemon=while((ch=getopt(argc,argv,"p:d"))!=-1)
  178. switch(ch)
  179. case'd':
  180. bdaemon=case'p':
  181. port=atol(optarg);
  182. if(bdaemon)
  183. daemon_run();
  184. if(port==0)
  185. port=12345;
  186. if(!create_server_listener("0.0.0.0",port))
  187. std::cout<<"Unabletocreatelistenserver:ip=0.0.0.0,port="<<port<<"."<<std::endl;
  188. return-1;
  189. //设置信号处理
  190. ottom:none; border-left:3px solid rgb(108,SIG_DFL);
  191. signal(SIGPIPE,248); line-height:18px; margin:0px!important; padding:0px 3px 0px 10px!important; list-style-position:outside!important"> signal(SIGINT,prog_exit);
  192. signal(SIGKILL,prog_exit);
  193. signal(SIGTERM,248); line-height:18px; margin:0px!important; padding:0px 3px 0px 10px!important; list-style-position:outside!important"> ::pthread_cond_init(&g_acceptcond,108); list-style-type:decimal-leading-zero; color:inherit; line-height:18px; margin:0px!important; padding:0px 3px 0px 10px!important; list-style-position:outside!important"> ::pthread_mutex_init(&g_acceptmutex,NULL);
  194. ::pthread_cond_init(&g_cond,248); line-height:18px; margin:0px!important; padding:0px 3px 0px 10px!important; list-style-position:outside!important"> ::pthread_mutex_init(&g_mutex,248); line-height:18px; margin:0px!important; padding:0px 3px 0px 10px!important; list-style-position:outside!important"> ::pthread_mutex_init(&g_clientmutex,248); line-height:18px; margin:0px!important; padding:0px 3px 0px 10px!important; list-style-position:outside!important"> ::pthread_create(&g_acceptthreadid,NULL,accept_thread_func,0); background-color:inherit">//启动工作线程
  195. for(inti=0;i<WORKER_THREAD_NUM;++i)
  196. ::pthread_create(&g_threadid[i],worker_thread_func,153); background-color:inherit; font-weight:bold">structepoll_eventev[1024];
  197. intn=::epoll_wait(g_epollfd,ev,1024,10);
  198. if(n==0)
  199. if(n<0)
  200. std::cout<<"epoll_waiterror"<<std::endl;
  201. intm=min(n,1024);
  202. inti=0;i<m;++i)
  203. //通知接收连接线程接收新连接
  204. if(ev[i].data.fd==g_listenfd)
  205. pthread_cond_signal(&g_acceptcond);
  206. //通知普通工作线程接收数据
  207. else
  208. pthread_mutex_lock(&g_clientmutex);
  209. g_listClients.push_back(ev[i].data.fd);
  210. pthread_mutex_unlock(&g_clientmutex);
  211. pthread_cond_signal(&g_cond);
  212. //std::cout<<"signal"<<std::endl;
  213. return0;
  214. }


程序的功能一个简单的echo服务:客户端连接上服务器之后,给服务器发送信息,服务器加上时间戳等信息后返回给客户端。使用到的知识点有:

1. 条件变量

2.epoll的边缘触发模式


程序的大致框架是:

1. 主线程只负责监听侦听socket上是否有新连接,如果有新连接到来,交给一个叫accept的工作线程去接收新连接,并将新连接socket绑定到主线程使用epollfd上去。

2. 主线程如果侦听到客户端的socket上有可读事件,则通知另外五个工作线程去接收处理客户端发来的数据,并将数据加上时间戳后发回给客户端。

3. 可以通过传递-p port来设置程序的监听端口号;可以通过传递-d来使程序以daemon模式运行在后台。这也是标准linux daemon模式的书写方法


程序难点和需要注意的地方是:

1. 条件变量为了防止虚假唤醒,一定要在一个循环里面调用pthread_cond_wait()函数,我在worker_thread_func()中使用了:

copy

在accept_thread_func()函数里面我没有使用循环,这样会有问题吗?

2. 使用条件变量pthread_cond_wait()函数的时候一定要先获得与该条件变量相关的mutex,即像下面这样的结构:


因为pthread_cond_wait()如果阻塞的话,它解锁相关mutex和阻塞当前线程这两个动作加在一起是原子的。


3. 作为服务器端程序最好对侦听socket调用setsocketopt()设置SO_REUSEADDR和SO_REUSEPORT两个标志,因为服务程序有时候会需要重启(比如调试的时候就会不断重启),如果不设置这两个标志的话,绑定端口时就会调用失败。因为一个端口使用后,即使不再使用,因为四次挥手该端口处于TIME_WAIT状态,有大约2min的MSL(Maximum Segment Lifetime,最大存活期)。这2min内,该端口是不能被重复使用的。你的服务器程序上次使用了这个端口号,接着重启,因为这个缘故,你再次绑定这个端口就会失败(bind函数调用失败)。要不你就每次重启时需要等待2min后再试(这在频繁重启程序调试是难以接收的),或者设置这种SO_REUSEADDR和SO_REUSEPORT立即回收端口使用。

其实,SO_REUSEADDR在windows上和Unix平台上还有些细微的区别,我在libevent源码中看到这样的描述:

注意注释部分, 在Unix平台上设置这个选项意味着,任意进程可以复用该地址;而在windows,不要阻止其他进程复用该地址。也就是在在Unix平台上,如果不设置这个选项,任意进程在一定时间内,不能bind该地址;在windows平台上,在一定时间内,其他进程不能bind该地址,而本进程却可以再次bind该地址。


4. epoll_wait对新连接socket使用的是边缘触发模式EPOLLET(edge trigger),而不是默认的水平触发模式(level trigger)。因为如果采取水平触发模式的话,主线程检测到某个客户端socket数据可读时,通知工作线程去收取该socket上的数据,这个时候主线程继续循环,只要在工作线程没有将该socket上数据全部收完,或者在工作线程收取数据的过程中,客户端有新数据到来,主线程会继续发通知(通过pthread_cond_signal())函数,再次通知工作线程收取数据。这样会可能导致多个工作线程同时调用recv函数收取该客户端socket上的数据,这样产生的结果将会导致数据错乱。

相反,采取边缘触发模式,只有等某个工作线程将那个客户端socket上数据全部收取完毕,主线程的epoll_wait才可能会再次触发来通知工作线程继续收取那个客户端socket新来的数据。


5. 代码中有这样一行:


如果不加上这一行,正常运行服务器程序,程序中要打印到控制台的信息都会打印出来,但是如果用gdb调试状态下,程序的所有输出就不显示了。我不知道这是不是gdb的一个bug,所以这里加上std::endl来输出一个换行符并flush标准输出,让输出显示出来。(std::endl不仅是输出一个换行符而且是同时刷新输出,相当于fflush()函数)。


程序我部署起来了,你可以使用linux的nc命令或自己写程序连接服务器来查看程序效果,当然也可以使用telnet命令,方法:

linux:

nc120.55.94.78 12345

telnet120.55.94.78 12345

然后就可以给服务器自由发送数据了,服务器会给你发送的信息加上时间戳返回给你。效果如图:



另外我将这个代码改写了成纯C++11版本,使用CMake编译,为了支持编译必须加上这-std=c++11:

CMakeLists.txt代码如下:


myreactor.h文件内容:


myreactor.cpp文件内容:

copy
*@desc:myreactor实现文件,myreactor.cpp
  • #include"myreactor.h"
  • #include<fcntl.h>
  • #include<sys/epoll.h>
  • #include<unistd.h>
  • #definemin(a,b)((a<=b)?(a):(b))
  • CMyReactor::CMyReactor()
  • //m_listenfd=0;
  • //m_epollfd=0;
  • //m_bStop=false;
  • CMyReactor::~CMyReactor()
  • boolCMyReactor::init(shortnport)
  • if(!create_server_listener(ip,nport))
  • std::cout<<"Unabletobind:"<<ip<<":"<<nport<<"."<<std::endl;
  • std::cout<<"mainthreadid="<<std::this_thread::get_id()<<std::endl;
  • //启动接收新连接的线程
  • m_acceptthread.reset(newstd::thread(CMyReactor::accept_thread_proc,153); background-color:inherit; font-weight:bold">this));
  • //启动工作线程
  • for(auto&t:m_workerthreads)
  • t.reset(thread(CMyReactor::worker_thread_proc,153); background-color:inherit; font-weight:bold">this));
  • boolCMyReactor::uninit()
  • m_bStop= m_acceptcond.notify_one();
  • m_workercond.notify_all();
  • m_acceptthread->join();
  • t->join();
  • ::epoll_ctl(m_epollfd,m_listenfd,108); list-style-type:decimal-leading-zero; color:inherit; line-height:18px; margin:0px!important; padding:0px 3px 0px 10px!important; list-style-position:outside!important"> ::shutdown(m_listenfd,248); line-height:18px; margin:0px!important; padding:0px 3px 0px 10px!important; list-style-position:outside!important"> ::close(m_listenfd);
  • ::close(m_epollfd);
  • boolCMyReactor::close_client(intclientfd)
  • if(::epoll_ctl(m_epollfd,NULL)==-1)
  • std::cout<<"closeclientsocketfailedascallepoll_ctlfailed"<<std::endl;
  • //returnfalse;
  • ::close(clientfd);
  • void*CMyReactor::main_loop(void*p)
  • CMyReactor*pReatcor=static_cast<CMyReactor*>(p);
  • while(!pReatcor->m_bStop)
  • intn=::epoll_wait(pReatcor->m_epollfd,153); background-color:inherit; font-weight:bold">if(ev[i].data.fd==pReatcor->m_listenfd)
  • pReatcor->m_acceptcond.notify_one();
  • std::unique_lock<std::mutex>guard(pReatcor->m_workermutex);
  • pReatcor->m_listClients.push_back(ev[i].data.fd);
  • pReatcor->m_workercond.notify_one();
  • }//endif
  • //endfor-loop
  • }//endwhile
  • std::cout<<"mainloopexit..."<<std::endl;
  • voidCMyReactor::accept_thread_proc(CMyReactor*pReatcor)
  • std::cout<<"acceptthread,threadid="<<std::this_thread::get_id()<<std::endl;
  • intnewfd;
  • structsockaddr_inclientaddr;
  • socklen_taddrlen;
  • std::unique_lock<std::mutex>guard(pReatcor->m_acceptmutex);
  • pReatcor->m_acceptcond.wait(guard);
  • if(pReatcor->m_bStop)
  • //std::cout<<"runloopinaccept_thread_proc"<<std::endl;
  • newfd=::accept(pReatcor->m_listenfd,(if(newfd==-1)
  • std::cout<<"newclientconnected:"<<::inet_ntoa(clientaddr.sin_addr)<<":"<<::ntohs(clientaddr.sin_port)<<std::endl;
  • //将新socket设置为non-blocking
  • intnewflag=oldflag|O_NONBLOCK;
  • std::cout<<"fcntlerror,newflag="<<newflag<<std::endl;
  • structepoll_evente;
  • memset(&e,153); background-color:inherit; font-weight:bold">sizeof(e));
  • e.events=EPOLLIN|EPOLLRDHUP|EPOLLET;
  • e.data.fd=newfd;
  • if(::epoll_ctl(pReatcor->m_epollfd,&e)==-1)
  • std::cout<<"epoll_ctlerror,fd="<<newfd<<std::endl;
  • std::cout<<"acceptthreadexit..."<<std::endl;
  • voidCMyReactor::worker_thread_proc(CMyReactor*pReatcor)
  • std::cout<<"newworkerthread,153); background-color:inherit; font-weight:bold">while(pReatcor->m_listClients.empty())
  • if(pReatcor->m_bStop)
  • std::cout<<"workerthreadexit..."<<std::endl;
  • return;
  • pReatcor->m_workercond.wait(guard);
  • clientfd=pReatcor->m_listClients.front();
  • pReatcor->m_listClients.pop_front();
  • pReatcor->close_client(clientfd);
  • std::this_thread::sleep_for(std::chrono::milliseconds(10));
  • boolCMyReactor::create_server_listener(shortport)
  • m_listenfd=::socket(AF_INET,0);
  • if(m_listenfd==-1)
  • inton=1;
  • ::setsockopt(m_listenfd,108); list-style-type:decimal-leading-zero; color:inherit; line-height:18px; margin:0px!important; padding:0px 3px 0px 10px!important; list-style-position:outside!important"> ::setsockopt(m_listenfd,153); background-color:inherit; font-weight:bold">structsockaddr_inservaddr;
  • memset(&servaddr,153); background-color:inherit; font-weight:bold">sizeof(servaddr));
  • servaddr.sin_family=AF_INET;
  • servaddr.sin_addr.s_addr=inet_addr(ip);
  • servaddr.sin_port=htons(port);
  • if(::bind(m_listenfd,153); background-color:inherit; font-weight:bold">sizeof(servaddr))==-1)
  • if(::listen(m_listenfd,50)==-1)
  • m_epollfd=::epoll_create(1);
  • if(m_epollfd==-1)
  • e.events=EPOLLIN|EPOLLRDHUP;
  • e.data.fd=m_listenfd;
  • }

  • main.cpp文件内容:


    完整实例代码下载地址:

    普通版本:https://pan.baidu.com/s/1o82Mkno

    C++11版本:https://pan.baidu.com/s/1dEJdrih

    相关文章

    react 中的高阶组件主要是对于 hooks 之前的类组件来说的,如...
    我们上一节了解了组件的更新机制,但是只是停留在表层上,例...
    我们上一节了解了 react 的虚拟 dom 的格式,如何把虚拟 dom...
    react 本身提供了克隆组件的方法,但是平时开发中可能很少使...
    mobx 是一个简单可扩展的状态管理库,中文官网链接。小编在接...
    我们在平常的开发中不可避免的会有很多列表渲染逻辑,在 pc ...