Project Reactor 通量 conCat、flux mergeSequential、flux mergeOrdered 之间有什么区别

问题描述

如果我们提供相同的数据源,所有这些方法都会产生相同的结果。那么它们之间有什么区别?

解决方法

以以下(人为的)concat() 为例,其中两个发布者以 100 毫秒的间隔发出 3 个元素:

Flux<Integer> a = Flux.range(0,3).delayElements(Duration.ofMillis(100));
Flux<Integer> b = Flux.range(0,3).delayElements(Duration.ofMillis(100));

Flux.concat(a,b)
        .timed()
        .doOnNext(x -> System.out.println(x.get() + ": " + x.elapsed().toMillis()))
        .blockLast();

在这里您会看到类似于以下内容的输出:

0: 138
1: 107
2: 108
0: 111
1: 102
2: 107

所以我们以 100 毫秒的间隔发射了 6 个元素。订阅第一个发布者,以 100 毫秒的间隔发出 3 个元素,然后完成。然后订阅第二个发布者,以 100 毫秒的间隔发出它的 3 个元素,然后完成。

如果我们将 concat() 替换为 mergeSequential(),您将看到如下内容:

0: 118
1: 107
2: 107
0: 0
1: 0
2: 0

元素以相同的顺序发出 - 但看看最后 3 个的时间!这是因为行为略有不同 - 在这种情况下,两个 发布者都订阅了,因此开始以 100 毫秒的间隔发射元素。来自第一个发布者的元素在接收时发出,但来自第二个发布者的元素会被缓存,直到第一个发布者完成。

当第一个发布者完成后,第二个发布者接手——我们缓存的所有元素都会立即发出,因此没有延迟(所以时间为零。)我们发出了相同的元素,但速度要快得多.

这似乎是有利的,但您可能不想直接依赖 mergeSequential() 而不是 concat() 的主要原因有两个:

  • 在幕后缓存所有元素需要内存。在这个包含 3 个元素的示例中,几乎没有任何内存占用,但如果您开始处理数百万个元素(甚至可能永远不会完成的发布者),您可能很快就会耗尽内存。
  • 立即订阅可能会改变行为。以两个发布者为例 - 一个改变数据库中的值,另一个读取它。如果将它们连接起来,则写入将始终发生在读取之前。如果您同时订阅两者,则情况并非如此,您可能会在写之前阅读它(但不一定。)

出于上述两个原因,根据我的经验,您通常只想在实际使用中使用 concat() 而不是 mergeSequential()

至于 mergeOrdered(),在上面的例子中使用它,你会看到元素的实际顺序不同:

0: 127
0: 105
1: 17
1: 90
2: 15
2: 0

这里 mergeSequential() 的急切订阅部分是相同的,但有一个不同之处 - 每个发布者发出的值在发出时进行比较,最小的先发出。因此,您将看到(在本例中)一个有序的数字流:0,1,2,2。请注意,mergeSequential() 的时间似乎不同,因为它在您的最终输出中交错来自两个发布商的值,而不仅仅是按顺序合并它们。