延迟确认 GCloud Pub/Sub 消息

问题描述

我正在尝试实现一个类似于 AWS 或 Azure 队列的 PubSub 客户端,但是,我在使用 gcloud cpp sdk 时遇到了问题。

更新:删除了不必要的细节。

首先,提供的 example 无法立即使用 - 我必须在 session.cancel() 之前休眠,否则消息不会被确认。有没有一种可靠的方法可以等到 ack() 操作完成并检查其状态?至少我想确定服务器收到了我的请求。

此外,c++ API 似乎只提供了不适合我的用例的异步方法

我需要实现以下接口,该接口通过依赖注入插入到更大的系统中。该系统在其他云上的生产环境中工作,因此我无法更改架构。只需要实现接口即可。

template<typename TItem>
class Queue{
public:
    /*!
    * Dequeues message from the queue 
    * Returns true on success
    */
    virtual bool Dequeue( TItem & item) = 0;

    /*!
    * discards(deletes) the item with from the cloud queue.
    */
    virtual void discard(const TReceipt & receipt) = 0;
};

队列的实际实现将由一个序列化器提供,该序列化器将 TItem 序列化为 JSON 并返回。

AWS 和 Azure SDK 为每条出队的消息提供收据,以便我以后可以丢弃它。 pubsub SDK 的接收是 AckHandler 对象,该对象绑定到会话。

一个明显错误解决方案是保持会话打开并在 lambda 中等待另一个条件变量,直到下一次调用 Dequeue 方法。然而,这看起来像一个快速而肮脏的解决方案。 使用 Pub/Sub 实现此功能的正确方法是什么?

解决方法

有没有可靠的方法可以等到 ack() 操作完成并检查其状态?

并非如此,因为 ack() 操作是尽力而为。即使您等待 ack() 到达服务器,也不能保证消息不会被重新发送。是的,即使在成功 ack() 之后,该服务也可能会重新发送消息。

请注意,在您明确ack()nack()之前,库会自动延长消息的租期以避免重新发送,因此ack()中的延迟不会影响正确性除非 您在 ack() 后不久关闭应用程序。

AWS 和 Azure SDK 为每条出队的消息提供了一个收据,以便我以后可以丢弃它,但我不知道如何使用 pub/sub 来做到这一点。

您可以在 nack() 上调用 AckHandler 来丢弃消息,这意味着该消息将重新发送到另一个实例。这就是“丢弃”的意思吗?

使用 Pub/Sub 实现此功能的正确方法是什么?

嗯,我不确定我是否遵循。我可以猜测 Dequeue()Discard() 的语义,如果我猜错了,请见谅。无论如何,不​​清楚这是否真的适用于任何类型?喜欢TReceipt == int吗?

需要注意的是,这里有很多猜测,您可以这样做:

class PubsubBuffer {
 private:
  std::mutex mu_;
  std::dequeue<std::pair<pubsub::Message,AckHandlerWrapper>> queue_;

 public:
  virtual bool Dequeue(pubsub::Message& item,AckHandlerWrapper& receipt) {
    std::unique_lock<std::mutex> lk(mu_);
    if (queue_.empty()) return false;
    auto& f = queue_.front();
    item = std::move(f.first);
    receipt = std::move(.second);
    queue_.pop_front();
    return true;
  }
  virtual void Discard(AckHandlerWrapper const& receipt) {
    std::move(receipt.ack_handler).ack();
  }
  void Push(pubsub::Message m,pubsub::AckHandler h) {
    std::unique_lock<std::mutex> lk(mu_);
    queue_.push_back(
      std::make_pair(std::move(m),AckHandlerWrapper(std::move(h)));
  }
};


std::shared_ptr<PubsubBuffer> F(pubsub::Subscriber s) {
  auto buffer = std::make_shared<PubsubBuffer>();
  auto handler = [buffer](pubsub::Message m,pubsbu::AckHandler h) {
    buffer.Push(std::move(m),std::move(h));
  };
  return buffer;
}