使用OkHttp3和ReactiveX Java实施长轮询的正确方法

问题描述

如何使用OkHttp3(v4.4.1)实施长时间轮询以获取响应的每一行的RxJava(v2.2.11)?可以在不阻塞线程保持读取行的情况下完成此操作吗?如果我需要阻塞某个线程,那么我应该阻塞哪个线程?关于使用OkHttp3实现长轮询的任何常规示例吗? Google对这个话题一直很害羞...

TL; DR

我将OKHttp3用作HTTP客户端,并将其包装在makeGetobservable方法调用中,该方法调用使用newCall回调返回Observable Response来向Observable发出事件。现在,我正在尝试增加对长轮询服务的支持,并且我担心线程问题。

下面的代码演示了我正在尝试做的事情(似乎可以正常工作),但是我很确定这是不正确的。

// return Observable<Response>
makeGetobservable("http://my.service.com/api/events")
  // check for error and map to Observable<ResponseBody>
  .map(this::mapRespBodyOrError)
  // flat map to Observable<String> representing line of long polling response
  .flatMap(respBody -> Observable.create(emitter -> {
    // open reader on response body stream
    try (BufferedReader reader = new BufferedReader(respBody.charStream())) {
      String line;
      // block and wait to read a line from input
      while((line = reader.readLine()) !=null) {
        // once line was read from response body input stream emit it as observable event
        emitter.onNext(line);
      }
    }
  }));

解决方法

经过一番研究,我发现被阻塞的线程是OkHttp客户线程。这是由于我的makeGetObservable的实现从OkHttp客户端的newCall..enqueue回调发出的。 OkHttp客户端默认情况下具有5个线程池,用于5个资源的并发连接。因此,每次我订阅另一种长轮询资源时,我都会阻塞其中一个线程。在5个阻塞的线程之后,OkHttp客户端由于没有线程来处理响应而停止工作。

按照@Progman的建议,我使用IO调度程序来使用subscribeOn,该调度程序为每个阻塞的IO操作生成了新线程。必须谨慎使用此调度程序来正确处理资源。

我的实现当前如下所示(添加了调度程序,完成和错误事件)

// return Observable<Response>
makeGetObservable("http://my.service.com/api/events")
  // check for error and map to Observable<ResponseBody>
  .map(this::mapRespBodyOrError)
  // use IO scheduler that spawns new threads to take care of blocking operations
  .observeOn(Schedulers.io())
  // flat map to Observable<String> representing line of long polling response
  .flatMap(respBody -> Observable.create(emitter -> {
    try (BufferedReader reader = new BufferedReader(respBody.charStream())) {
      // blocking read lines while available
      String line;
      while ((line = reader.readLine()) != null) {
        // emit event for every line
        emitter.onNext(line);
      }
      // emit the completion event to indicate we are done
      emitter.onComplete();
    } catch (IOException | RuntimeException err) {
      // emit any error that might have occurred
      emitter.onError(err);
    }
  }));