使用 spring webflux 使用第一次调用的结果进行多次异步调用

问题描述

我需要进行异步调用并使用其中存在的一些值对同一服务进行多次调用。将这些调用的响应与第一个响应合并并返回。

例如,当我进行第一次调用时,我得到以下 JSON,其中包含一个 ID 列表。现在,我必须使用这些 ID 对服务进行多次调用,并列出它们的响应,然后通过将其附加到同一个 JSON 中将其发送到下游。

 {“id”: 145,“object”:[{“id”:111}]
    }

我尝试过使用 zipwhen 和

Flux.fromIterable(userIds).parallel().runOn(Schedulers.elastic()).flatMap()

但结果列表总是为空或为空。我们怎样才能做到这一点?我在这里遗漏了什么吗?

编辑 1: 使用 Flux.fromIterable 解决了它。阅读了更多关于它的信息,终于明白了它的用途并解决了它。下面的方法从列表中取出一项并调用内部方法,该方法调用多个 API:

return Flux.fromIterable(somelist).flatMap(listItem -> {
    return someMethodToCallAnotherAPIUsingZipwith(listItem);
    }).collectList();

内部方法: 它调用一个 API,将其结果传递给 zipwith,使用这个结果我们可以调用一个 API,或者我们可以简单地将它与它的响应一起使用。

private Mono<Object> someMethodToCallAnotherAPIUsingZipwith(String listItem) {
        return authService.getAccesstoken().flatMap(accesstoken ->
                webClient.get().uri(builder -> builder.path("/path").build(listItem))
                        .header(HttpHeaders.AUTHORIZATION,accesstoken)
                        .retrieve()
                        .toEntity(Entity.class).log()
                        .flatMap(entity -> {
                            //manipulate your response or create new object using it
                            return Mono.just(entity);
                        }).zipwhen(consent -> webClient.get().uri(builder -> builder.path("/otherpath").build(listItem))
                        .header(HttpHeaders.AUTHORIZATION,accesstoken)
                        .retrieve().bodyToMono(Entity.class).log()
                        .flatMap(entity -> {
                            //example
                            listItem = entity.getString();
                            return Mono.just(listItem);
                        }),(string1,string2) -> string1 + string2));
    }

解决方法

private Mono<Object> someMethodToCallAnotherAPIUsingZipWith(String listItem) {
    return authService.getAccessToken().flatMap(accessToken ->
            webClient.get().uri(builder -> builder.path("/path").build(listItem))
                    .header(HttpHeaders.AUTHORIZATION,accessToken)
                    .retrieve()
                    .toEntity(Entity.class).log()
                    .flatMap(entity -> {
                        //manipulate your response or create new object using it
                        return Mono.just(entity);
                    }).zipWhen(consent -> webClient.get().uri(builder -> builder.path("/otherpath").build(listItem))
                    .header(HttpHeaders.AUTHORIZATION,accessToken)
                    .retrieve().bodyToMono(Entity.class).log()
                    .flatMap(entity -> {
                        //example
                        listItem = entity.getString();
                        return Mono.just(listItem);
                    }),(string1,string2) -> string1 + string2));
}