问题描述
CompletableFuture的行为使得分支链中的异常不一定会中断整个CompletableFuture链。
寻找克服方法。
此测试仅代表用法的一小部分。实际情况将有数以百万计的链。另一种情况是,导致节点故障的原因是“内存不足”情况,然后这种情况升级并加剧了这种情况,因为每个线程仍试图继续运行并最终也获得OOM。
Java版本:8
测试演示了这种情况: A =(B + C)*(D / E), 其中每个变量和操作均表示为CompletableFuture See pictorial representation
当一个部分有异常时,它不会中止其他异步部分,仅通知根(A)。
其中“根”是“ A”(或乘法结果),失败的节点是“ D / E”(例如div到零),而左分支上的节点B和C是仍然忙于异步活动。
import org.junit.Test;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.function.BiFunction;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
public class CompletableFutureTest {
static BiFunction<Double,Double,Double> ADD = (a,b) -> a + b;
static BiFunction<Double,Double> MULTIPLY = (a,b) -> a * b;
static BiFunction<Double,Double> DIVIDE = (a,b) -> (double) (a.intValue() / b.intValue());
DateTimeFormatter dtf = DateTimeFormatter.ofPattern("HH:mm:ss");
Executor executor = Executors.newFixedThreadPool(8);
// A = (B + C) * (D / E)
@Test
public void cf() {
CompletableFuture<Double> d = getD();
CompletableFuture<Double> e = getE();
CompletableFuture<Double> b = getB();
CompletableFuture<Double> c = getC();
CompletableFuture<Double> a = null;
CompletableFuture<Double> firstArg = null;
CompletableFuture<Double> secondArg = null;
try {
firstArg = b.thenCombineAsync(c,ADD); // B + C
secondArg = d.thenCombine(e,DIVIDE); // D / E
a = firstArg.thenCombineAsync(secondArg,MULTIPLY); // (B + C) * (D / E)
firstArg.whenComplete((r,t) -> {
System.out.println(dtf.format(LocalDateTime.now()) + " Result (B + C): " + r);
});
secondArg.whenComplete((r,t) -> {
if (r != null) {
System.out.println(dtf.format(LocalDateTime.now()) + " Result (D / E): " + r);
}
if (t != null) {
System.out.println(dtf.format(LocalDateTime.now()) + " Exception occurred while calculating (D / E): "
+ t.getMessage());
}
});
a.join();
} catch (Exception ex) {
System.out.println(dtf.format(LocalDateTime.now()) + " Exception occurred while calculating A: " + ex.getMessage());
}
assertTrue(a.isCompletedExceptionally());
assertTrue(secondArg.isCompletedExceptionally());
assertFalse(firstArg.isCompletedExceptionally());
}
private CompletableFuture<Double> getB() {
return get(15,3000,"B");
}
private CompletableFuture<Double> getC() {
return get(5,4000,"C");
}
private CompletableFuture<Double> getD() {
return get(100,1000,"D");
}
private CompletableFuture<Double> getE() {
return get(0,500,"E");
}
private CompletableFuture<Double> get(final double val,final int time,final String name) {
return CompletableFuture.supplyAsync(() -> {
try {
System.out.println(dtf.format(LocalDateTime.now()) + " Start: " + name);
Thread.sleep(time);
System.out.println(dtf.format(LocalDateTime.now()) + " End: " + name);
} catch (InterruptedException e) {
e.printStackTrace();
}
return val;
},executor);
}
}
结果: 输入:B = 15,C = 5,D = 100,E = 0; 当另一个分支(D / E)中出现异常时,B和C的计算不会停止
14:59:08 Start: E
14:59:08 Start: C
14:59:08 Start: B
14:59:08 Start: D
14:59:08 End: E
14:59:09 End: D
14:59:09 Exception occurred while calculating (D / E): java.lang.ArithmeticException: / by zero
14:59:11 End: B
14:59:12 End: C
14:59:12 Result (B + C): 20.0
14:59:12 Exception occurred while calculating A: java.lang.ArithmeticException: / by zero
解决方法
暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!
如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。
小编邮箱:dio#foxmail.com (将#修改为@)