使用 Flowable.generate RxJava 进行错误处理

问题描述

我正在使用 Flowable 构建 Kafka 消费者 Flowable.generate,这是我的实现

import io.reactivex.rxjava3.annotations.NonNull;
import io.reactivex.rxjava3.core.Flowable;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Arrays;
import java.util.Properties;

public class RxKafkaUtils {

  /**
   * Returns a flowable impl for a Kafka Consumer
   */
  public static @NonNull Flowable < Object > source(Properties props,String[] topics) {
    final
    var consumer = new KafkaConsumer(props);
    return Flowable.generate(
      () -> {
        consumer.subscribe(Arrays.asList(topics));
        return ConsumerRecords.empty().iterator();
      },(state,emitter) -> {
        if (state.hasNext()) {
          emitter.onNext(state.next());
          return state;
        } else {
          try {
            return consumer.poll(1000).iterator();
          } catch (Throwable t) {
            consumer.close();
            emitter.onError(t);
            throw t;           // <------ HERE
          }
        }
      });
  }
}

由于 KafkaConsumer 可以抛出异常,因此我将 .poll 包装在 try-catch 中。

我的疑问是,我应该重新抛出 Throwable 吗?如果没有,编译器会从 catch 块中要求返回值,在这种情况下,我真的没有任何

实现此目的的正确方法是什么(换句话说 - 正常关闭)?

Flowable.generate 是否适合这样做?我尝试使用 Flowable.frompublisher 但状态管理一团糟,或者我做错了

解决方法

好吧,原来如此,

Flowable.generate 接受在 disposeState 之后调用的 onComplete/onError Consumer。当前状态(或我返回的最终状态被传递给 disposeState 消费者)