问题描述
我正在将Spring Reactor与Spring Cloud Stream(GCP Pub / Sub Binder)结合使用,并遇到错误处理问题。我可以通过一个非常简单的示例重现该问题:
@Bean
public Function<Flux<String>,Mono<Void>> consumer() {
return flux -> flux
.doOnNext(msg -> log.info("New message received: {}",msg))
.map(msg -> {
if (true) {
throw new RuntimeException("exception encountered!");
}
return msg;
})
.doOnError(throwable -> log.error("Failed to consume message",throwable))
.then();
}
我期望的行为是看到“无法使用消息”打印,但是,这似乎没有发生。将.log()
调用添加到链中时,我看到onNext
/ onComplete
信号,我希望看到onError
信号。
我的实际代码如下:
@Bean
public Function<Flux<CustomMessage>,Mono<Void>> consumer(MyService myService) {
return flux -> flux
.doOnNext(msg -> log.info("New message received: {}",msg))
.flatMap(myService::processMessage) // exception happens deep in here
.doOnError(throwable -> log.error("Failed to consume message",throwable))
.then();
}
我注意到在服务类的深处,我试图对Reactor发布者进行错误处理。但是,使用Spring Cloud Stream时不会出现onError
信号。如果我只是在单元测试中像这样myService.processMessage(msg)
那样调用服务并模拟了异常,那么我的反应链将正确地传播错误信号。
当我连接到Spring Cloud Stream时,这似乎是一个问题。我想知道Spring Cloud Function / Stream是否正在做任何全局错误包装?
在我的平凡代码中,我确实注意到了此错误消息,这可能与为什么我没有收到错误信号有关?
ERROR --- onfiguration$FunctionToDestinationBinder : Failed to process the following content which will be dropped: ...
为了使我更加困惑,如果将Spring Cloud Stream绑定按如下方式切换到非反应式实现,那么我可以在反应式链中得到onError
信号:
@Bean
public Consumer<CustomMessage> consumer(MyService myService) {
return customMessage -> Mono.just(customMessage)
.doOnNext(msg -> log.info("New message received: {}",throwable)) // prints successfully this time
.subscribe();
}
解决方法
这就是我从自己的调查中收集的信息,也许这可能会对其他人有所帮助。预警,我可能没有使用正确的“ Spring Reactor语言”,但这就是我最终解决它的方式...
在Hoxton.SR5
中,一个管理流量订阅的onErrorContinue
was included on the reactive binding。 onErrorContinue
的问题在于,它会通过在失败的运算符(如果支持)上应用BiConsumer函数来影响 上游 运算符。
这意味着,当我们的map
/ flatMap
运算符发生错误时,onErrorContinue
BiConsumer将启动并将下行信号修改为onComplete()
({{ 1}}或Mono<T>
(如果它从request(...)
请求了新元素)。由于没有Flux<T>
信号,导致我们的doOnError(...)
操作符无法执行。
最终,SCS团队决定加入remove this error handling wrapper。 onError()
不再有此Hoxton.SR6
。但是,这意味着传播到SCS绑定的异常将导致Flux订阅被切断。由于没有订阅者,因此随后的消息将无处路由。
此错误处理已传递给客户端,我们向{em>内部发布者添加了onErrorContinue
运算符以有效地删除错误信号。当onErrorResume
发布者遇到错误时,onErrorResume
会将发布者切换为作为参数传入的后备发布者,并从操作员链中的该点继续。在我们的情况下,该后备发布者仅返回myService::processMessage
,这使我们能够丢弃错误信号,同时仍允许内部错误处理机制运行,同时也不会影响外部源发布者。
Mono.empty()
示例/说明
可以通过一个非常简单的示例来说明上述技术。
onErrorResume
上面的Flux.just(1,2,3)
.flatMap(i -> i == 2
? Mono.error(new RuntimeException("error")
: Mono.just(i))
.onErrorResume(t -> Flux.just(4,5,6))
.doOnNext(i -> log.info("Element: {}",i))
.subscribe();
将输出以下内容:
Flux<Integer>
由于在元素Element: 1
Element: 4
Element: 5
Element: 6
上遇到错误,因此2
会进入后备状态,并且新发布者有效地从后备状态恢复为onErrorResume
。在我们的情况下,我们不想影响来源发布者(即Flux.just(4,6)
)。我们只想删除错误的元素(Flux.just(1,3)
),然后继续下一个元素(2
)。
我们不能简单地将3
更改为Flux.just(4,6)
或Flux.empty()
:
Mono.empty()
这将导致输出以下内容:
Flux.just(1,3)
.flatMap(i -> i == 2
? Mono.error(new RuntimeException("error")
: Mono.just(i))
.onErrorResume(t -> Mono.empty())
.doOnNext(i -> log.info("Element: {}",i))
.subscribe();
这是因为Element: 1
已将上游发布者替换为后备发布者(即onErrorResume
),并从那时起又恢复了 。
要获得我们期望的输出:
Mono.empty()
我们必须将Element: 1
Element: 3
运算符放在onErrorResume
的内部发布者上:
flatMap
现在,public Mono<Integer> func(int i) {
return i = 2 ? Mono.error(new RuntimeException("error")) : Mono.just(i);
}
Flux.just(1,3)
.flatMap(i -> func(i)
onErrorResume(t -> Mono.empty()))
.doOnNext(i -> log.info("Element: {}",i))
.subscribe();
仅影响onErrorResume
返回的内部发布者。如果func(i)
中的运算符发生错误,func(i)
将回退到onErrorResume
,有效完成Mono.empty()
而不会崩溃。这也仍然允许在回退运行之前应用错误处理运算符(例如Mono<T>
) doOnError
。这是因为,与func(i)
不同,它不会影响上游运算符,也不会在错误的位置更改下一个信号。
最终解决方案
在重复使用我的问题中的代码段后,我已经将Spring Cloud版本升级到onErrorContinue
,并将代码更改为如下所示:
Hoxton.SR6
请注意,@Bean
public Function<Flux<CustomMessage>,Mono<Void>> consumer(MyService myService) {
return flux -> flux
.doOnNext(msg -> log.info("New message received: {}",msg))
.flatMap(msg -> myService.processMessage(msg)
.onErrorResume(throwable -> Mono.empty())
)
.then();
}
位于内部发布者上(onErrorResume
内部)。
我认为该问题存在于以下代码中:
.map(msg -> new RuntimeException("exception encountered!"))
地图行中的lambda返回一个异常,而不是抛出异常。