分支链中的异常不会中断整个CompletableFuture链

问题描述

CompletableFuture的行为使得分支链中的异常不一定会中断整个CompletableFuture链。

寻找克服方法。

此测试仅代表用法的一小部分。实际情况将有数以百万计的链。另一种情况是,导致节点故障的原因是“内存不足”情况,然后这种情况升级并加剧了这种情况,因为每个线程仍试图继续运行并最终也获得OOM。

Java版本:8

测试演示了这种情况: A =(B + C)*(D / E), 其中每个变量和操作均表示为CompletableFuture See pictorial representation

当一个部分有异常时,它不会中止其他异步部分,仅通知根(A)。

其中“根”是“ A”(或乘法结果),失败的节点是“ D / E”(例如div到零),而左分支上的节点B和C是仍然忙于异步活动。

import org.junit.Test;

import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.function.BiFunction;

import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

public class CompletableFutureTest {

    static BiFunction<Double,Double,Double> ADD = (a,b) -> a + b;
    static BiFunction<Double,Double> MULTIPLY = (a,b) -> a * b;
    static BiFunction<Double,Double> DIVIDE = (a,b) -> (double) (a.intValue() / b.intValue());

    DateTimeFormatter dtf = DateTimeFormatter.ofPattern("HH:mm:ss");

    Executor executor = Executors.newFixedThreadPool(8);

    // A =  (B + C) * (D / E)
    @Test
    public void cf() {
        CompletableFuture<Double> d = getD();
        CompletableFuture<Double> e = getE();
        CompletableFuture<Double> b = getB();
        CompletableFuture<Double> c = getC();
        CompletableFuture<Double> a = null;
        CompletableFuture<Double> firstArg = null;
        CompletableFuture<Double> secondArg = null;
        try {

            firstArg = b.thenCombineAsync(c,ADD); // B + C
            secondArg = d.thenCombine(e,DIVIDE); // D / E

            a = firstArg.thenCombineAsync(secondArg,MULTIPLY); // (B + C) * (D / E)

            firstArg.whenComplete((r,t) -> {
                System.out.println(dtf.format(LocalDateTime.now()) + " Result (B + C): " + r);
            });

            secondArg.whenComplete((r,t) -> {
                if (r != null) {
                    System.out.println(dtf.format(LocalDateTime.now()) + " Result (D / E): " + r);
                }
                if (t != null) {
                    System.out.println(dtf.format(LocalDateTime.now()) + " Exception occurred while calculating (D / E): "
                            + t.getMessage());
                }
            });

            a.join();
        } catch (Exception ex) {
            System.out.println(dtf.format(LocalDateTime.now()) + " Exception occurred while calculating A: " + ex.getMessage());
        }

        assertTrue(a.isCompletedExceptionally());
        assertTrue(secondArg.isCompletedExceptionally());
        assertFalse(firstArg.isCompletedExceptionally());
    }

    private CompletableFuture<Double> getB() {
        return get(15,3000,"B");
    }

    private CompletableFuture<Double> getC() {
        return get(5,4000,"C");
    }

    private CompletableFuture<Double> getD() {
        return get(100,1000,"D");
    }

    private CompletableFuture<Double> getE() {
        return get(0,500,"E");
    }

    private CompletableFuture<Double> get(final double val,final int time,final String name) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                System.out.println(dtf.format(LocalDateTime.now()) + " Start: " + name);
                Thread.sleep(time);
                System.out.println(dtf.format(LocalDateTime.now()) + " End: " + name);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return val;
        },executor);
    }

}

结果: 输入:B = 15,C = 5,D = 100,E = 0; 当另一个分支(D / E)中出现异常时,B和C的计算不会停止

14:59:08 Start: E
14:59:08 Start: C
14:59:08 Start: B
14:59:08 Start: D
14:59:08 End: E
14:59:09 End: D
14:59:09 Exception occurred while calculating (D / E): java.lang.ArithmeticException: / by zero
14:59:11 End: B
14:59:12 End: C
14:59:12 Result (B + C): 20.0
14:59:12 Exception occurred while calculating A: java.lang.ArithmeticException: / by zero

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)

相关问答

错误1:Request method ‘DELETE‘ not supported 错误还原:...
错误1:启动docker镜像时报错:Error response from daemon:...
错误1:private field ‘xxx‘ is never assigned 按Alt...
报错如下,通过源不能下载,最后警告pip需升级版本 Requirem...