gRPC:长期运行流传输的最佳实践是什么?

问题描述

我们已经实现了一种Java gRPC服务,该服务在云中运行,并且具有单向(客户端到服务器)流式RPC,如下所示:

rpc PushUpdates(stream Update) returns (Ack);

C ++客户端(移动设备)在启动时会立即调用此rpc,以便在设备启动并运行后永久地每30秒钟持续发送一次更新。

ChannelArguments chan_args;
// this will be secure channel eventually 
auto channel_p = CreateCustomChannel(remote_addr,InsecureChannelCredentials(),chan_args);
auto stub_p    = DialTcc::NewStub(channel_p);
// ...

Ack ack;
auto strm_ctxt_p = make_unique<ClientContext>();
auto strm_p      = stub_p->PushUpdates(strm_ctxt_p.get(),&ack);
// ...

While(true) {
    // wait until we are ready to send a new update
    Update updt;
    // populate updt;
    if(!strm_p->Write(updt)) {
        // stream is not kosher,create a new one and restart
        break;
    }
}

现在,发生这种情况时会发生各种不同的网络中断:

  • 在云中运行的gRPC服务可能会关闭(出于维护目的)或可能变得无法访问。
  • 设备本身的IP地址随着移动设备的变化而不断变化。

我们已经看到,在此类事件中,通道和Write() API均无法可靠地检测网络断开连接。有时,客户端继续调用Write()(不会返回false),但是服务器没有收到任何数据(wireshark在客户端设备的传出端口上没有显示任何活动)。

在这种情况下要恢复的最佳实践是什么,以便服务器在发生此类事件后的 X秒内开始接收更新?可以理解的是,每当发生此类事件时,就会丢失价值 X秒的数据,但是我们希望在X秒内可靠地恢复。

gRPC版本:1.30.2,客户端:C ++-14 / Linux,服务器:Java / Linux

解决方法

这是我们如何破解此漏洞的方法。我想检查一下是否可以做得更好,或者gRPC的任何人都可以指导我寻求更好的解决方案。

用于我们服务的protobuf如下所示。它具有用于ping服务的RPC,经常用于测试连接性。

// Message used in IsAlive RPC
message Empty {}

// Acknowledgement sent by the service for updates received
message UpdateAck {}

// Messages streamed to the service by the client
message Update {
...
...
}

service GrpcService {
  // for checking if we're able to connect
  rpc Ping(Empty) returns (Empty); 

  // streaming RPC for pushing updates by client
  rpc PushUpdate(stream Update) returns (UpdateAck);
}

这是c ++客户端的外观,它执行以下操作:

  • Connect()

    • 如果存根为nullptr,则创建用于调用RPC的存根。
    • 定期调用Ping()直到成功。
    • 在成功调用PushUpdate(...) RPC后创建新的流。
    • 失败时,将流重置为nullptr
  • Stream():执行以下while(true)循环:

    • 获取要推送的更新。
    • 在流上调用Write(...)并推送更新。
    • 如果Write(...)出于任何原因失败,则控制权返回到Connect()
    • 每30分钟(或每隔固定的时间间隔)将所有内容(桩,频道,流)重置为nullptr,以重新开始。这是必需的,因为即使客户端与服务之间没有连接,有时Write(...)也不会失败。 Write(...)呼叫成功,但客户端上的传出端口在Wireshark上未显示任何活动!

代码如下:

constexpr GRPC_TIMEOUT_S = 10;
constexpr RESTART_INTERVAL_M = 15;
constexpr GRPC_KEEPALIVE_TIME_MS = 10000;
string root_ca,tls_key,tls_cert; // for SSL
string remote_addr = "https://remote.com:5445";
...
...
void ResetStreaming() {
  if (stub_p != nullptr) {
    strm_p      = nullptr;
    strm_ctxt_p = nullptr;
    stub_p      = nullptr;
    channel_p   = nullptr;
  }
}

void CreateStub() {
  if (stub_p == nullptr) {
    ChannelArguments chan_args;
    chan_args.SetInt(GRPC_ARG_KEEPALIVE_TIME_MS,GRPC_KEEPALIVE_TIME_MS);
    channel_p = CreateCustomChannel(
        remote_addr,SslCredentials(SslCredentialsOptions{root_ca,tls_cert}),chan_args);
    stub_p = GrpcService::NewStub(m_channel_p);
  }
}

void Stream() {
  const auto restart_time = steady_clock::now() + minutes(RESTART_INTERVAL_M);
  while (!stop) {
    // restart every RESTART_INTERVAL_M (15m) even if ALL IS WELL!!
    if (steady_clock::now() > restart_time) {
      break;
    }
    Update updt = GetUpdate(); // get the update to be sent
    if (!stop) {
      if (channel_p->GetState(true) == GRPC_CHANNEL_SHUTDOWN ||
                 !strm_p->Write(updt)) {
        // could not write!!
        return;  // we will Connect() again
      }
    }
  }
  // stopped due to stop = true or interval to create new stream has expired
  ResetStreaming();  // channel,stub,stream are recreated once in every 15m
}

bool PingRemote() {
  ClientContext ctxt;
  ctxt.set_deadline(system_clock::now() + seconds(GRPC_TIMEOUT_S));
  Empty req,resp;
  CreateStub();
  if (stub_p->Ping(&ctxt,req,&resp).ok()) {
    static UpdateAck ack;
    strm_ctxt_p = make_unique<ClientContext>();  // need new context
    strm_p      = stub_p->PushUpdate(strm_ctxt_p.get(),&ack);
    return true;
  }
  if (strm_p) {
    strm_p      = nullptr;
    strm_ctxt_p = nullptr;
  }
  return false;
}

void Connect() {
  while (!stop) {
    if (PingRemote() || stop) {
      break;
    }
    sleep_for(seconds(5)); // wait before retrying
  }
}

// set to true from another thread when we want to stop
atomic<bool> stop = false;

void StreamUntilStopped() {
  if (stop) {
    return;
  }
  strm_thread_p = make_unique<thread>([&] {
    while (!stop) {
      Connect();
      Stream();
    }
  });
}

// called by the thread that sets stop = true
void Finish() {
  strm_thread_p->join();
}

有了这个,我们看到只要任何原因中断,流就会在 15分钟内(或RESTART_INTERVAL_M)恢复。这段代码运行很快,所以我很想知道是否可以做得更好。