问题描述
我有一连串的外发消息。它们可以任意间隔发生。如果在发送最后一条消息后的一段时间内没有任何消息,我想发出一条新消息,该消息充当保持活动状态或心跳信号。
这是我尝试过的代码示例。假设我想在“ C”之后直到“ D”之后每1s发出一次心跳消息。
Flux.concat(
Flux.just("A","B","C").delayElements(Duration.ofMillis(500)),Flux.just("D").delaySequence(Duration.ofSeconds(5))
)
.windowTimeout(1,Duration.ofSeconds(1))
.flatMap(window -> window.switchIfEmpty(Mono.just("*")))
.log()
.blockLast();
这是输出
14:30:14.659 [parallel-2] INFO reactor.Flux.FlatMap.1 - onNext(A)
14:30:15.162 [parallel-3] INFO reactor.Flux.FlatMap.1 - onNext(B)
14:30:15.663 [parallel-4] INFO reactor.Flux.FlatMap.1 - onNext(C)
14:30:16.664 [parallel-1] INFO reactor.Flux.FlatMap.1 - onNext(*)
14:30:17.665 [parallel-1] INFO reactor.Flux.FlatMap.1 - onNext(*)
14:30:18.664 [parallel-1] INFO reactor.Flux.FlatMap.1 - onNext(*)
14:30:19.670 [parallel-1] INFO reactor.Flux.FlatMap.1 - onNext(*)
14:30:20.665 [parallel-1] INFO reactor.Flux.FlatMap.1 - onNext(*)
14:30:20.676 [parallel-1] INFO reactor.Flux.FlatMap.1 - onNext(D)
14:30:20.677 [parallel-1] INFO reactor.Flux.FlatMap.1 - onNext(*) // Why?
14:30:20.679 [parallel-1] INFO reactor.Flux.FlatMap.1 - onComplete()
在此示例中,即使我指定了5秒,D也会跟随C 5.013秒,因此,如果在中间发出4或5个项目/心跳,我就不会感到烦恼。不需要那么精确。
但是为什么在D之后省略另一个项目?有办法解决吗?也许我使用了错误的操作。
我想我可以使用处理器来实现它,但是the documentation says
大多数时候,您应该避免使用处理器。
解决方法
暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!
如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。
小编邮箱:dio#foxmail.com (将#修改为@)