java反应器重试时间和超时

问题描述


        Flux<Integer> flux2 = Flux.generate(AtomicInteger::new,(atomicInteger,synchronousSink) -> {
            if (atomicInteger.get() == 10) {
                synchronousSink.complete();
                return atomicInteger;
            }
            synchronousSink.next(atomicInteger.getAndIncrement());

            return atomicInteger;
        }).cast(Integer.class)
                .map(e -> {
                    if (e != 3) return e;
                    else {
                        try {
                            Thread.sleep(510L);
                        } catch (InterruptedException interruptedException) {
                            throw new RuntimeException(interruptedException);
                        }
                        System.out.println("sleeping");
                        return e;
                    }
                })
                .timeout(Duration.ofMillis(400L))
                .doOnError(System.out::println)
                .retrywhen(Retry.max(2).transientErrors(false));

出于某种原因,此代码不会在第二次重试时触发超时错误,适用于 Flux.just(1,2,3,4) 等静态数据,但在第一次重试后生成器不起作用。

15:15:45.518 [main] DEBUG reactor.util.Loggers - Using Slf4j logging framework
0
1
2
java.util.concurrent.TimeoutException: Did not observe any item or terminal signal within 400ms in 'map' (and no fallback has been configured)
0
1
2
sleeping
15:15:46.129 [main] DEBUG reactor.core.publisher.Operators - onNextDropped: 3
java.util.concurrent.TimeoutException: Did not observe any item or terminal signal within 400ms in 'map' (and no fallback has been configured)
sleeping

15:15:46.549 [parallel-3] DEBUG reactor.core.publisher.Operators - onNextDropped: 3
0
1
2
sleeping
3
4
5
6
7
8
9

P.S 在订阅特定线程后,由于某种原因它起作用了。

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)