问题描述
我是 CompletableFuture 的新手,我想要做的是我在 Spring Boot 项目下面有这个逻辑,我试图使用 CompletableFuture 将其转换为并行处理方法。
@Transaction
void serviceMethod{
for(Object obj : objList) //objlist can have 10000 objects and its a multi level composite objects
{
//Get corresponding entity obj from the database,if not found throw user exception
//Process i.e. change some fields
}
}
在上面的逻辑中,因为方法是用@Transaction 注释的,所以我没有显式调用 JPA save 来保存实体。
现在,我试图与上述逻辑做并行处理。
@Transaction
void serviceMethod{
for(Object obj : objList) //objlist can have 10000 objects and its a multi level composite objects
{
//Get corresponding entity obj from the database,if not found throw user exception
CompletableFuture<Optional<Obj>> optionalObjFuture = CompletableFuture.supplyAsync( () -> {//get the object from repository})
CompletableFuture<Obj> objFuture = optionalObjFuture.thenApply( optionalObj -> {
if(obj.isPresent()){
return obj.get();
}
else{
throw user exception;
}
})
////Process i.e. change some fields
}
}
现在的问题是
解决方法
对于 (1),您可以将这些 10_000
调用拆分为更小的批次。例如:
public static void main(String[] args) throws Exception {
int innerIterations = 5;
ExecutorService service = Executors.newFixedThreadPool(innerIterations);
for(int i=0;i<10;++i) {
List<CompletableFuture<String>> list = new ArrayList<>();
int j = 0;
for(;j<innerIterations;++j) {
CompletableFuture<Optional<String>> cf = CompletableFuture.supplyAsync(() -> Optional.of("a"),service);
CompletableFuture<String> cf2 = cf.thenApplyAsync(x -> {
if(x.isPresent()) {
return x.get();
} else {
throw new RuntimeException("test");
}
},Executors.newFixedThreadPool(2));
list.add(cf2);
}
i += j;
CompletableFuture<Void> all5 = CompletableFuture.allOf(list.toArray(new CompletableFuture[0]));
if(all5.isCompletedExceptionally()) {
// log exception,whatever
break;
} else {
List<String> set = list.stream().map(CompletableFuture::join).collect(Collectors.toList());
System.out.println(set);
}
}
}
这做了几件事:
- 将最初的
10
分成两批5
。 - 对这 5 个期货调用
CompletableFuture::allOf
。阅读文档以了解如果这些5
中至少有一个 future 失败,那么all5
也会失败。 - 如果
all5
没有失败(即all5.isCompletedExceptionally()
没有通过),则调用CompletableFuture::join
以获取所有结果。真的不会有“加入”,因为前一个allOff
已经在等待它们完成。
对于第二个问题 - 你不能。您将需要手动创建一个事务,但 Spring 使它变得相当容易。