问题描述
我有多个订阅者,如果其中一个订阅失败,其他订阅者仍将继续。这里的问题是我没有控制订阅 Flux 的方法。因此,我无法在调用 subscribe()
时注册错误处理程序(参见 subscribingMethodInLibrary()
与 subscribingMethodWithErrorHandlerInLibrary()
的对比)。
public class FailingSubscriberTest {
@Test
public void multipleSubscribers() {
ConnectableFlux<Integer> flux = Flux.fromStream(IntStream.range(0,10).Boxed())
// onErrorContinue() does not make any difference
.onErrorContinue((e,o) -> System.out.println(e))
.publish();
flux.subscribe(it -> System.out.println("subscriber: " + it));
subscribingMethodInLibrary(flux);
// subscribingMethodWithErrorHandlerInLibrary(flux);
flux.connect();
}
// result: exception is not caught and all subscribers are canceled
private void subscribingMethodInLibrary(Flux<Integer> flux) {
flux.subscribe(it -> {throw new IllegalStateException();});
}
// result: exception is caught and other subscribers continue
private void subscribingMethodWithErrorHandlerInLibrary(Flux<Integer> flux) {
flux.subscribe(it -> {throw new IllegalStateException();},System.out::println);
}
}
因此我必须处理不断变化的异常,但我不知道如何处理。
解决方法
暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!
如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。
小编邮箱:dio#foxmail.com (将#修改为@)