问题描述
我正在尝试实现一个类似于 AWS 或 Azure 队列的 PubSub 客户端,但是,我在使用 gcloud cpp sdk 时遇到了问题。
更新:删除了不必要的细节。
首先,提供的 example 无法立即使用 - 我必须在 session.cancel()
之前休眠,否则消息不会被确认。有没有一种可靠的方法可以等到 ack() 操作完成并检查其状态?至少我想确定服务器收到了我的请求。
此外,c++ API 似乎只提供了不适合我的用例的异步方法。
我需要实现以下接口,该接口通过依赖注入插入到更大的系统中。该系统在其他云上的生产环境中工作,因此我无法更改架构。只需要实现接口即可。
template<typename TItem>
class Queue{
public:
/*!
* Dequeues message from the queue
* Returns true on success
*/
virtual bool Dequeue( TItem & item) = 0;
/*!
* discards(deletes) the item with from the cloud queue.
*/
virtual void discard(const TReceipt & receipt) = 0;
};
队列的实际实现将由一个序列化器提供,该序列化器将 TItem 序列化为 JSON 并返回。
AWS 和 Azure SDK 为每条出队的消息提供收据,以便我以后可以丢弃它。 pubsub SDK 的接收是 AckHandler 对象,该对象绑定到会话。
一个明显错误的解决方案是保持会话打开并在 lambda 中等待另一个条件变量,直到下一次调用 Dequeue 方法。然而,这看起来像一个快速而肮脏的解决方案。 使用 Pub/Sub 实现此功能的正确方法是什么?
解决方法
有没有可靠的方法可以等到 ack() 操作完成并检查其状态?
并非如此,因为 ack()
操作是尽力而为。即使您等待 ack()
到达服务器,也不能保证消息不会被重新发送。是的,即使在成功 ack()
之后,该服务也可能会重新发送消息。
请注意,在您明确ack()
或nack()
之前,库会自动延长消息的租期以避免重新发送,因此ack()
中的延迟不会影响正确性除非 您在 ack()
后不久关闭应用程序。
AWS 和 Azure SDK 为每条出队的消息提供了一个收据,以便我以后可以丢弃它,但我不知道如何使用 pub/sub 来做到这一点。
您可以在 nack()
上调用 AckHandler
来丢弃消息,这意味着该消息将重新发送到另一个实例。这就是“丢弃”的意思吗?
使用 Pub/Sub 实现此功能的正确方法是什么?
嗯,我不确定我是否遵循。我可以猜测 Dequeue()
和 Discard()
的语义,如果我猜错了,请见谅。无论如何,不清楚这是否真的适用于任何类型?喜欢TReceipt == int
吗?
需要注意的是,这里有很多猜测,您可以这样做:
class PubsubBuffer {
private:
std::mutex mu_;
std::dequeue<std::pair<pubsub::Message,AckHandlerWrapper>> queue_;
public:
virtual bool Dequeue(pubsub::Message& item,AckHandlerWrapper& receipt) {
std::unique_lock<std::mutex> lk(mu_);
if (queue_.empty()) return false;
auto& f = queue_.front();
item = std::move(f.first);
receipt = std::move(.second);
queue_.pop_front();
return true;
}
virtual void Discard(AckHandlerWrapper const& receipt) {
std::move(receipt.ack_handler).ack();
}
void Push(pubsub::Message m,pubsub::AckHandler h) {
std::unique_lock<std::mutex> lk(mu_);
queue_.push_back(
std::make_pair(std::move(m),AckHandlerWrapper(std::move(h)));
}
};
std::shared_ptr<PubsubBuffer> F(pubsub::Subscriber s) {
auto buffer = std::make_shared<PubsubBuffer>();
auto handler = [buffer](pubsub::Message m,pubsbu::AckHandler h) {
buffer.Push(std::move(m),std::move(h));
};
return buffer;
}