CompletableFuture 按请求组分组

问题描述

我想向 API Rest 发送 10 万个请求。所以我有一个循环,我以同步方式发送请求(完成所有请求需要几分钟时间)。

但是最好以异步方式发送它们,这样该过程可以花费更少的时间,因为目的地可以扩展它们的副本。

我已经阅读了 CompletableFuture 以及如何将所有结果与 allOf() 方法结合起来。

但是有没有办法以 1K 请求为一组进行调用?因为如果我并行发送所有请求,目标将关闭连接。

所以我想以 1K 并行请求为一组发送请求,当每个组完成时,对响应做一些事情(我不需要对响应进行排序)

谢谢。

解决方法

是的,您可以使用可完成的期货,而且操作起来非常简单。

假设您有一个 HTTP 客户端,它在请求时返回 CompletableFuture。

package com.example.demo.cf;

import com.example.demo.dto.Response;

import java.util.concurrent.CompletableFuture;

public interface HttpClient {
    CompletableFuture<Response> makeHttp();
}

并提供对请求进行批处理和排序的服务。

package com.example.demo.cf;

import java.util.concurrent.CompletableFuture;
import java.util.stream.IntStream;

public class CfBatchingService {

    private static final int BATCH_SIZE = 100;

    private HttpClient httpClient;

    public void sendRequests() {
        CompletableFuture<Void> cf = CompletableFuture.completedFuture(null);
        for(int grp = 0; grp < 10; grp++) {
            cf = cf.thenCompose(unused -> CompletableFuture.allOf(IntStream.range(0,BATCH_SIZE).mapToObj(idx -> httpClient.makeHttp()).toArray(CompletableFuture[]::new)));
        }

        cf.whenComplete((unused,throwable) -> {
           System.out.println("All requested executed");
        });
    }

}

所以,这里唯一需要的是使用 thenCompose 方法,该方法期望从 lambda 获得 CompletableFuture。

要将请求分组并创建单个 CompletableFuture,您可以使用方法 CompletableFuture.allOf