问题描述
Flux<String> flux = Flux.<String>never()
.doOnRequest(n -> System.out.println("Requested " + n));
它是一个从不发出任何信号,但向控制台报告需求的 Flux。
以下 3 行中的每一行
flux.take(3).next().block();
flux.next().block();
flux.blockFirst();
产生这个输出:
Requested 9223372036854775807
BlockingSingleSubscriber
(适用于 Flux#blockFirst()
和 Mono#block()
:
public final void onSubscribe(Subscription s) {
this.s = s;
if (!cancelled) {
s.request(Long.MAX_VALUE);
}
}
MonoNext.NextSubscriber
:
public void request(long n) {
if (WIP.compareAndSet(this,1)) {
s.request(Long.MAX_VALUE);
}
}
FluxTake.TakeSubscriber
:
public void request(long n) {
if (wip == 0 && WIP.compareAndSet(this,1)) {
if (n >= this.n) {
s.request(Long.MAX_VALUE);
} else {
s.request(n);
}
return;
}
s.request(n);
}
所以 Flux#blockFirst()
、Flux#next()
和 Mono#block()
总是向上游发出无限需求信号,Flux#take()
在某些情况下也可以这样做。
但是 Flux#blockFirst()
、Flux#next()
和 Mono#block()
都需要来自上游的最多一个元素,而 Flux#take()
最多需要 this.n.
此外,Flux#take()
javadoc 还说:
请注意,此运算符不会操作背压请求量。 相反,它只是让来自下游的请求按原样传播并取消一次 已发射 N 个元素。因此,该源可能会产生大量 在此期间的无关元素。如果这种行为是不受欢迎的,而你确实这样做了 不拥有来自下游的请求(例如预取运算符),请考虑 改用 {@link #limitRequest(long)}。
问题是:当他们预先知道限制时,为什么他们会发出无限的需求信号?我的印象是反应式背压只是询问您准备消费的内容。但在现实中,往往是这样的:对上游喊“尽你所能”,满意后再取消订阅。如果上游产生大量记录的成本很高,这似乎只是浪费。
解决方法
tl;dr - 只请求你需要的东西在基于拉的系统中通常是理想的,但在基于推送的系统中很少是理想的。
我的印象是反应式背压只是询问您准备消费的内容。
不完全是,这是您能够消费的。区别很微妙,但很重要。
在基于拉取的系统中,您是完全正确的 - 请求比您知道的更多的值几乎永远不会是一件好事,因为您请求的值越多,产生的工作就越多那些价值观。
但请注意,响应式流本质上是基于推送的,而不是基于拉的。大多数反应式框架,包括反应器,都是考虑到这一点而构建的——虽然混合或基于拉的语义是可能的(例如,使用 Flux.generate()
按需生成一个元素),这在很大程度上是次要的案件。通常情况下,发布商拥有大量需要卸载的数据,并且“希望”尽快将其推送给您以将其删除。
这很重要,因为它从请求的角度翻转了理想情况的看法。这不再是“我最需要什么”的问题,而是“我能处理的最多的是什么”——数字越大越好。
举个例子,假设我有一个数据库查询返回 2000 条连接到通量的记录 - 但我只想要 1。如果我有一个发布这 2000 条记录的发布者,我调用 request(1)
,那么我根本不是“帮助”事情 - 我没有减少数据库方面的处理,这些记录已经在那里等待。但是,由于我只请求了 1 个,因此发布者必须决定它是否可以缓冲剩余的记录,或者最好跳过其中的部分或全部,或者如果无法跟上,则应该抛出异常,或者其他什么完全。无论它做什么,我实际上都在导致更多的工作,在某些情况下甚至可能通过请求更少记录而导致异常。
当然,这不是总是可取的 - 也许 Flux 中的那些额外元素确实会导致额外的浪费处理,也许网络带宽是主要问题,等等。在这种情况下,你会想显式调用 limitRequest()
。但在大多数情况下,这可能不是您所追求的行为。
(为了完整起见,最佳方案当然是限制源数据 - 例如,如果您只需要一个值,则在数据库查询中放置一个 LIMIT 1
。然后您不必担心任何这些事情。但是,当然,在实际使用中,这并不总是可行的。)