为什么一些 Reactor 操作员请求的元素比他们感兴趣的要多得多?

问题描述

我有以下代码

Flux<String> flux = Flux.<String>never()
        .doOnRequest(n -> System.out.println("Requested " + n));

它是一个从不发出任何信号,但向控制台报告需求的 Flux。

以下 3 行中的每一行

flux.take(3).next().block();
flux.next().block();
flux.blockFirst();

产生这个输出

Requested 9223372036854775807

代码,我看到以下内容

BlockingSingleSubscriber(适用于 Flux#blockFirst()Mono#block()

public final void onSubscribe(Subscription s) {
    this.s = s;
    if (!cancelled) {
        s.request(Long.MAX_VALUE);
    }
}

MonoNext.NextSubscriber

public void request(long n) {
    if (WIP.compareAndSet(this,1)) {
        s.request(Long.MAX_VALUE);
    }
}

FluxTake.TakeSubscriber

public void request(long n) {
    if (wip == 0 && WIP.compareAndSet(this,1)) {
        if (n >= this.n) {
            s.request(Long.MAX_VALUE);
        } else {
            s.request(n);
        }
        return;
    }

    s.request(n);
}

所以 Flux#blockFirst()Flux#next()Mono#block() 总是向上游发出无限需求信号,Flux#take() 在某些情况下也可以这样做。

但是 Flux#blockFirst()Flux#next()Mono#block() 都需要来自上游的最多一个元素,而 Flux#take() 最多需要 this.n.

此外,Flux#take() javadoc 还说:

请注意,此运算符不会操作背压请求量。 相反,它只是让来自下游的请求按原样传播并取消一次 已发射 N 个元素。因此,该源可能会产生大量 在此期间的无关元素。如果这种行为是不受欢迎的,而你确实这样做了 不拥有来自下游的请求(例如预取运算符),请考虑 改用 {@link #limitRequest(long)}。

问题是:当他们预先知道限制时,为什么他们会发出无限的需求信号?我的印象是反应式背压只是询问您准备消费的内容。但在现实中,往往是这样的:对上游喊“尽你所能”,满意后再取消订阅。如果上游产生大量记录的成本很高,这似乎只是浪费。

解决方法

tl;dr - 只请求你需要的东西在基于拉的系统中通常是理想的,但在基于推送的系统中很少是理想的。

我的印象是反应式背压只是询问您准备消费的内容。

不完全是,这是您能够消费的。区别很微妙,但很重要。

在基于拉取的系统中,您是完全正确的 - 请求比您知道的更多的值几乎永远不会是一件好事,因为您请求的值越多,产生的工作就越多那些价值观。

但请注意,响应式流本质上是基于推送的,而不是基于拉的。大多数反应式框架,包括反应器,都是考虑到这一点而构建的——虽然混合或基于拉的语义是可能的(例如,使用 Flux.generate() 按需生成一个元素),这在很大程度上是次要的案件。通常情况下,发布商拥有大量需要卸载的数据,并且“希望”尽快将其推送给您以将其删除。

这很重要,因为它从请求的角度翻转了理想情况的看法。这不再是“我最需要什么”的问题,而是“我能处理的最多的是什么”——数字越大越好。

举个例子,假设我有一个数据库查询返回 2000 条连接到通量的记录 - 但我只想要 1。如果我有一个发布这 2000 条记录的发布者,我调用 request(1),那么我根本不是“帮助”事情 - 我没有减少数据库方面的处理,这些记录已经在那里等待。但是,由于我只请求了 1 个,因此发布者必须决定它是否可以缓冲剩余的记录,或者最好跳过其中的部分或全部,或者如果无法跟上,则应该抛出异常,或者其他什么完全。无论它做什么,我实际上都在导致更多的工作,在某些情况下甚至可能通过请求更少记录而导致异常。

当然,这不是总是可取的 - 也许 Flux 中的那些额外元素确实会导致额外的浪费处理,也许网络带宽是主要问题,等等。在这种情况下,你会想显式调用 limitRequest()。但在大多数情况下,这可能不是您所追求的行为。

(为了完整起见,最佳方案当然是限制源数据 - 例如,如果您只需要一个值,则在数据库查询中放置一个 LIMIT 1。然后您不必担心任何这些事情。但是,当然,在实际使用中,这并不总是可行的。)