CompletableFuture<Boolean> 作为返回类型不能按预期工作

问题描述

我在 Java 中有一个方法,它应该返回一个 CompletableFuture 并且当它应该是 true 时我有一些问题。我的方法如下:

@Override
    public CompletableFuture<Boolean> process(String name,Object data,Object message) {

        switch (name) {
            case ("first"):

                Map<String,Long> finalAmount = (Map<String,Long>) data;
                finalAmount.forEach((id,amount) -> {
                     event.retrieveEvent(id)
                            .thenCompose(config -> {
                                update(id,amount,config);
                                return CompletableFuture.completedFuture(true);
                            });
                });
        }
        return CompletableFuture.completedFuture(false);
    }

问题是我有一张地图,我必须遍历它并为每个值做一些事情。即使它总是以“CompletableFuture.completedFuture(true)”进入部分 - 最后,它总是进入最后的“return CompletableFuture.completedFuture(false)”并返回false而不是true。

我能做什么,我应该如何重写我的方法,以便在地图元素完成后返回 true 并且对于每个元素,一切正常并且返回 true?

解决方法

代码永远不会停止等待案例 case("first") 的结果,所以它会安排一个 CompletableFuture 来计算它并继续 return CompletableFuture.completedFuture(false);

这是一种可能的解决方案:

@Override
public CompletableFuture<Boolean> process(String name,Object data,Object message) {
        switch (name) {
            case ("first"):
                CompletableFuture<Void> result = CompletableFuture.completedFuture( null );
                Map<String,Long> finalAmount = (Map<String,Long>) data;
                finalAmount.forEach((id,amount) -> {
                     result = result
                         .thenCompose(v -> event.retrieveEvent(id))
                         .thenAccept(config -> update(id,amount,config));
                });
                return result.thenApply(v -> Boolean.TRUE);
             
        }         
        return CompletableFuture.completedFuture(Boolean.FALSE);
    }

如果您想并行运行所有任务,另一种解决方案是:

@Override
public CompletableFuture<Boolean> process(String name,Object message) {
        switch (name) {
            case ("first"):
                Map<String,Long>) data;
                CompletableFuture<Void>[] futures = new CompletableFuture<>[finalAmount.size()];
                AtomicInteger index = new AtomicInteger();
                finalAmount.forEach((id,amount) -> {
                     futures[index.getAndIncrement()] = event
                         .retrieveEvent(id)
                         .thenAccept(config -> update(id,config));
                });
                return CompletableFuture
                    .allOf(futures)
                    .thenApply(v -> Boolean.TRUE);
        }         
        return CompletableFuture.completedFuture(Boolean.FALSE);
    }

假设一切都是线程安全的。