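Problem description
Hi everyone. I am trying to write some code that implements a concurrent system using the actor model. The interface of an actor looks like this: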
template <class SourceMessageType,class MessageType>
class actor{
public:
using value_type = MessageType;
void process_message(SourceMessageType&& message);
template <class EmitFunction>
void on_message(EmitFunction emit);
private:
std::function<void(MessageType&&)> m_emit;
};
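To make the interface above concrete, here is a minimal sketch of one actor that follows it. The class stringify_actor and the helper demo_stringify_actor are hypothetical names that do not appear in my program; the sketch only assumes that an actor receives one message type, emits another, and forwards each result through the callback registered with on_message.

```cpp
#include <cassert>
#include <functional>
#include <string>
#include <utility>
#include <vector>

// Hypothetical actor, not part of the original program:
// it receives int messages and emits their decimal text.
class stringify_actor {
public:
    using value_type = std::string;

    // Incoming message: convert it and pass the result downstream.
    void process_message(int&& message) {
        if (m_emit) {
            m_emit(std::to_string(message));
        }
    }

    // Registers the callback that receives every emitted message.
    template <class EmitFunction>
    void on_message(EmitFunction emit) {
        m_emit = emit;
    }

private:
    std::function<void(std::string&&)> m_emit;
};

// Feeds two messages through the actor and returns what it emitted.
std::vector<std::string> demo_stringify_actor() {
    std::vector<std::string> received;
    stringify_actor actor;
    actor.on_message([&received](std::string&& text) {
        received.push_back(std::move(text));
    });
    actor.process_message(42);
    actor.process_message(7);
    assert(received.size() == 2);  // one emitted message per input
    return received;
}
```

Calling demo_stringify_actor() returns the two emitted strings in the order the inputs were processed.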
My program has two actors: a service actor and a sink actor. They are defined as follows (a minimal working example is at the bottom of this question):
class service {
public:
using value_type = std::string;
explicit service(boost::asio::io_service& service,unsigned short port = 42042);
service(const service&) = delete;
service(service&& other) = default;
template <class EmitFunction>
void on_message(EmitFunction emit) {
m_emit = emit;
do_accept();
}
private:
void do_accept();
tcp::acceptor m_acceptor;
tcp::socket m_socket;
std::function<void(std::string&&)> m_emit;
};
template <class Sender,class Function,class MessageType = typename Sender::value_type>
class sink_impl {
public:
using value_type = MessageType;
sink_impl(Sender&& sender,Function function)
: m_sender(std::move(sender)),m_function(function) {
m_sender.on_message([this](MessageType&& message) {
process_message(std::move(message));
});
}
void process_message(MessageType&& message) const {
std::invoke(m_function,std::move(message));
}
private:
Sender m_sender;
Function m_function;
};
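As you can see, the service actor's only responsibility is to send messages to other actors; it never receives any messages from them. The sink actor is the exact opposite. To test my actors, I wrote the main function below and used telnet to simulate a client.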
template <class Function>
struct sink_helper {
Function function;
};
template <class Function>
auto sink(Function&& function) {
return detail::sink_helper<Function>{std::forward<Function>(function)};
}
template <class Sender,class Function>
auto operator|(Sender&& sender,detail::sink_helper<Function> sink) {
return detail::sink_impl<Sender,Function>(std::forward<Sender>(sender),sink.function);
}
int main(){
boost::asio::io_service event_loop;
auto sink_to_cerr =
sink([](const auto& message) { std::cerr << message << '\n'; });
// Starting the Boost.ASIO service
auto pipeline = service(event_loop) | sink_to_cerr;
std::cerr << "Service is running...\n";
event_loop.run();
}
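So far so good: my program receives every message I send to it through telnet. However, asynchronous operations are hard to test automatically, so I want to add a mock_service class to my program for testing. My mock_service is as follows: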
class mock_service{
public:
mock_service(std::initializer_list<std::string>&& inits) : m_source{inits}{}
mock_service(const std::vector<std::string>& other) : m_source(other){}
mock_service(const mock_service&) = delete;
mock_service(mock_service&& other) : m_source(std::move(other.m_source)){}
template <class EmitFunction>
void on_message(EmitFunction emit) {
m_emit = emit;
}
private:
std::function<void(std::string&&)> m_emit;
std::vector<std::string> m_source;
};
// some irrelevant code omitted
int main(){
boost::asio::io_service event_loop;
auto sink_to_cerr =
sink([](const auto& message) { std::cerr << message << '\n'; });
// Starting the Boost.ASIO service
auto pipeline = mock_service(event_loop) | sink_to_cerr;
std::cerr << "Service is running...\n";
event_loop.run();
}
g++ mwe.cc -std=c++17 -lpthread
mwe.cc: In instantiation of ‘auto operator|(Sender&&,sink_helper<Function>) [with Sender = mock_service&; Function = main(int,const char**)::<lambda(const auto:1&)>]’:
mwe.cc:134:30: required from here
mwe.cc:123:12: error: ‘mock_service&’ is not a class,struct,or union type
123 | return sink_impl<Sender,Function>(std::forward<Sender>(sender),
| ^~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
124 | sink.function);
| ~~~~~~~~~~~~~~
mwe.cc: In function ‘int main(int,const char**)’:
mwe.cc:134:10: error: ‘void pipeline’ has incomplete type
134 | auto pipeline = source | sink_to_cerr;
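This error message does not seem to give me any useful information, because mock_service is clearly a class type. So my question is: why does this error occur, and how can I get the code to compile?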
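One way to read the diagnostic, offered as a sketch rather than a confirmed answer: the log says Sender = mock_service&, which is what a forwarding reference parameter (Sender&&) deduces when the argument is an lvalue. Looking up Sender::value_type on a reference type is not allowed, which would explain "is not a class, struct, or union type". The names fake_sender, deduced_as_lvalue_reference, message_type_of and demo_deduction below are hypothetical and exist only for this illustration.

```cpp
#include <cassert>
#include <string>
#include <type_traits>
#include <utility>

// Hypothetical stand-in for mock_service; only value_type matters here.
struct fake_sender {
    using value_type = std::string;
};

// Reports whether the forwarding reference deduced Sender as an lvalue reference.
template <class Sender>
constexpr bool deduced_as_lvalue_reference(Sender&&) {
    return std::is_lvalue_reference_v<Sender>;
}

// Strips references and cv-qualifiers before looking up the nested type,
// so the lookup works for both fake_sender and fake_sender&.
template <class Sender>
using message_type_of = typename std::decay_t<Sender>::value_type;

static_assert(std::is_same_v<message_type_of<fake_sender>, std::string>);
static_assert(std::is_same_v<message_type_of<fake_sender&>, std::string>);
// By contrast, "typename fake_sender&::value_type" does not compile.

// Exercises both call forms; returns true when deduction is as described.
bool demo_deduction() {
    fake_sender source;
    bool lvalue_case = deduced_as_lvalue_reference(source);         // Sender = fake_sender&
    bool rvalue_case = deduced_as_lvalue_reference(fake_sender{});  // Sender = fake_sender
    assert(lvalue_case);
    return lvalue_case && !rvalue_case;
}
```

If this reading is right, the temporary service(event_loop) compiles because Sender deduces to plain service, while a named variable such as source deduces to a reference.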
For your convenience, here is a minimal working example.
#include <boost/asio.hpp>
#include <functional>
#include <initializer_list>
#include <iostream>
#include <memory>
#include <string>
#include <vector>
using boost::asio::ip::tcp;
template <typename EmitFunction>
class session : public std::enable_shared_from_this<session<EmitFunction>> {
public:
session(tcp::socket&& socket,EmitFunction emit)
: m_socket(std::move(socket)),m_emit(emit) {}
void start() { do_read(); }
private:
using shared_session = std::enable_shared_from_this<session<EmitFunction>>;
void do_read() {
auto self = shared_session::shared_from_this();
boost::asio::async_read_until(
m_socket,m_data,'\n',[this,self](const boost::system::error_code& error,std::size_t size) {
if (!error) {
std::istream is(&m_data);
std::string line;
std::getline(is,line);
m_emit(std::move(line));
do_read();
}
});
}
tcp::socket m_socket;
boost::asio::streambuf m_data;
EmitFunction m_emit;
};
template <class Socket,class EmitFunction>
auto make_shared_session(Socket&& socket,EmitFunction&& emit) {
return std::make_shared<session<EmitFunction>>(
std::forward<Socket>(socket),std::forward<EmitFunction>(emit));
}
class service {
public:
using value_type = std::string;
explicit service(boost::asio::io_service& service,unsigned short port = 42042)
:m_acceptor(service,tcp::endpoint(tcp::v4(),port)),m_socket(service) {}
service(const service&) = delete;
service(service&& other) = default;
template <class EmitFunction>
void on_message(EmitFunction emit) {
m_emit = emit;
do_accept();
}
private:
void do_accept(){
m_acceptor.async_accept(
m_socket,[this](const boost::system::error_code& error) {
if (!error) {
make_shared_session(std::move(m_socket),m_emit)->start();
} else {
std::cerr << error.message() << '\n';
}
do_accept();
});
}
tcp::acceptor m_acceptor;
tcp::socket m_socket;
std::function<void(std::string&&)> m_emit;
friend std::ostream& operator<<(std::ostream& out,const service& service) {
return out << "service object";
}
};
class mock_service{
public:
mock_service(std::initializer_list<std::string>&& inits) : m_source{inits}{}
mock_service(const std::vector<std::string>& other) : m_source(other){}
mock_service(const mock_service& other) : m_source(other.m_source){}
mock_service(mock_service&& other) : m_source(std::move(other.m_source)){}
template <class EmitFunction>
void on_message(EmitFunction emit) {
m_emit = emit;
}
private:
std::function<void(std::string&&)> m_emit;
std::vector<std::string> m_source;
};
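template <class Sender,class Function,class MessageType = typename Sender::value_type>
class sink_impl {
public:
using value_type = MessageType;
sink_impl(Sender&& sender,Function function)
: m_sender(std::move(sender)),m_function(function) {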
m_sender.on_message([this](MessageType&& message) {
process_message(std::move(message));
});
}
void process_message(MessageType&& message) const {
std::invoke(m_function,std::move(message));
}
private:
Sender m_sender;
Function m_function;
};
template <class Function>
struct sink_helper {
Function function;
};
template <class Function>
auto sink(Function&& function) {
return sink_helper<Function>{std::forward<Function>(function)};
}
template <class Sender,class Function>
auto operator|(Sender&& sender,sink_helper<Function> sink) {
return sink_impl<Sender,Function>(std::forward<Sender>(sender),sink.function);
}
int main(int argc,char const* argv[]) {
boost::asio::io_service event_loop;
auto sink_to_cerr =
sink([](const auto& message) { std::cerr << message << '\n'; });
auto pipeline = service(event_loop) | sink_to_cerr;
//auto pipeline = mock_service{""," ","\n","hello world","hello world\n"} | sink_to_cerr; //Compile Error
std::cerr << "Service is running...\n";
event_loop.run();
}
The compile command is g++ mwe.cc -std=c++17 -lpthread
By the way, my environment is:
Operating system: Ubuntu 20.04
Compiler: GCC 9.3.0
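For comparison, here is a Boost-free sketch of a pipe operator that accepts both lvalue and rvalue senders. It is a possible direction under stated assumptions, not a verified fix for the code above. Assumptions: C++17; the sender is copyable when passed as an lvalue; vector_source, emit_all and run are hypothetical names added for the demo; sink_impl takes its sender by value; and both operator| and sink apply std::decay_t. Note that the sketch's sender declares value_type, which the mock_service shown earlier does not, so typename Sender::value_type would presumably also fail for it.

```cpp
#include <cassert>
#include <functional>
#include <string>
#include <type_traits>
#include <utility>
#include <vector>

// Hypothetical sender: emits stored strings when emit_all() is called.
class vector_source {
public:
    using value_type = std::string;
    explicit vector_source(std::vector<std::string> items) : m_items(std::move(items)) {}
    template <class EmitFunction>
    void on_message(EmitFunction emit) { m_emit = emit; }
    void emit_all() {
        for (auto& item : m_items) m_emit(std::move(item));
        m_items.clear();
    }
private:
    std::function<void(std::string&&)> m_emit;
    std::vector<std::string> m_items;
};

template <class Sender, class Function, class MessageType = typename Sender::value_type>
class sink_impl {
public:
    // Takes the sender by value, so lvalues are copied and rvalues are moved.
    sink_impl(Sender sender, Function function)
        : m_sender(std::move(sender)), m_function(std::move(function)) {
        m_sender.on_message([this](MessageType&& message) {
            std::invoke(m_function, std::move(message));
        });
    }
    sink_impl(const sink_impl&) = delete;  // the callback above captures this
    void run() { m_sender.emit_all(); }    // hypothetical demo hook
private:
    Sender m_sender;
    Function m_function;
};

template <class Function>
struct sink_helper { Function function; };

template <class Function>
auto sink(Function&& function) {
    return sink_helper<std::decay_t<Function>>{std::forward<Function>(function)};
}

// decay_t turns "vector_source&" into "vector_source" before instantiating sink_impl.
template <class Sender, class Function>
auto operator|(Sender&& sender, sink_helper<Function> helper) {
    return sink_impl<std::decay_t<Sender>, Function>(
        std::forward<Sender>(sender), std::move(helper.function));
}

// Builds one pipeline from an lvalue sender and one from a temporary.
std::vector<std::string> demo_pipeline() {
    std::vector<std::string> seen;
    auto record = [&seen](const std::string& message) { seen.push_back(message); };
    vector_source source(std::vector<std::string>{"a", "b"});
    auto from_lvalue = source | sink(record);  // Sender deduces to vector_source&
    from_lvalue.run();
    auto from_rvalue = vector_source(std::vector<std::string>{"c"}) | sink(record);
    from_rvalue.run();
    assert(seen.size() == 3);
    return seen;
}
```

Because the callback captures this, the sketch deletes the copy constructor and relies on C++17 guaranteed copy elision when operator| returns the pipeline by value.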