将助焊剂分为单声道头部和尾部

问题描述

我想将Flux分为两部分:第一个元素的Mono(头部),其他所有元素的Flux(尾部)。
在此过程中,不应重新订阅基本Flux

不起作用的示例:

final Flux<Integer> baseFlux = Flux.range(0,3).log();
final Mono<Integer> head = baseFlux.next();
final Flux<Integer> tail = baseFlux.skip(1L);
assertThat(head.block()).isEqualTo(0);
assertThat(tail.collectList().block()).isEqualTo(Arrays.asList(1,2));

有关此日志,如下所示,您将看到基本Flux将被重新订阅两次:

[main] DEBUG reactor.util.Loggers$LoggerFactory - Using Slf4j logging framework
[main] INFO reactor.Flux.Range.1 - | onSubscribe([Synchronous Fuseable] FluxRange.RangeSubscription)
[main] INFO reactor.Flux.Range.1 - | request(unbounded)
[main] INFO reactor.Flux.Range.1 - | onNext(0)
[main] INFO reactor.Flux.Range.1 - | cancel()
[main] INFO reactor.Flux.Range.1 - | onSubscribe([Synchronous Fuseable] FluxRange.RangeSubscription)
[main] INFO reactor.Flux.Range.1 - | request(unbounded)
[main] INFO reactor.Flux.Range.1 - | onNext(0)
[main] INFO reactor.Flux.Range.1 - | onNext(1)
[main] INFO reactor.Flux.Range.1 - | onNext(2)
[main] INFO reactor.Flux.Range.1 - | onComplete()
[main] INFO reactor.Flux.Range.1 - | request(1)

我的实际情况是我的基本Flux包含CSV文件的行,第一行是文件的标题,解析所有后续行都需要。基本Flux仅基于InputStream

只能预订一次

我为此找到的唯一相关资源是this question,但是我发现这有点不符合我的需求。

解决方法

由于comments中的建议,我得以设计出以下解决方案:

final Flux<Integer> baseFlux = Flux.range(0,3).log();
final Flux<? extends Tuple2<? extends Integer,Integer>> zipped = baseFlux
    .switchOnFirst((signal,flux) -> (signal.hasValue()
        ? Flux.zip(Flux.just(signal.get()).repeat(),flux.skip(1L))
        : Flux.empty()));
final List<? extends Tuple2<? extends Integer,Integer>> list = zipped.collectList().block();
assertThat(list.stream().map(Tuple2::getT1)).isEqualTo(Arrays.asList(0,0));
assertThat(list.stream().map(Tuple2::getT2)).isEqualTo(Arrays.asList(1,2));

它将第一个元素之后的基本Flux转换为原始元素的尾部,重复压缩该元素。并且它只订阅一次baseFlux

我不确定这是最好的解决方案,因为与有状态(“热”)通量的解决方案相比,它将创建很多Tuple2对象,这些对象最终将进行GC处理基于baseFlux,可以保持原始订阅有效。

相关问答

错误1:Request method ‘DELETE‘ not supported 错误还原:...
错误1:启动docker镜像时报错:Error response from daemon:...
错误1:private field ‘xxx‘ is never assigned 按Alt...
报错如下,通过源不能下载,最后警告pip需升级版本 Requirem...