超时后在Java CompletableFuture中停止线程

问题描述

我的Java代码中有一个异步链,我想在某个超时后停止 所以我创建了一个带有一些线程的threadPool,并这样称呼CompletableFuture

ExecutorService pool = Executors.newFixedThreadPool(10);

比我有一种循环方法,一旦所有CompletableFutures完成,它就会再次从db中加载数据并执行一些任务

CompletableFuture<MyObject> futureTask =
                CompletableFuture.supplyAsync(() -> candidate,pool)
                .thenApply(Task1::doWork).thenApply(Task2::doWork).thenApply(Task3::doWork)
                .thenApply(Task4::doWork).thenApply(Task5::doWork).orTimeout(30,TimeUnit.SECONDS)
                .thenApply(Task6::doWork).orTimeout(30,TimeUnit.SECONDS)
                .exceptionally(ExceptionHandlerService::handle);

我的问题出在 task6 中,该任务非常繁琐(其网络连接任务有时会永远挂起) 我注意到30秒后我的orTimeout被正确触发,但是运行Task6的线程仍在运行

经过几次这样的循环后,我所有的线程都耗尽了,我的应用程序死了

如何在超时后取消池中正在运行的线程? (不调用pool.shutdown())

更新 * 在主线程中,我做了一个简单的检查,如下所示

for (int i = TIME_OUT_SECONDS; i >= 0; i--) {
                unfinishedTasks = handleFutureTasks(unfinishedTasks,totalBatchSize);
                if(unfinishedTasks.isEmpty()) {
                    break;
                }
                if(i==0) {
                    //handle cancelation of the tasks
                    for(CompletableFuture<ComplianceCandidate> task: unfinishedTasks) {
                        **task.cancel(true);**
                        log.error("Reached timeout on task,is canceled: {}",task.isCancelled());
                    }
                    break;
                }
                try {
                    TimeUnit.SECONDS.sleep(1);
                } catch (Exception ex) {
                }
            }

我看到的是几个周期后,所有任务都抱怨超时... 在最初的1-2个周期中,我仍然得到预期的响应(尽管有线程来处理它)

我仍然觉得线程池已耗尽

解决方法

我知道您说的是呼叫pool.shutDown,但是根本没有其他方法。但是,当您查看阶段时,它们将在“附加”它们的线程(添加了thenApply)中运行,或者在您定义的池中运行。也许一个例子更有意义。

public class SO64743332 {

    static ExecutorService pool = Executors.newFixedThreadPool(10);

    public static void main(String[] args) {

        CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() -> dbCall(),pool);

        //simulateWork(4);

        CompletableFuture<String> f2 = f1.thenApply(x -> {
            System.out.println(Thread.currentThread().getName());
            return transformationOne(x);
        });

        CompletableFuture<String> f3 = f2.thenApply(x -> {
            System.out.println(Thread.currentThread().getName());
            return transformationTwo(x);
        });

        f3.join();
    }

    private static String dbCall() {
        simulateWork(2);
        return "a";
    }

    private static String transformationOne(String input) {
        return input + "b";
    }

    private static String transformationTwo(String input) {
        return input + "b";
    }

    private static void simulateWork(int seconds) {
        try {
            Thread.sleep(TimeUnit.SECONDS.toMillis(seconds));
        } catch (InterruptedException e) {
            System.out.println("Interrupted!");
            e.printStackTrace();
        }
    }
}

上面代码的关键点是:simulateWork(4);。运行带有注释的代码,然后取消注释。查看什么线程实际上将执行所有这些thenApply。它是池中的main same 线程,这意味着尽管您定义了一个池-它只是该池中的一个线程将执行所有这些阶段。

在这种情况下,您可以定义一个将执行所有这些阶段的单线程执行器(比如说在一个方法内部)。这样,您可以控制何时调用shutDownNow并可能中断正在运行的任务(如果您的代码响应中断)。这是一个模拟示例:

public class SO64743332 {

    public static void main(String[] args) {
        execute();
    }


    public static void execute() {

        ExecutorService pool = Executors.newSingleThreadExecutor();

        CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> dbCall(),pool);
        CompletableFuture<String> cf2 = cf1.thenApply(x -> transformationOne(x));

        // give enough time for transformationOne to start,but not finish
        simulateWork(2);

        try {
            CompletableFuture<String> cf3 = cf2.thenApply(x -> transformationTwo(x))
                                               .orTimeout(4,TimeUnit.SECONDS);
            cf3.get(10,TimeUnit.SECONDS);
        } catch (ExecutionException | InterruptedException | TimeoutException e) {
            pool.shutdownNow();
        }

    }

    private static String dbCall() {
        System.out.println("Started DB call");
        simulateWork(1);
        System.out.println("Done with DB call");
        return "a";
    }

    private static String transformationOne(String input) {
        System.out.println("Started work");
        simulateWork(10);
        System.out.println("Done work");
        return input + "b";
    }

    private static String transformationTwo(String input) {
        System.out.println("Started transformation two");
        return input + "b";
    }

    private static void simulateWork(int seconds) {
        try {
            Thread.sleep(TimeUnit.SECONDS.toMillis(seconds));
        } catch (InterruptedException e) {
            System.out.println("Interrupted!");
            e.printStackTrace();
        }
    }
}

运行此命令,您应该注意到transformationOne已开始,但是由于shutDownNow而被中断。

这样做的缺点很明显,execute的每次调用都会创建一个新的线程池...

相关问答

错误1:Request method ‘DELETE‘ not supported 错误还原:...
错误1:启动docker镜像时报错:Error response from daemon:...
错误1:private field ‘xxx‘ is never assigned 按Alt...
报错如下,通过源不能下载,最后警告pip需升级版本 Requirem...