如何在for循环中处理completableFuture中的异常

问题描述

我是 CompletableFuture 的新手,我想要做的是我在 Spring Boot 项目下面有这个逻辑,我试图使用 CompletableFuture 将其转换为并行处理方法

@Transaction
void serviceMethod{
    for(Object obj : objList) //objlist can have 10000 objects and its a multi level composite objects 
    {
        //Get corresponding entity obj from the database,if not found throw user exception 
        //Process i.e. change some fields 
        

    }
}

在上面的逻辑中,因为方法是用@Transaction 注释的,所以我没有显式调用 JPA save 来保存实体。

现在,我试图与上述逻辑做并行处理。

@Transaction
void serviceMethod{
    for(Object obj : objList) //objlist can have 10000 objects and its a multi level composite objects 
    {
    
        //Get corresponding entity obj from the database,if not found throw user exception 
        CompletableFuture<Optional<Obj>> optionalObjFuture = CompletableFuture.supplyAsync( () -> {//get the object from repository})
        
        
        CompletableFuture<Obj> objFuture = optionalObjFuture.thenApply( optionalObj -> {
                            if(obj.isPresent()){
                                return obj.get();
                            }
                            else{
                                throw user exception;
                            }
                        })
                        
        ////Process i.e. change some fields 

    }
}

现在的问题是

  1. 当出现异常时,我必须遵循什么方法来中断 for 循环?
  2. 在这种情况下如何处理事务。有没有什么方法可以处理事务,而无需对存储在数据结构中的已处理对象调用 saveAll?

解决方法

对于 (1),您可以将这些 10_000 调用拆分为更小的批次。例如:

public static void main(String[] args) throws Exception {

    int innerIterations = 5;
    ExecutorService service = Executors.newFixedThreadPool(innerIterations);

    for(int i=0;i<10;++i) {

        List<CompletableFuture<String>> list = new ArrayList<>();
        int j = 0;
        for(;j<innerIterations;++j) {
            CompletableFuture<Optional<String>> cf = CompletableFuture.supplyAsync(() -> Optional.of("a"),service);


            CompletableFuture<String> cf2 = cf.thenApplyAsync(x -> {
                if(x.isPresent()) {
                    return x.get();
                } else {
                    throw new RuntimeException("test");
                }
            },Executors.newFixedThreadPool(2));

            list.add(cf2);
        }

        i += j;

        CompletableFuture<Void> all5 = CompletableFuture.allOf(list.toArray(new CompletableFuture[0]));
        if(all5.isCompletedExceptionally()) {
            // log exception,whatever
            break;
        } else {
            List<String> set = list.stream().map(CompletableFuture::join).collect(Collectors.toList());
            System.out.println(set);
        }

    }

}

这做了几件事:

  • 将最初的 10 分成两批 5
  • 对这 5 个期货调用 CompletableFuture::allOf。阅读文档以了解如果这些 5 中至少有一个 future 失败,那么 all5 也会失败。
  • 如果 all5 没有失败(即 all5.isCompletedExceptionally() 没有通过),则调用 CompletableFuture::join 以获取所有结果。真的不会有“加入”,因为前一个 allOff 已经在等待它们完成。

对于第二个问题 - 你不能。您将需要手动创建一个事务,但 Spring 使它变得相当容易。