如何处理在反应订阅者中抛出的异常,在 Flux 中以确保其他订阅者继续?

问题描述

我有多个订阅者,如果其中一个订阅失败,其他订阅者仍将继续。这里的问题是我没有控制订阅 Flux 的方法。因此,我无法在调用 subscribe()注册错误处理程序(参见 subscribingMethodInLibrary()subscribingMethodWithErrorHandlerInLibrary() 的对比)。

public class FailingSubscriberTest {
    @Test
    public void multipleSubscribers() {
        ConnectableFlux<Integer> flux = Flux.fromStream(IntStream.range(0,10).Boxed())
                                            // onErrorContinue() does not make any difference
                                            .onErrorContinue((e,o) -> System.out.println(e))
                                            .publish();

        flux.subscribe(it -> System.out.println("subscriber: " + it));
        subscribingMethodInLibrary(flux);
        // subscribingMethodWithErrorHandlerInLibrary(flux);

        flux.connect();
    }

    // result: exception is not caught and all subscribers are canceled
    private void subscribingMethodInLibrary(Flux<Integer> flux) {
        flux.subscribe(it -> {throw new IllegalStateException();});
    }

    // result: exception is caught and other subscribers continue
    private void subscribingMethodWithErrorHandlerInLibrary(Flux<Integer> flux) {
        flux.subscribe(it -> {throw new IllegalStateException();},System.out::println);
    }
}

因此我必须处理不断变化的异常,但我不知道如何处理。

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)