问题描述
我正在使用Java 8,我想知道对3个异步作业强制执行超时的推荐方法,我将执行该异步作业并从将来检索结果。请注意,所有3个作业的超时都相同。如果工作超出时间限制,我也想取消。
我在想这样的事情:
// Submit jobs async
List<CompletableFuture<String>> futures = submitJobs(); // Uses CompletableFuture.supplyAsync
List<CompletableFuture<Void>> all = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
try {
allFutures.get(100L,TimeUnit.MILLISECONDS);
} catch (TimeoutException e){
for(CompletableFuture f : future) {
if(!f.isDone()) {
/*
From Java Doc:
@param mayInterruptIfRunning this value has no effect in this
* implementation because interrupts are not used to control
* processing.
*/
f.cancel(true);
}
}
}
List<String> output = new ArrayList<>();
for(CompeletableFuture fu : futures) {
if(!fu.isCancelled()) { // Is this needed?
output.add(fu.join());
}
}
return output;
- 这样的事情会起作用吗?有更好的方法吗?
- 如何正确取消未来? Java doc说,线程不能中断吗?因此,如果我要取消将来并调用
join()
,由于线程不会被中断,我会立即得到结果吗? - 是否建议在等待结束后使用join()或get()获得结果?
解决方法
值得注意的是,在CompletableFuture上调用cancel实际上与在当前阶段调用completeException相同。取消不会影响先前的阶段。这样说:
- 原则上,在不需要上游取消的情况下,类似的事情会起作用(从伪代码的角度来看,上述语法有错误)。
- CompletableFuture取消不会中断当前线程。取消将导致所有下游阶段立即被CancellationException触发(将缩短执行流程)。 在呼叫者愿意无限期等待的情况下,
- “ join”和“ get”实际上是相同的。联接句柄为您包装已检查的异常。如果呼叫者想超时,则需要获取。
包括一段以说明取消行为。请注意,下游进程将不会启动,但是即使取消后上游进程也会继续。
public static void main(String[] args) throws Exception
{
int maxSleepTime = 1000;
Random random = new Random();
AtomicInteger value = new AtomicInteger();
List<String> calculatedValues = new ArrayList<>();
Supplier<String> process = () -> { try { Thread.sleep(random.nextInt(maxSleepTime)); System.out.println("Stage 1 Running!"); } catch (InterruptedException e) { e.printStackTrace(); } return Integer.toString(value.getAndIncrement()); };
List<CompletableFuture<String>> stage1 = IntStream.range(0,10).mapToObj(val -> CompletableFuture.supplyAsync(process)).collect(Collectors.toList());
List<CompletableFuture<String>> stage2 = stage1.stream().map(Test::appendNumber).collect(Collectors.toList());
List<CompletableFuture<String>> stage3 = stage2.stream().map(Test::printIfCancelled).collect(Collectors.toList());
CompletableFuture<Void> awaitAll = CompletableFuture.allOf(stage2.toArray(new CompletableFuture[0]));
try
{
/*Wait 1/2 the time,some should be complete. Some not complete -> TimeoutException*/
awaitAll.get(maxSleepTime / 2,TimeUnit.MILLISECONDS);
}
catch(TimeoutException ex)
{
for(CompletableFuture<String> toCancel : stage2)
{
boolean irrelevantValue = false;
if(!toCancel.isDone())
toCancel.cancel(irrelevantValue);
else
calculatedValues.add(toCancel.join());
}
}
System.out.println("All futures Cancelled! But some Stage 1's may still continue printing anyways.");
System.out.println("Values returned as of cancellation: " + calculatedValues);
Thread.sleep(maxSleepTime);
}
private static CompletableFuture<String> appendNumber(CompletableFuture<String> baseFuture)
{
return baseFuture.thenApply(val -> { System.out.println("Stage 2 Running"); return "#" + val; });
}
private static CompletableFuture<String> printIfCancelled(CompletableFuture<String> baseFuture)
{
return baseFuture.thenApply(val -> { System.out.println("Stage 3 Running!"); return val; }).exceptionally(ex -> { System.out.println("Stage 3 Cancelled!"); return ex.getMessage(); });
}
如果有必要取消上游处理(例如:取消某些网络呼叫),则需要自定义处理。
,在调用cancel之后,您将无法参加狂欢,因为您会遇到异常。
一种终止计算的方法是让它有一个对未来的引用并定期检查它:如果取消了,则从内部中止计算。 如果计算是一个循环,可以在每次迭代中进行检查。
您需要它成为CompletableFuture吗?导致另一种方法是避免使用CompleatableFuture,而改为使用简单的Future或FutureTask:如果您使用调用Executor的Executor执行它,则如果可能,cancel(true)将终止计算。
回答以下问题:“致电join(),我将立即得到结果”。
不,您不会立即得到它,它会挂起并等待完成计算:无法强制在较短时间内完成较长时间的计算。
您可以调用future.complete(value)来提供一个值,该值将被其他引用该Future的线程用作默认结果。