如何在反应堆中有条件地重复或重试

问题描述

我使用SpringBoot和Webflux进行反应式编程。我想重复该服务,直到有可用数据为止(除了null之外,还会返回某些内容)

我有一个将一些数据插入数据库的服务,还有第二个使用该数据的服务。 我想继续从第二个服务查询数据库,直到有数据可用为止。下面的代码我试图使用Project Reactor来实现:

Mono<SubscriptionQueryResult<App,App>> subscriptionQuery = reactiveQueryGateway
.subscriptionQuery(new FindAppByIdQuery(appId),ResponseTypes.instanceOf(App.class),ResponseTypes.instanceOf(App.class));

subscriptionQuery
  .filter(a -> Objects.nonNull(a.initialResult().block()))
  .repeatWhen(Repeat.onlyIf(repeatContext -> true)
  .exponentialBackoff(Duration.ofMillis(100),Duration.ofSeconds(100))
  .timeout(Duration.ofSeconds(30))).subscribe();

执行此操作时,出现以下异常:

reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.IllegalStateException: block()/blockFirst()/blockLast() are blocking,which is not supported in thread parallel-1

在浏览webflux文档时,我发现在Reactor线程中无法调用block()函数。这样的尝试会导致上述错误:

要克服我在下面尝试过的问题,

subscriptionQuery
 .flatMap(a -> a.initialResult())
 .filter(a -> Objects.nonNull(a))
 .repeatWhen(Repeat.onlyIf(repeatContext -> true)
 .exponentialBackoff(Duration.ofMillis(100),Duration.ofSeconds(100))
 .timeout(Duration.ofSeconds(30)))
 .subscribe();

但是它没有给我想要的结果,我想我错过了一些东西。任何人都可以建议实现此目标的正确方法。

谢谢。

解决方法

让我尝试在这个问题上为您提供帮助。

实际上,解决的最佳方法是在发送命令之前先对其进行订阅。 这样,您就知道订阅查询何时发出更新。

我们有一个code-samples可以为您提供帮助。

要对此进行扩展,您最感兴趣的部分应该是CommandController上的该部分:

public <U> Mono<U> sendAndReturnUpdate(Object command,SubscriptionQueryResult<?,U> result) {
    /* The trick here is to subscribe to initial results first,even it does not return any result
     Subscribing to initialResult creates a buffer for updates,even that we didn't subscribe for updates yet
     they will wait for us in buffer,after this we can safely send command,and then subscribe to updates */
    return Mono.when(result.initialResult())
               .then(Mono.fromCompletionStage(() -> commandGateway.send(command)))
               .thenMany(result.updates())
               .timeout(Duration.ofSeconds(5))
               .next()
               .doFinally(unused -> result.cancel());
    /* dont forget to close subscription query on the end and add a timeout */
}

相关问答

错误1:Request method ‘DELETE‘ not supported 错误还原:...
错误1:启动docker镜像时报错:Error response from daemon:...
错误1:private field ‘xxx‘ is never assigned 按Alt...
报错如下,通过源不能下载,最后警告pip需升级版本 Requirem...