问题描述
我有如下代码:
testMethod(List<String> ids) {
List<CompletableFuture<ResultThree>> resultThreeList = new ArrayList<>();
for(String id : ids) {
CompletableFuture<ResultOne> resultOne = AynchOne(id);
CompletableFuture<ResultTwo> resultTwo = AynchTwo(id);
CompletableFuture<ResultThree> resultThree = resultOne.thenCombine(resultTwo,(ResultOne a,ResultTwo b) -> computeCombinedResultThree(a,b));
resultThreeList.add(resultThree);
}
// PROCESS RESULTS HERE
}
class ResultOne {
boolean goodResult;
String id;
ResultOne(String promId) {
this.goodResult = true;
this.id = promId;
}
}
class ResultTwo {
boolean goodResult;
String id;
ResultTwo(String promId) {
this.goodResult = true;
this.id = promId;
}
class ResultThree() {
boolean goodResult;
String = id;
}
private ResultThree computeCombinedResultThree(ResultOne r1,ResultTwo r2) {
ResultThree resultThree = new ResultThree();
resultThree.id = r1.id;
resultThree.goodResult = r1.goodResult && r2.goodResult;
return resultThree;
}
,我需要能够将结果resultOne和resultTwo在一起进行“与”运算,以便对于每次迭代,在完成整个同步执行时,我都有一个(我想)数组或映射,可以随后在其中处理数组中的一个对象具有对应的ID,并具有该ID的true或false(表示来自单独对象的两个布尔值的AND与运算。
根据读者的反馈,我已经完成了代码,可以合并两个原始期货,并合并每次迭代的所有结果以获得整个期货循环。此时,我只需要处理结果即可。
我认为也许我需要另一个CompletableFuture?这可能是这样的(放在上面我有“ // PROCESS RESULTS HERE”的位置):
CompletableFuture<Void> future = resultThreeList
.thenRun(() -> forwardSuccesses(resultThreeList));
future.get();
forwardSuccesses()将通过resultThreeList进行迭代,将成功的ID转发给另一个进程,但不起诉这是怎么做的。 感谢任何想法。谢谢。
解决方法
这就是到目前为止你走的距离:
import sys
print(sys.executable)
现在您需要做的是convert this List<CompletableFuture<ResultThree>>
to a CompletableFuture<List<ResultThree>>
,一旦所有结果完成计算就将完成。
List<CompletableFuture<ResultThree>> resultThreeList = new ArrayList<>(ids.size());
for (String id : ids) {
CompletableFuture<ResultOne> resultOne = aynchOne(id);
CompletableFuture<ResultTwo> resultTwo = aynchTwo(id);
CompletableFuture<ResultThree> resultThree = resultOne.thenCombine(resultTwo,this::computeCombinedResultThree);
resultThreeList.add(resultThree);
}
或类似
CompletableFuture<List<ResultThree>> combinedCompletables =
CompletableFuture.allOf(resultThreeList.toArray(new CompletableFuture<?>[0]))
.thenApply(v -> resultThreeList.stream()
.map(CompletableFuture::join)
.collect(Collectors.toList())
);
其中CompletableFuture<List<ResultThree>> combinedCompletables =
CompletableFuture.supplyAsync(() -> resultThreeList.stream().map(this::safeGet).collect(Collectors.toList()));
是仅调用safeGet
并捕获可能发生的异常的方法-由于这些异常,您不能只在lambda中调用future.get()
。
现在您可以使用get()
处理此列表:
thenAccept()
同样,捕获到的异常是由于调用了try {
combinedCompletables.thenAccept(this::forwardSuccesses).get(30,TimeUnit.SECONDS);
} catch (InterruptedException | ExecutionException | TimeoutException e) {
e.printStackTrace();
}
。
请注意,我真的不明白为什么会有三个结果类,因为您所需要的-至少对于代码的这一部分-是id和结果状态。我将为此引入一个接口(get()
?),并且只能在该接口上工作。
在我看来,您不需要3个不同的ResultOne
ResultTwo
ResultThree
类,因为它们定义了相同的类型,因此我将它们替换为Result
。 / p>
假设您只想转发成功,我在isGoodResult()
类中添加了一个简短的Result
方法,用作流的谓词:
class Result {
public boolean goodResult;
public String id;
// ...
public boolean isGoodResult() {
return this.goodResult;
}
}
我还建议您摆脱循环,将其替换为流,以使您的代码更流畅。
如果forwardSuccess
是严格,接受List<Result>
,这就是我实现testMethod
的方式:
void testMethod(List<String> ids) {
final List<Result> results = ids.stream()
.parallel()
.map(id -> asynchOne(id).thenCombine(
asynchTwo(id),(r1,r2) -> computeCombinedResult(r1,r2)))
.map(CompletableFuture::join)
.filter(Result::isGoodResult)
.collect(Collectors.toList());
// PROCESS RESULTS HERE
forwardSuccesses(results);
}
如果forwardSuccess
是懒惰,则接受CompletableFuture<List<Result>>
:
void testMethod(List<String> ids) {
final List<CompletableFuture<Result>> futures = ids.stream()
.parallel()
.map(id -> asynchOne(id).thenCombine(
asynchTwo(id),r2)))
.collect(Collectors.toList());
final CompletableFuture<List<Result>> asyncResults =
CompletableFuture.allOf(futures.stream().toArray(CompletableFuture[]::new))
.thenApply(__ -> futures
.stream()
.map(CompletableFuture::join)
.filter(Result::isGoodResult)
.collect(Collectors.toList()));
// PROCESS RESULTS HERE
forwardSuccessesAsync(asyncResults);
}
,
在for循环中,您将立即获得CompletableFutures作为回报。在后台发生了一些魔术,您想等到两者都完成。
因此,在两个CompletableFutures都返回之后,通过调用带有长值和时间单位的CompletableFuture.get引起阻塞等待。如果仅调用不带任何参数的get,则将永远等待。
明智地选择超时和JDK。可能会发生JDK 8没有提供超时的情况。此外,不再支持JDK 8。 JDK 11现在已得到长期支持,最近的编译器不再将JDK 8作为目标。
我真的希望您阅读有关CompletableFuture的肮脏细节,以及它与Future的区别,尤其是。关于线程控制之类的取消。不知道CompletableFuture的基础提供程序,我还假设查询一个ID浪费资源,并且吞吐量非常有限。但这是一个单独的问题。