问题描述
||
我的想法是创建X个线程,每个线程都运行KeepRunning方法;该方法在一个无限循环中调用_io_service.run()。当async_accept的处理程序收到新连接时,再通过_io_service.post()把任务投递给_io_service。
我使用以下代码运行服务器:
// Construct the server; the arguments map to (address, port, thread-pool size)
// in the OHServer constructor shown further down.
oh::msg::OHServer s(\"0.0.0.0\",\"9999\",200);
// Bind OHServer::Stop to this instance and store the callable in ConsoleStopServer.
ConsoleStopServer = boost::bind(&oh::msg::OHServer::Stop,&s);
// Register bConsoleHandler as a console control handler; presumably it calls
// ConsoleStopServer -- its body is not shown here, TODO confirm.
SetConsoleCtrlHandler(bConsoleHandler,TRUE);
// Start the worker threads; Run() returns once a line is read from stdin.
s.Run();
但是,当我收到一个连接,并在Post()方法中通过MsgWorker类的阻塞读/写操作为其提供服务之后,所有线程都退出了。
我的代码如下(由asio的HTTP server3示例和我自己的代码混合而成):
// Builds the server: resolves sAddress/sPort, opens a listening acceptor on the
// first resolved endpoint and queues the first asynchronous accept.
//   sAddress - host name or address to listen on
//   sPort    - port number or service name
//   tps      - number of worker threads that Run() will start
OHServer::OHServer(const std::string& sAddress, const std::string& sPort, std::size_t tps)
    : _nThreadPoolSize(tps),
      _acceptor(_io_service),
      _sockClient(new boost::asio::ip::tcp::socket(_io_service))
{
    namespace ip = boost::asio::ip;

    // Turn the textual address/port into an endpoint; only the first result is used.
    ip::tcp::resolver addressResolver(_io_service);
    const ip::tcp::resolver::query lookup(sAddress, sPort);
    const ip::tcp::endpoint listenEndpoint = *addressResolver.resolve(lookup);

    // Open the acceptor with address reuse (SO_REUSEADDR) enabled, then bind and listen.
    _acceptor.open(listenEndpoint.protocol());
    _acceptor.set_option(ip::tcp::acceptor::reuse_address(true));
    _acceptor.bind(listenEndpoint);
    _acceptor.listen();

    // Queue the first accept; AcceptConnection is invoked with the result.
    _acceptor.async_accept(
        *_sockClient,
        boost::bind(&OHServer::AcceptConnection, this, boost::asio::placeholders::error));
}
void OHServer::KeepRunning()
{
global_stream_lock.lock();
std::cout << \"[\" << boost::this_thread::get_id()
<< \"] Thread Start\" << std::endl;
global_stream_lock.unlock();
while( true )
{
try
{
boost::system::error_code ec;
_io_service.run( ec );
if( ec )
{
global_stream_lock.lock();
std::cout << \"[\" << boost::this_thread::get_id()
<< \"] Error: \" << ec << std::endl;
global_stream_lock.unlock();
}
break;
}
catch( std::exception & ex )
{
global_stream_lock.lock();
std::cout << \"[\" << boost::this_thread::get_id()
<< \"] Exception: \" << ex.what() << std::endl;
global_stream_lock.unlock();
}
}
global_stream_lock.lock();
std::cout << \"[\" << boost::this_thread::get_id()
<< \"] Thread Finish\" << std::endl;
global_stream_lock.unlock();
}
void OHServer::Run()
{
// Create a pool of threads to run all of the io_services.
for (std::size_t i = 0; i < _nThreadPoolSize; ++i)
{
boost::shared_ptr<boost::thread> thread(new boost::thread(
boost::bind(&OHServer::KeepRunning,this)));
threads.push_back(thread);
}
cout << \"Hit enter to close server\" << endl;
cin.get();
}
void OHServer::Stop()
{
boost::system::error_code ec;
_acceptor.close(ec);
_sockClient->shutdown( boost::asio::ip::tcp::socket::shutdown_both,ec );
_sockClient->close( ec );
_io_service.stop();
// Wait for all threads in the pool to exit.
for (std::size_t i = 0; i < threads.size(); ++i)
{
threads[i]->join();
cout << \"threads[ \"<< i << \"]->join();\" << endl;
}
}
// Task posted to _io_service for every accepted connection: logs the event and
// lets a CMsgWorker built from _sockClient serve the connection via Start().
void OHServer::Post()
{
    std::cout << \"Accepted new connection.\" << std::endl;

    // The worker has automatic storage so it is destroyed even when Start()
    // throws. With the former new/delete pair the delete was skipped on an
    // exception, and because KeepRunning() catches std::exception and keeps
    // going, every such failure leaked one worker.
    CMsgWorker msgWorker(_sockClient);
    msgWorker.Start();
}
// Completion handler for async_accept.
//   e - result of the accept operation
// On success a Post() task is queued on _io_service and the next accept is
// started. On failure nothing is queued and no further accept is started.
void OHServer::AcceptConnection(const boost::system::error_code& e)
{
    if (e)
    {
        return;
    }

    _io_service.post(boost::bind(&OHServer::Post, this));

    // `this` must be bound as the object the member function is invoked on, in
    // the same way as the initial async_accept in the constructor; it was
    // missing from this re-arm call.
    //
    // NOTE(review): the next accept targets the same *_sockClient that the
    // Post() task queued above is about to use. It looks like an accept into a
    // socket that is already open completes with an error, after which this
    // handler never re-arms and run() runs out of work -- confirm, and consider
    // handing each connection its own socket.
    _acceptor.async_accept(
        *_sockClient,
        boost::bind(&OHServer::AcceptConnection, this, boost::asio::placeholders::error));
}
我应该怎么做,才能让这些线程继续等待_io_service上的任务?
谢谢你的帮助!
解决方法
请看下面的做法:
// Kick off 5 threads
for (size_t i = 0; i < 5; ++i) {
boost::thread* t = threads.create_thread(boost::bind(&boost::asio::io_service::run,&io));
std::cout << \"Creating thread \" << i << \" with id \" << t->get_id() << std::endl;
}
有关如何执行此操作的想法,请参见此处的timer.cc示例:https://github.com/sean-/Boost.Examples/tree/master/asio/timer
最后,我得到了一个易于使用的服务器版本:
用法:
// Owning pointer to the TCP server; assigned in CMyServer::Start() and used by CMyServer::Stop().
boost::shared_ptr<CTCPServer> _serverPtr;
// Creates the TCP server, queues its start-up on _io_service and then launches
// COHConfig::_iThreads worker threads that execute io_service::run().
void CMyServer::Start()
{
    // Create the server object first so the start-up task has something to run.
    _serverPtr.reset( new CTCPServer(&_io_service,PORT_NUMBER) );

    // Queue the start-up BEFORE any thread calls run(): io_service::run()
    // returns as soon as there is no work left, so threads started against an
    // empty io_service could exit before this task was ever posted.
    //
    // RunServer takes a raw CTCPServer*, so the raw pointer is bound here; the
    // file-level _serverPtr keeps the object alive.
    // NOTE(review): this assumes RunServer and HandleMessage are static members
    // of CMyServer -- confirm against the class declaration.
    _io_service.post(boost::bind(&CMyServer::RunServer,_serverPtr.get(),&CMyServer::HandleMessage));

    // Now start the worker threads; the queued task gives run() work to do.
    for (int i = 0; i < COHConfig::_iThreads; ++i)
    {
        _threads.create_thread(bind(&io_service::run,&_io_service));
    }
}
// Task executed by the io_service to start the server.
//   s              - server to start
//   HandleFunction - callback forwarded unchanged to CTCPServer::Run
void CMyServer::RunServer(CTCPServer* s,void (*HandleFunction)(shared_ptr<ip::tcp::socket>,deadline_timer*))
{
    CTCPServer& server = *s;
    server.Run(HandleFunction);
}
// Connection handler passed to the server.
//   sockClient - socket of the accepted connection
//   timer      - timer supplied by the server; not used here
// Prints the peer's address and port, then lets a CMyWorker built from the
// socket handle the connection via Start().
void CMyServer::HandleMessage(shared_ptr< ip::tcp::socket > sockClient,deadline_timer* timer)
{
    // Query the peer once and reuse the result for both address and port.
    const ip::tcp::endpoint peer = sockClient->remote_endpoint();
    cout << \"Handling connection from: \" << peer.address().to_string() << \":\" << peer.port() << endl;

    // This is some class which gets the socket in its constructor and handles the connection.
    scoped_ptr<CMyWorker> myWorker( new CMyWorker(sockClient) );
    // Start() must be called on myWorker, the object declared above; the
    // previously used name msgWorker is not declared in this function.
    myWorker->Start();
}
// Stops the server created by Start(). Does nothing when no server exists yet,
// so calling Stop() before Start() no longer dereferences an empty pointer.
void CMyServer::Stop()
{
    if (!_serverPtr)
    {
        return;
    }
    _serverPtr->Stop();
}
TCPServer.hpp文件:
#ifndef TCPSERVER_HPP
#define TCPSERVER_HPP
#if defined(_WIN32)
#define BOOST_THREAD_USE_LIB
#endif
#include <boost/asio.hpp>
#include <boost/noncopyable.hpp>
#include <boost/shared_ptr.hpp>
#include <string>
#include <vector>
class CTCPServer: private boost::noncopyable
{
private:
bool bKeepRunning;
boost::asio::io_service* _io_service;
std::string _sPort;
boost::asio::ip::tcp::acceptor _acceptor;
boost::shared_ptr< boost::asio::ip::tcp::socket > _sockClient;
boost::asio::deadline_timer _timer;
bool _bIPv6;
std::string SessionID();
public:
CTCPServer(boost::asio::io_service* ios,const std::string& sPort,bool bIPv6=false):
_sPort(sPort),_acceptor(*ios),_timer(*ios),_bIPv6(bIPv6)
{
_io_service = ios;
bKeepRunning = false;
};
void Run(void (*HandleFunction)(boost::shared_ptr< boost::asio::ip::tcp::socket > sock,boost::asio::deadline_timer* timer));
void AsyncAccept(void (*HandleFunction)(boost::shared_ptr< boost::asio::ip::tcp::socket >,boost::asio::deadline_timer* ));
void AcceptHandler(const boost::system::error_code& e,void (*HandleFunction)(boost::shared_ptr< boost::asio::ip::tcp::socket >,boost::asio::deadline_timer* ));
void Stop();
void Stop(void (*StopFunction)());
};
#endif
TCPServer.cpp文件:
#include \"TCPServer.hpp\"
#include <boost/thread.hpp>
#include <boost/bind.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/thread/mutex.hpp>
#include <iostream>
#include <sstream>
using namespace std;
// Returns the log prefix for the calling thread: the thread id in square
// brackets followed by one space.
string CTCPServer::SessionID()
{
    ostringstream tag;
    tag << \"[\";
    tag << boost::this_thread::get_id();
    tag << \"] \";
    return tag.str();
}
// Opens the acceptor on _sPort (IPv6 when _bIPv6 is set, IPv4 otherwise),
// marks the server as running and queues the first asynchronous accept.
//   HandleFunction - called for every accepted connection
// Failures during set-up are reported on std::cerr and are not propagated.
void CTCPServer::Run(void (*HandleFunction)(boost::shared_ptr< boost::asio::ip::tcp::socket >,boost::asio::deadline_timer* ))
{
    try
    {
        // Pick the protocol once instead of duplicating the resolve for v4/v6.
        const boost::asio::ip::tcp protocol =
            _bIPv6 ? boost::asio::ip::tcp::v6() : boost::asio::ip::tcp::v4();

        boost::asio::ip::tcp::resolver resolver(*_io_service);
        boost::asio::ip::tcp::resolver::query query(protocol,_sPort);
        const boost::asio::ip::tcp::endpoint endpoint = *resolver.resolve(query);

        _acceptor.open(endpoint.protocol());
        _acceptor.set_option(boost::asio::ip::tcp::acceptor::reuse_address(true));
        // Report connections aborted while queued as errors to the accept handler.
        _acceptor.set_option(boost::asio::socket_base::enable_connection_aborted(true));
        _acceptor.bind(endpoint);
        _acceptor.listen();

        bKeepRunning = true;
        AsyncAccept(HandleFunction);
    }
    catch(std::exception& e)
    {
        // A set-up that failed part-way (e.g. bind) must not leave the acceptor
        // open: Stop() returns early while bKeepRunning is false, so nothing
        // else would close it. If the server was already running (Run() called
        // twice) the working acceptor is left untouched.
        if(!bKeepRunning)
        {
            boost::system::error_code ignored;
            _acceptor.close(ignored);
        }

        if(!_bIPv6)
            std::cerr << \"Exception wile creating IPv4 TCP socket on port \"<< _sPort<< \": \" << e.what() << std::endl;
        else
            std::cerr << \"Exception wile creating IPv6 TCP socket on port \"<< _sPort<< \": \" << e.what() << std::endl;
    }
}
// Queues one asynchronous accept on a freshly created socket.
//   HandleFunction - forwarded to AcceptHandler together with the result
// Does nothing unless bKeepRunning is set. A std::exception raised while
// setting up the accept is logged and not propagated.
void CTCPServer::AsyncAccept(void (*HandleFunction)(boost::shared_ptr< boost::asio::ip::tcp::socket >,boost::asio::deadline_timer* ))
{
    if(!bKeepRunning)
    {
        return;
    }

    try
    {
        // Every accept gets its own socket; the previous one may still be in
        // use by the handler that received it.
        _sockClient.reset(new boost::asio::ip::tcp::socket(*_io_service));
        cout << SessionID() << \"Waiting for connection on port: \" << _sPort << endl;
        _acceptor.async_accept(*_sockClient,boost::bind(&CTCPServer::AcceptHandler,this,boost::asio::placeholders::error,HandleFunction));
    }
    catch(exception& e)
    {
        // The unused copy of e.what() that used to be made here was dropped.
        cout << SessionID() << \"Error while accepting connection: \" << e.what() << endl;
    }
}
void CTCPServer::AcceptHandler(const boost::system::error_code& e,boost::asio::deadline_timer* ))
{
if(!e)
{
try
{
(*_io_service).post(boost::bind(HandleFunction,_sockClient,&_timer));
AsyncAccept(HandleFunction);
}
catch(exception& e)
{
cout << SessionID() << \"Exception: \" << e.what() << endl;
}
}
}
// Stops the server: logs the request, and if the server is running clears
// bKeepRunning, closes the pending client socket, cancels outstanding accepts
// and closes the acceptor. Each step logs and swallows its own std::exception
// so that the remaining steps still run.
// NOTE(review): bKeepRunning is a plain bool that AsyncAccept also reads; if
// Stop() can run on a different thread than the handlers, confirm that this is
// synchronized.
void CTCPServer::Stop()
{
    cout << SessionID() << \"STOP port \" << _sPort << endl;

    if (bKeepRunning)
    {
        bKeepRunning = false;

        // 1. Socket reserved for the pending accept.
        try
        {
            _sockClient->close();
        }
        catch (const std::exception& closeSocketError)
        {
            cout << SessionID() << \"Exception: \" << closeSocketError.what() << endl;
        }

        // 2. Outstanding asynchronous operations on the acceptor.
        try
        {
            _acceptor.cancel();
        }
        catch (const std::exception& cancelError)
        {
            cout << SessionID() << \"Exception: \" << cancelError.what() << endl;
        }

        // 3. The acceptor itself.
        try
        {
            _acceptor.close();
        }
        catch (const std::exception& closeAcceptorError)
        {
            cout << SessionID() << \"Exception: \" << closeAcceptorError.what() << endl;
        }
    }
}
// Stops the server and then invokes a caller-supplied callback.
//   StopFunction - called after Stop(); a null pointer is tolerated and skipped
void CTCPServer::Stop(void (*StopFunction)())
{
    Stop();
    // Calling through a null function pointer is undefined behaviour, so guard it.
    if(StopFunction)
    {
        StopFunction();
    }
}
要让它兼容IPv6也非常容易:构造CTCPServer时把bIPv6参数设为true即可。
它已经过测试并且运行良好。只需复制并使用!