问题描述
我的Java代码中有一个异步链,我想在某个超时后停止 所以我创建了一个带有一些线程的threadPool,并这样称呼CompletableFuture
ExecutorService pool = Executors.newFixedThreadPool(10);
比我有一种循环方法,一旦所有CompletableFutures完成,它就会再次从db中加载数据并执行一些任务
CompletableFuture<MyObject> futureTask =
CompletableFuture.supplyAsync(() -> candidate,pool)
.thenApply(Task1::doWork).thenApply(Task2::doWork).thenApply(Task3::doWork)
.thenApply(Task4::doWork).thenApply(Task5::doWork).orTimeout(30,TimeUnit.SECONDS)
.thenApply(Task6::doWork).orTimeout(30,TimeUnit.SECONDS)
.exceptionally(ExceptionHandlerService::handle);
我的问题出在 task6 中,该任务非常繁琐(其网络连接任务有时会永远挂起) 我注意到30秒后我的orTimeout被正确触发,但是运行Task6的线程仍在运行
经过几次这样的循环后,我所有的线程都耗尽了,我的应用程序死了
如何在超时后取消池中正在运行的线程? (不调用pool.shutdown())
更新 * 在主线程中,我做了一个简单的检查,如下所示
for (int i = TIME_OUT_SECONDS; i >= 0; i--) {
unfinishedTasks = handleFutureTasks(unfinishedTasks,totalBatchSize);
if(unfinishedTasks.isEmpty()) {
break;
}
if(i==0) {
//handle cancelation of the tasks
for(CompletableFuture<ComplianceCandidate> task: unfinishedTasks) {
**task.cancel(true);**
log.error("Reached timeout on task,is canceled: {}",task.isCancelled());
}
break;
}
try {
TimeUnit.SECONDS.sleep(1);
} catch (Exception ex) {
}
}
我看到的是几个周期后,所有任务都抱怨超时... 在最初的1-2个周期中,我仍然得到预期的响应(尽管有线程来处理它)
我仍然觉得线程池已耗尽
解决方法
我知道您说的是不呼叫pool.shutDown
,但是根本没有其他方法。但是,当您查看阶段时,它们将在“附加”它们的线程(添加了thenApply
)中运行,或者在您定义的池中运行。也许一个例子更有意义。
public class SO64743332 {
static ExecutorService pool = Executors.newFixedThreadPool(10);
public static void main(String[] args) {
CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() -> dbCall(),pool);
//simulateWork(4);
CompletableFuture<String> f2 = f1.thenApply(x -> {
System.out.println(Thread.currentThread().getName());
return transformationOne(x);
});
CompletableFuture<String> f3 = f2.thenApply(x -> {
System.out.println(Thread.currentThread().getName());
return transformationTwo(x);
});
f3.join();
}
private static String dbCall() {
simulateWork(2);
return "a";
}
private static String transformationOne(String input) {
return input + "b";
}
private static String transformationTwo(String input) {
return input + "b";
}
private static void simulateWork(int seconds) {
try {
Thread.sleep(TimeUnit.SECONDS.toMillis(seconds));
} catch (InterruptedException e) {
System.out.println("Interrupted!");
e.printStackTrace();
}
}
}
上面代码的关键点是:simulateWork(4);
。运行带有注释的代码,然后取消注释。查看什么线程实际上将执行所有这些thenApply
。它是池中的main
或 same 线程,这意味着尽管您定义了一个池-它只是该池中的一个线程将执行所有这些阶段。
在这种情况下,您可以定义一个将执行所有这些阶段的单线程执行器(比如说在一个方法内部)。这样,您可以控制何时调用shutDownNow
并可能中断正在运行的任务(如果您的代码响应中断)。这是一个模拟示例:
public class SO64743332 {
public static void main(String[] args) {
execute();
}
public static void execute() {
ExecutorService pool = Executors.newSingleThreadExecutor();
CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> dbCall(),pool);
CompletableFuture<String> cf2 = cf1.thenApply(x -> transformationOne(x));
// give enough time for transformationOne to start,but not finish
simulateWork(2);
try {
CompletableFuture<String> cf3 = cf2.thenApply(x -> transformationTwo(x))
.orTimeout(4,TimeUnit.SECONDS);
cf3.get(10,TimeUnit.SECONDS);
} catch (ExecutionException | InterruptedException | TimeoutException e) {
pool.shutdownNow();
}
}
private static String dbCall() {
System.out.println("Started DB call");
simulateWork(1);
System.out.println("Done with DB call");
return "a";
}
private static String transformationOne(String input) {
System.out.println("Started work");
simulateWork(10);
System.out.println("Done work");
return input + "b";
}
private static String transformationTwo(String input) {
System.out.println("Started transformation two");
return input + "b";
}
private static void simulateWork(int seconds) {
try {
Thread.sleep(TimeUnit.SECONDS.toMillis(seconds));
} catch (InterruptedException e) {
System.out.println("Interrupted!");
e.printStackTrace();
}
}
}
运行此命令,您应该注意到transformationOne
已开始,但是由于shutDownNow
而被中断。
这样做的缺点很明显,execute
的每次调用都会创建一个新的线程池...