短路 CompletionStage 链

问题描述

我正在使用 Java 8 并且我有一个我正在尝试运行的 CompletionStage 链。

我不想使用 join()get(),我想明确地完成 CompletionStage

我正在尝试运行两个数据库查询,第二个依赖于第一个查询的结果。我正在使用会话启动数据库事务,运行 write query1,write query2 并且只有当两者都成功时我才想提交事务或回滚它。 事务和会话是 Neo4j java API https://neo4j.com/docs/api/java-driver/current/org/neo4j/driver/async/AsyncSession.html#writeTransactionAsync-org.neo4j.driver.async.AsyncTransactionWork-

的一部分

在成功/失败运行两个查询后,我想关闭会话(标准数据库实践)

这是伪代码-

DB Session starts transaction
    run Write Query1
    run Write Query2
    
    if both are successful
       commit transaction
    else
       rollback transaction
close session

我想要实现的是,如果 query1/query2 失败,那么它应该只是回滚事务并关闭会话。

如果 Query1 的结果不正确(小于某个阈值),Query 1 也可以抛出 CustomException在这种情况下,它应该回滚事务。我正在为每个查询回滚 exceptionally 块中的事务。

快乐路径在下面的代码中工作正常,但是当我想抛出 CustomException 时,Query2 块没有被调用,甚至 Completable.allOf 也从未被调用

CompletableFuture<String> firstFuture = new CompletableFuture();
CompletableFuture<String> secondFuture = new CompletableFuture();
CompletableFuture<String> lastFuture = new CompletableFuture();


//Lambda that executes transaction
TransactionWork<CompletionStage<String>> runTransactionWork = transaction -> {

     //Write Query1
       transaction.runAsync("DB WRITE QUERY1") //Running Write Query 1
              .thenCompose(someFunctionThatReturnsCompletionStage)
              .thenApply(val -> {
                     //throw CustomException if value less then threshold
                     if(val < threshold){
                         throw new CustomException("Incorrect value found");
                     }else{
                       //if value is correct then complete future
                       firstFuture.complete(val);
                     }
                  firstQuery.complete(val);
              }).exceptionally(error -> {
                        //Since failure occured in Query1 want to roll back
                        transaction.rollbackAsync();
                        firstFuture.completeExceptionally(error);
                        throw new RuntimeException("There has been an error in first query " + error.getMessage());
                  });

         //after the first write query is done then run the second write query
         firstFuture.thenCompose(val -> transaction.runAsync("DB Write QUERY 2"))
                   .thenCompose(someFunctionThatReturnsCompletionStage)
                   .thenApply(val -> {                      
                       //if value is correct then complete
                       secondFuture.complete(val);
                     }
                   }).exceptionally(error -> {
                        //Incase of failure in Query2 want to roll back
                        transaction.rollbackAsync();
                        secondFuture.completeExceptionally(error);
                        throw new RuntimeException("There has been an error in second query " + error.getMessage());
                  });


   //wait for both to complete and then complete the last future
   CompletableFuture.allOf(firstFuture,secondFuture)
                    .handle((empty,ex) -> {
                        if(ex != null){
                            lastFuture.completeExceptionally(ex);
                        }else{
                            //commit the transaction
                            transaction.commitAsync();
                            lastFuture.complete("OK");
                        }

                        return lastFuture;
                    });

            return lastFuture;
}

 //Create a database session
 Session session = driver.session();

 //runTransactionWork is lambda that has access to transaction
 session.writeTransactionAsync(runTransactionWork)
      .handle((val,err) -> {
         if(val != null){
            session.closeAsync();
            //send message to some broker about success
         }else{
            //fail logic 
         }
      });


如何实现短路异常以确保事务回滚并直接进入会话中的异常块。

这些是我对根据不同用例调用代码块的观察,注意这些是基于我在代码中放置的调试点 -

  1. 快乐之路 - firstFuture(success) -> secondFuture(success) -> LastFuture (success) -> 会话块成功调用(工作正常)
  2. First Future 失败 - firstFuture(由于异常而失败)-> secondFuture(从未调用过)-> LastFuture(从未调用过)-> 会话块失败(从未调用过)
  3. Second Future 失败 - firstFuture(success) -> secondFuture(因异常失败) -> LastFuture(never call) -> session block failure(never call)

我希望 #2 和 #3 也能正常工作,并且应该回滚相应的事务并关闭会话。

我的问题是,为什么 handleallOf 中的 exeption 部分在未来的 completesExceptionally 中没有被调用

解决方法

当您抛出 CustomException 时,firstFuture 未完成。事实上,它没有任何反应。因为还没有完成(成功),所以:

firstFuture.thenCompose...

不会被执行。 thenCompose 的文档说:

当这个阶段正常完成时,给定的函数以这个阶段的结果作为参数被调用...

由于情况并非如此,该代码显然不会被触发。因此,secondFuture 也不会发生任何变化,因此 CompletableFuture::allOf 必须完全为零。可能是一个简化的例子会有所帮助:

public class CF {

  public static void main(String[] args) {
    CompletableFuture<Void> one = CompletableFuture.runAsync(CF::db1);
    LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(500));
    System.out.println(one.isCompletedExceptionally());

    CompletableFuture<Void> two = one.thenRun(CF::db2);

    System.out.println("first is done : " + FIRST_FUTURE.isDone());
    System.out.println("second is done : " + SECOND_FUTURE.isDone());
    CompletableFuture.allOf(FIRST_FUTURE,SECOND_FUTURE).thenRun(() -> {
      System.out.println("allOf");
    });
    LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(500));
  }

  private static final boolean FAIL = true;
  private static final CompletableFuture<String> FIRST_FUTURE = new CompletableFuture<>();
  private static final CompletableFuture<String> SECOND_FUTURE = new CompletableFuture<>();

  private static void db1() {
    if(FAIL) {
      throw new RuntimeException("failed one");
    } else {
      FIRST_FUTURE.complete("42");
    }
  }

  private static void db2() {
    System.out.println("Running");
    SECOND_FUTURE.complete("42");
  }

}

如果你运行这个,你会发现没有打印任何东西...


不幸的是,我不熟悉 Neo4j,但您很可能可以根据需要调整此示例:

public class CF {

  public static void main(String[] args) {
    CompletableFuture<Void> one = CompletableFuture.runAsync(CF::db1);

    CompletableFuture<Void> terminal =
    one.whenComplete((ok,th) -> {
      if(th != null || FIRST_FUTURE.isCompletedExceptionally()) {
        // no need to schedule the second one,need to rollback whatever the first one did
        // transaction.rollbackAsync();
        System.out.println("rollback because first one failed");
        LAST_FUTURE.completeExceptionally(new RuntimeException("because first one failed"));
      } else {
        CompletableFuture<Void> two = CompletableFuture.runAsync(CF::db2);
        two.whenComplete((ok2,th2) -> {
          if(th2 != null || SECOND_FUTURE.isCompletedExceptionally()) {
            System.out.println("rollback because second one failed");
            // transaction.rollbackAsync();
            LAST_FUTURE.completeExceptionally(new RuntimeException("because second one failed"));
          } else {
            LAST_FUTURE.complete("OK");
          }
        });
      }
    });

    // simulate that someone will call this
    terminal.join();
    System.out.println(LAST_FUTURE.join());

  }

  private static final boolean FAIL_ONE = false;
  private static final boolean FAIL_TWO = true;
  private static final CompletableFuture<String> FIRST_FUTURE = new CompletableFuture<>();
  private static final CompletableFuture<String> SECOND_FUTURE = new CompletableFuture<>();
  private static final CompletableFuture<String> LAST_FUTURE = new CompletableFuture<>();

  private static void db1() {
    if(FAIL_ONE) {
      LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(500));
      RuntimeException ex = new RuntimeException("failed one");;
      FIRST_FUTURE.completeExceptionally(ex);
    } else {
      FIRST_FUTURE.complete("42");
    }
  }

  private static void db2() {
    if(FAIL_TWO) {
      LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(500));
      RuntimeException ex = new RuntimeException("failed one");;
      SECOND_FUTURE.completeExceptionally(ex);
    } else {
      SECOND_FUTURE.complete("42");
    }
  }

}