问题描述
我正在为应用程序设计体系结构。通常,它通过特殊的通信协议与各种设备进行通信,并使用各种连接:udp,tcp,串行端口,ARINC429等。
目前,我有一个基于策略的类QSerialCommunicator
,具有用于连接的通用抽象。它使用序列化协议将用户定义的类型和类序列化为原始字节数组(和其他方向),然后通过抽象连接将其传输到设备。
串行协议(Encoder
和Decoder
)是模板参数;该类使用其Encode
和Decode
方法(可能是静态的)来处理输入和输出数据(从|到原始字节数组),并推导其方法dispatch
和{ {1}}。
在某些情况下,数据可能会分段接收,因此NextData
使用其内部状态来组装所有零件,直到可以安全地反序列化为止。
抽象连接的想法是,在某个时候我们可能决定与某种设备利用另一种类型的连接。有了这样的抽象,就不需要重新编译了,更改输入配置文件就可以了。但是,我考虑将连接作为模板策略提供,例如将通信协议作为附加版本。
现在,当创建该类的对象时,会将其移动到单独的Decoder
并基本上是线程安全的:它具有用于输入和接收数据的命令队列。它使用QThread
提供的信号SigReadyRead
进行操作并发出QAbstractConnection
。
SigDataReceived
基本上,我想要的主要是摆脱Qt信号和插槽。那是为了更改用于连接类的API,但是我不知道如何正确地做到这一点(它可以很好地与信号和插槽配合使用)。我考虑使用/*
* This class represents an abstraction for one-to-one connection (with a specific device).
* Basically it serves only one purpose (Within GenericDriver): send and receive byte arrays
*
* Todo: possible errors and ways to handle them
*/
class QAbstractConnection : public QObject
{
Q_OBJECT
public:
explicit QAbstractConnection(QObject *parent = nullptr)
: QObject(parent)
{}
// are these functions necessary here?
///////////////////////////////////////
virtual bool IsConnected() const = 0;
virtual DeviceError Connect() = 0;
virtual DeviceError disconnect() = 0;
///////////////////////////////////////
// Todo: throw exception?
virtual DeviceError ReadData(QByteArray& byteArray) = 0;
// Todo: throw exception?
// sends array in 512 bytes datagrams
virtual DeviceError WriteData(const char* data,size_t length) = 0;
DeviceError WriteData(const QByteArray& byteArray)
{
return WriteData(byteArray.data(),byteArray.size());
}
signals:
void SigReadyRead();
void SigConnectionLost();
};
// Todo: flags
enum class DecodeResult : int
{
success,in_progress,empty_package,Failed
};
// Todo: remove?
// Todo: flags
enum class EncodeResult : int
{
success
};
template <typename Decoder>
struct decoder_arg;
template <typename DecoderArg>
struct decoder_arg<DecodeResult(*)(const QByteArray&,DecoderArg)>
{
using type = remove_cvref_ptr<DecoderArg>;
};
template <typename Decoder,typename DecoderArg>
struct decoder_arg<DecodeResult(Decoder::*)(const QByteArray&,DecoderArg)>
{
using type = remove_cvref_ptr<DecoderArg>;
};
template <typename Encoder>
struct encoder_args;
template <typename ... EncoderArgs>
struct encoder_args<EncodeResult(*)(QByteArray&,EncoderArgs...)>
{
using type = std::tuple<remove_cvref_ptr<EncoderArgs>...>;
};
template<class Encoder,typename ... EncoderArgs>
struct encoder_args<EncodeResult(Encoder::*)(QByteArray &,EncoderArgs ...)>
{
using type = std::tuple<remove_cvref_ptr<EncoderArgs>...>;
};
class QSerialCommunicatorBase : public QObject
{
Q_OBJECT
public:
QSerialCommunicatorBase(QObject *parent = nullptr)
: QObject(parent)
{}
protected slots:
virtual void SlotCommandReceived() = 0;
virtual void SlotReadyRead() = 0;
signals:
// Connect to this signal via QueuedConnection not to block Communicator Thread
void SigDataReceived();
// Todo: use this function to recover from connection errors?
void ConnectionMalfunction();
};
struct void_v
{};
// "empty" class for SerialCommunicator in case no decoder protocol is needed
struct DecoderVoid
{
static DecodeResult Decode(const QByteArray&,void_v)
{
return DecodeResult::success;
}
};
// "empty" class for SerialCommunicator in case no encoder protocol is needed
struct EncoderVoid
{
static EncodeResult Encode(QByteArray&,void_v)
{
return EncodeResult::success;
}
};
template<class Decoder,class Encoder,typename DecoderArg = typename decoder_arg<decltype(&Decoder::Decode)>::type,typename TupleEncoderArgs = typename encoder_args<decltype(&Encoder::Encode)>::type>
class QSerialCommunicator;
/*
* Class QSerialCommunicator runs in its own QThread with an event loop
*/
template<typename DecoderT,typename EncoderT,typename DecoderArgT,typename ... EncodeArgs>
class QSerialCommunicator<DecoderT,EncoderT,DecoderArgT,std::tuple<EncodeArgs...>> : public QSerialCommunicatorBase
{
// Todo: compile time error if DecoderT does not implement DecoderT::Decode(const QByteArray&,Decoded&)
using Encoder = EncoderT;
using Decoder = DecoderT;
using DecodedArg = DecoderArgT;
static_assert(std::is_default_constructible<DecodedArg>::value,"Decoded data type must be default constructable");
std::unique_ptr<QAbstractConnection> connection_;
Encoder encoder_;
Decoder decoder_;
std::queue<std::function<EncodeResult(QByteArray &)>> commandsQueue_;
mutable std::mutex commandsMutex_;
std::queue<DecodedArg> decodedQueue_;
mutable std::mutex decodedMutex_;
public:
QSerialCommunicator(std::unique_ptr<QAbstractConnection> &&connection,Encoder &&encoder,Decoder &&decoder,QObject *parent = nullptr)
: QSerialCommunicatorBase(parent),connection_(std::move(connection)),encoder_(std::forward<Encoder>(encoder)),decoder_(std::forward<Decoder>(decoder))
{
InitConnection();
}
template<typename = typename
std::enable_if<std::is_default_constructible<Decoder>::value &&
std::is_default_constructible<Encoder>::value>::type>
explicit QSerialCommunicator(std::unique_ptr<QAbstractConnection> &&connection,connection_(std::move(connection))
{
InitConnection();
}
void dispatch(EncodeArgs ... args)
{
if (std::is_same<Encoder,EncoderVoid>::value)
return;
{
std::lock_guard<std::mutex> lg{commandsMutex_};
// commandsQueue_.emplace(std::forward_as_tuple(std::forward<EncodeArgs>(args)...));
commandsQueue_.emplace(bind_func_v(&EncoderT::Encode,&encoder_,std::placeholders::_1,std::forward<EncodeArgs>(args)...));
}
// QueuedConnection not to block calling thread
QMetaObject::invokeMethod(this,"SlotCommandReceived",Qt::QueuedConnection);
}
// optional would be nice here
DecodedArg NextData()
{
DecodedArg out;
{
std::lock_guard<std::mutex> lg{decodedMutex_};
out = std::move(decodedQueue_.front());
decodedQueue_.pop();
}
return out;
}
size_t CommandsPending() const
{
std::lock_guard<std::mutex> lg{commandsMutex_};
return commandsQueue_.size();
}
size_t DataPending() const
{
std::lock_guard<std::mutex> lg{decodedMutex_};
return decodedQueue_.size();
}
private:
void InitConnection()
{
assert(connection_ && "Connection is nullptr!");
connection_->setParent(this);
QObject::connect(connection_.get(),&QAbstractConnection::SigReadyRead,this,&QSerialCommunicator::SlotReadyRead);
}
void SlotCommandReceived() override
{
assert(CommandsPending() && "Uneqpected case: Command queue is empty");
QByteArray array;
EncodeResult res;
{
res = commandsQueue_.front()(array);
std::lock_guard<std::mutex> lg{commandsMutex_};
commandsQueue_.pop();
}
if (res != EncodeResult::success)
{
// Todo: log or what?
return;
}
// Todo: try block
DeviceError writeRes = connection_->WriteData(array);
if (writeRes != DeviceError::success)
{
// Todo: log or what? Handle connection errors?
}
}
void SlotReadyRead() override
{
QByteArray array;
if (std::is_same<Decoder,DecoderVoid>::value)
{
// Todo: is this a valid runtime case?
assert(false && "Unexpected case: Data received for DecoderVoid");
DeviceError readRes = connection_->ReadData(array);
// Todo" handle readRes?
return;
}
DeviceError readRes = connection_->ReadData(array);
if (readRes != DeviceError::success)
{
// Todo: log,handle connection errors?
return;
}
DecodedArg decoded;
DecodeResult decodeResult = decoder_.Decode(array,decoded);
if (decodeResult != DecodeResult::success)
{
// Todo: log,what else?
return;
}
{
std::lock_guard<std::mutex> lg{decodedMutex_};
decodedQueue_.emplace(std::move(decoded));
}
emit SigDataReceived();
}
};
template<typename Encoder>
using SerialSender = QSerialCommunicator<DecoderVoid,Encoder>;
template<typename Decoder>
using SerialReceiver = QSerialCommunicator<Decoder,EncoderVoid>;
来实现tcp,udp和串行连接,或者也许使用ASIO
。我一直在研究这些可能性,但总的来说,我不明白为什么将来不能同时使用这两种可能性。
我还希望通讯器类异步工作。
- 如果使用
zmq
库进行实现,如何重构这些类以及应提供什么方法AbstractConnection
API? - 用于原始字节数组而不是
ASIO
的类型是什么? - 如何使用此架构处理连接错误?例如,如果连接无效并且在哪里处理重新连接逻辑(连接实现是否负责重新连接?)?
- 是否可以使用一种模式来简化我的解决方案?
解决方法
暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!
如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。
小编邮箱:dio#foxmail.com (将#修改为@)