问题描述
我正在使用 Flowable
构建 Kafka 消费者 Flowable.generate
,这是我的实现
import io.reactivex.rxjava3.annotations.NonNull;
import io.reactivex.rxjava3.core.Flowable;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.util.Arrays;
import java.util.Properties;
public class RxKafkaUtils {
/**
* Returns a flowable impl for a Kafka Consumer
*/
public static @NonNull Flowable < Object > source(Properties props,String[] topics) {
final
var consumer = new KafkaConsumer(props);
return Flowable.generate(
() -> {
consumer.subscribe(Arrays.asList(topics));
return ConsumerRecords.empty().iterator();
},(state,emitter) -> {
if (state.hasNext()) {
emitter.onNext(state.next());
return state;
} else {
try {
return consumer.poll(1000).iterator();
} catch (Throwable t) {
consumer.close();
emitter.onError(t);
throw t; // <------ HERE
}
}
});
}
}
由于 KafkaConsumer
可以抛出异常,因此我将 .poll
包装在 try-catch 中。
我的疑问是,我应该重新抛出 Throwable
吗?如果没有,编译器会从 catch 块中要求返回值,在这种情况下,我真的没有任何
Flowable.generate
是否适合这样做?我尝试使用 Flowable.frompublisher
但状态管理一团糟,或者我做错了
解决方法
好吧,原来如此,
Flowable.generate
接受在 disposeState
之后调用的 onComplete/onError
Consumer。当前状态(或我返回的最终状态被传递给 disposeState
消费者)