问题描述
我正在使用 Java 8 并且我有一个我正在尝试运行的 CompletionStage 链。
我不想使用 join()
或 get()
,我想明确地完成 CompletionStage
。
我正在尝试运行两个数据库查询,第二个依赖于第一个查询的结果。我正在使用会话启动数据库事务,运行 write query1,write query2 并且只有当两者都成功时我才想提交事务或回滚它。 事务和会话是 Neo4j java API https://neo4j.com/docs/api/java-driver/current/org/neo4j/driver/async/AsyncSession.html#writeTransactionAsync-org.neo4j.driver.async.AsyncTransactionWork-
的一部分这是伪代码-
DB Session starts transaction
run Write Query1
run Write Query2
if both are successful
commit transaction
else
rollback transaction
close session
我想要实现的是,如果 query1/query2 失败,那么它应该只是回滚事务并关闭会话。
如果 Query1 的结果不正确(小于某个阈值),Query 1 也可以抛出 CustomException
。在这种情况下,它应该回滚事务。我正在为每个查询回滚 exceptionally
块中的事务。
快乐路径在下面的代码中工作正常,但是当我想抛出 CustomException
时,Query2 块没有被调用,甚至 Completable.allOf
也从未被调用。
CompletableFuture<String> firstFuture = new CompletableFuture();
CompletableFuture<String> secondFuture = new CompletableFuture();
CompletableFuture<String> lastFuture = new CompletableFuture();
//Lambda that executes transaction
TransactionWork<CompletionStage<String>> runTransactionWork = transaction -> {
//Write Query1
transaction.runAsync("DB WRITE QUERY1") //Running Write Query 1
.thenCompose(someFunctionThatReturnsCompletionStage)
.thenApply(val -> {
//throw CustomException if value less then threshold
if(val < threshold){
throw new CustomException("Incorrect value found");
}else{
//if value is correct then complete future
firstFuture.complete(val);
}
firstQuery.complete(val);
}).exceptionally(error -> {
//Since failure occured in Query1 want to roll back
transaction.rollbackAsync();
firstFuture.completeExceptionally(error);
throw new RuntimeException("There has been an error in first query " + error.getMessage());
});
//after the first write query is done then run the second write query
firstFuture.thenCompose(val -> transaction.runAsync("DB Write QUERY 2"))
.thenCompose(someFunctionThatReturnsCompletionStage)
.thenApply(val -> {
//if value is correct then complete
secondFuture.complete(val);
}
}).exceptionally(error -> {
//Incase of failure in Query2 want to roll back
transaction.rollbackAsync();
secondFuture.completeExceptionally(error);
throw new RuntimeException("There has been an error in second query " + error.getMessage());
});
//wait for both to complete and then complete the last future
CompletableFuture.allOf(firstFuture,secondFuture)
.handle((empty,ex) -> {
if(ex != null){
lastFuture.completeExceptionally(ex);
}else{
//commit the transaction
transaction.commitAsync();
lastFuture.complete("OK");
}
return lastFuture;
});
return lastFuture;
}
//Create a database session
Session session = driver.session();
//runTransactionWork is lambda that has access to transaction
session.writeTransactionAsync(runTransactionWork)
.handle((val,err) -> {
if(val != null){
session.closeAsync();
//send message to some broker about success
}else{
//fail logic
}
});
如何实现短路异常以确保事务回滚并直接进入会话中的异常块。
这些是我对根据不同用例调用的代码块的观察,注意这些是基于我在代码中放置的调试点 -
- 快乐之路 - firstFuture(success) -> secondFuture(success) -> LastFuture (success) -> 会话块成功调用(工作正常)
- First Future 失败 - firstFuture(由于异常而失败)-> secondFuture(从未调用过)-> LastFuture(从未调用过)-> 会话块失败(从未调用过)
- Second Future 失败 - firstFuture(success) -> secondFuture(因异常失败) -> LastFuture(never call) -> session block failure(never call)
我希望 #2 和 #3 也能正常工作,并且应该回滚相应的事务并关闭会话。
我的问题是,为什么 handle
的 allOf
中的 exeption 部分在未来的 completesExceptionally
中没有被调用?
解决方法
当您抛出 CustomException
时,firstFuture
未完成。事实上,它没有任何反应。因为还没有完成(成功),所以:
firstFuture.thenCompose...
不会被执行。 thenCompose
的文档说:
当这个阶段正常完成时,给定的函数以这个阶段的结果作为参数被调用...
由于情况并非如此,该代码显然不会被触发。因此,secondFuture
也不会发生任何变化,因此 CompletableFuture::allOf
必须完全为零。可能是一个简化的例子会有所帮助:
public class CF {
public static void main(String[] args) {
CompletableFuture<Void> one = CompletableFuture.runAsync(CF::db1);
LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(500));
System.out.println(one.isCompletedExceptionally());
CompletableFuture<Void> two = one.thenRun(CF::db2);
System.out.println("first is done : " + FIRST_FUTURE.isDone());
System.out.println("second is done : " + SECOND_FUTURE.isDone());
CompletableFuture.allOf(FIRST_FUTURE,SECOND_FUTURE).thenRun(() -> {
System.out.println("allOf");
});
LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(500));
}
private static final boolean FAIL = true;
private static final CompletableFuture<String> FIRST_FUTURE = new CompletableFuture<>();
private static final CompletableFuture<String> SECOND_FUTURE = new CompletableFuture<>();
private static void db1() {
if(FAIL) {
throw new RuntimeException("failed one");
} else {
FIRST_FUTURE.complete("42");
}
}
private static void db2() {
System.out.println("Running");
SECOND_FUTURE.complete("42");
}
}
如果你运行这个,你会发现没有打印任何东西...
不幸的是,我不熟悉 Neo4j
,但您很可能可以根据需要调整此示例:
public class CF {
public static void main(String[] args) {
CompletableFuture<Void> one = CompletableFuture.runAsync(CF::db1);
CompletableFuture<Void> terminal =
one.whenComplete((ok,th) -> {
if(th != null || FIRST_FUTURE.isCompletedExceptionally()) {
// no need to schedule the second one,need to rollback whatever the first one did
// transaction.rollbackAsync();
System.out.println("rollback because first one failed");
LAST_FUTURE.completeExceptionally(new RuntimeException("because first one failed"));
} else {
CompletableFuture<Void> two = CompletableFuture.runAsync(CF::db2);
two.whenComplete((ok2,th2) -> {
if(th2 != null || SECOND_FUTURE.isCompletedExceptionally()) {
System.out.println("rollback because second one failed");
// transaction.rollbackAsync();
LAST_FUTURE.completeExceptionally(new RuntimeException("because second one failed"));
} else {
LAST_FUTURE.complete("OK");
}
});
}
});
// simulate that someone will call this
terminal.join();
System.out.println(LAST_FUTURE.join());
}
private static final boolean FAIL_ONE = false;
private static final boolean FAIL_TWO = true;
private static final CompletableFuture<String> FIRST_FUTURE = new CompletableFuture<>();
private static final CompletableFuture<String> SECOND_FUTURE = new CompletableFuture<>();
private static final CompletableFuture<String> LAST_FUTURE = new CompletableFuture<>();
private static void db1() {
if(FAIL_ONE) {
LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(500));
RuntimeException ex = new RuntimeException("failed one");;
FIRST_FUTURE.completeExceptionally(ex);
} else {
FIRST_FUTURE.complete("42");
}
}
private static void db2() {
if(FAIL_TWO) {
LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(500));
RuntimeException ex = new RuntimeException("failed one");;
SECOND_FUTURE.completeExceptionally(ex);
} else {
SECOND_FUTURE.complete("42");
}
}
}