问题描述
Flux<Integer> flux2 = Flux.generate(AtomicInteger::new,(atomicInteger,synchronousSink) -> {
if (atomicInteger.get() == 10) {
synchronousSink.complete();
return atomicInteger;
}
synchronousSink.next(atomicInteger.getAndIncrement());
return atomicInteger;
}).cast(Integer.class)
.map(e -> {
if (e != 3) return e;
else {
try {
Thread.sleep(510L);
} catch (InterruptedException interruptedException) {
throw new RuntimeException(interruptedException);
}
System.out.println("sleeping");
return e;
}
})
.timeout(Duration.ofMillis(400L))
.doOnError(System.out::println)
.retrywhen(Retry.max(2).transientErrors(false));
出于某种原因,此代码不会在第二次重试时触发超时错误,适用于 Flux.just(1,2,3,4) 等静态数据,但在第一次重试后生成器不起作用。
15:15:45.518 [main] DEBUG reactor.util.Loggers - Using Slf4j logging framework
0
1
2
java.util.concurrent.TimeoutException: Did not observe any item or terminal signal within 400ms in 'map' (and no fallback has been configured)
0
1
2
sleeping
15:15:46.129 [main] DEBUG reactor.core.publisher.Operators - onNextDropped: 3
java.util.concurrent.TimeoutException: Did not observe any item or terminal signal within 400ms in 'map' (and no fallback has been configured)
sleeping
15:15:46.549 [parallel-3] DEBUG reactor.core.publisher.Operators - onNextDropped: 3
0
1
2
sleeping
3
4
5
6
7
8
9
P.S 在订阅特定线程后,由于某种原因它起作用了。
解决方法
暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!
如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。
小编邮箱:dio#foxmail.com (将#修改为@)