如果自流中的上一个项目起有间隔,则发出一个新项目

问题描述

我有一连串的外发消息。它们可以任意间隔发生。如果在发送最后一条消息后的一段时间内没有任何消息,我想发出一条新消息,该消息充当保持活动状态或心跳信号。

marbles diagram

这是我尝试过的代码示例。假设我想在“ C”之后直到“ D”之后每1s发出一次心跳消息。

Flux.concat(
        Flux.just("A","B","C").delayElements(Duration.ofMillis(500)),Flux.just("D").delaySequence(Duration.ofSeconds(5))
    )
    .windowTimeout(1,Duration.ofSeconds(1))
    .flatMap(window -> window.switchIfEmpty(Mono.just("*")))
    .log()
    .blockLast();

这是输出

14:30:14.659 [parallel-2] INFO reactor.Flux.FlatMap.1 - onNext(A)
14:30:15.162 [parallel-3] INFO reactor.Flux.FlatMap.1 - onNext(B)
14:30:15.663 [parallel-4] INFO reactor.Flux.FlatMap.1 - onNext(C)
14:30:16.664 [parallel-1] INFO reactor.Flux.FlatMap.1 - onNext(*)
14:30:17.665 [parallel-1] INFO reactor.Flux.FlatMap.1 - onNext(*)
14:30:18.664 [parallel-1] INFO reactor.Flux.FlatMap.1 - onNext(*)
14:30:19.670 [parallel-1] INFO reactor.Flux.FlatMap.1 - onNext(*)
14:30:20.665 [parallel-1] INFO reactor.Flux.FlatMap.1 - onNext(*)
14:30:20.676 [parallel-1] INFO reactor.Flux.FlatMap.1 - onNext(D)
14:30:20.677 [parallel-1] INFO reactor.Flux.FlatMap.1 - onNext(*)    // Why?
14:30:20.679 [parallel-1] INFO reactor.Flux.FlatMap.1 - onComplete()

在此示例中,即使我指定了5秒,D也会跟随C 5.013秒,因此,如果在中间发出4或5个项目/心跳,我就不会感到烦恼。不需要那么精确。

但是为什么在D之后省略另一个项目?有办法解决吗?也许我使用了错误的操作。

我想我可以使用处理器来实现它,但是the documentation says

大多数时候,您应该避免使用处理器。

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)