取消已经运行的 CompletableFutures 的预期模式是什么

问题描述

我在“Java 8 in Action”中找不到任何关于 CompletableFuture 故意忽略 mayInterruptIfRunning 的信息。但即便如此,我也没有真正看到自定义 cancel(boolean) 的任何钩子,这在中断不影响阻塞操作(例如 I/O 流、简单锁和很快)。到目前为止,任务本身似乎是预期的钩子,在 Future 的抽象级别上工作在这里没有任何好处。

因此,我问的是为了从这种情况中挤出一些 neet 自定义取消机制而必须引入的最少样板代码集。

解决方法

CompletableFuture 是封装以下三种状态之一的对象:

  1. 未完成,
  2. 完成一个值,或
  3. 异常完成

从“未完成”到其他状态之一的转换可由传递给其工厂方法之一或 CompletionStage 实现方法之一的函数触发。

但也可以在其上调用 completecompleteExceptionally。在这方面,对其调用 cancel 与对其调用 completeExceptionally(new CancellationException()) 具有相同的效果。

这也是它的类名所暗示的,它是一个可以完成而不是将[最终]完成的未来。该类不提供对可能在任意线程中运行的现有完成尝试的控制,并且它不会特别对待自己安排的完成尝试。

同样重要的是要明白,当通过 CompletionStage 实现方法链接操作时,产生的 future 只代表链的最后一个阶段,取消它也只会影响最后一个阶段。

例如以下代码

CompletableFuture<?> cf = CompletableFuture.supplyAsync(() -> {
        LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(1));
        String s = "value1";
        System.out.println("return initial " + s);
        return s;
    }).thenApply(s -> {
        LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(1));
        s = s.toUpperCase();
        System.out.println("return transformed " + s);
        return s;
    }).thenAccept(s -> {
        System.out.println("starting last stage");
        LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(1));
        System.out.println("processed " + s);
    });
cf.cancel(false);
ForkJoinPool.commonPool().awaitQuiescence(1,TimeUnit.DAYS);

将打印

return initial value1
return transformed VALUE1

证明只有链的最后一个阶段被取消,而前一个阶段运行完成,完全不受影响。

保留对第一阶段的引用以尝试取消整个链只会在第一阶段尚未完成时起作用,因为尝试取消已经完成的未来是无效的。

long[] waitBeforeCancel = { 500,1500 };
for(long l: waitBeforeCancel) {
    CompletableFuture<String> first = CompletableFuture.supplyAsync(() -> {
        LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(1));
        String s = "value1";
        System.out.println("return initial " + s);
        return s;
    });
    first.thenApply(s -> {
            LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(1));
            s = s.toUpperCase();
            System.out.println("return transformed " + s);
            return s;
        }).thenAccept(s -> {
            System.out.println("starting last stage");
            LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(1));
            System.out.println("processed " + s);
        });
    LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(l));
    System.out.println("Trying to cancel");
    first.cancel(false);
    ForkJoinPool.commonPool().awaitQuiescence(1,TimeUnit.DAYS);
    System.out.println();
}
Trying to cancel
return initial value1

return initial value1
Trying to cancel
return transformed VALUE1
starting last stage
processed VALUE1

这说明当第一阶段及时取消时整个链被取消(除了第一个Supplier的代码由于不存在中断仍然完成),而取消太晚​​不会影响任何阶段。

记住所有 CompletableFuture 实例,以便能够取消所有实例,这将违背 API 的目的。您可以使用跟踪所有当前处理的作业的执行程序,在最后一个阶段被取消时将取消和中断转发给它们。然后,CompletableFuture 实现会将取消转发到相关阶段。这样,完成的阶段仍然可以被垃圾回收。

设置有点复杂;预先需要包装执行器来构造 CompletableFuture 链,取消的转发需要已构造链的最后阶段。这就是为什么我制作了一个实用方法,接受链构造代码作为 Function<Executor,CompletableFuture<T>>:

static <T> Future<T> setupForInterruption(Function<Executor,CompletableFuture<T>> f) {
    return setupForInterruption(f,ForkJoinPool.commonPool());
}
static <T> Future<T> setupForInterruption(
        Function<Executor,CompletableFuture<T>> f,Executor e) {

    AtomicBoolean dontAcceptMore = new AtomicBoolean();
    Set<Future<?>> running = ConcurrentHashMap.newKeySet();
    Executor wrapper = r -> {
        if(dontAcceptMore.get()) throw new CancellationException();
        FutureTask<?> ft = new FutureTask<>(r,null) {
            @Override protected void done() { running.remove(this); }
        };
        running.add(ft);
        e.execute(ft);
    };
    CompletableFuture<T> cf = f.apply(wrapper);
    cf.whenComplete((v,t) -> {
        if(cf.isCancelled()) {
            dontAcceptMore.set(true);
            running.removeIf(ft -> ft.cancel(true) || ft.isDone());
        }
    });
    return cf;
}

可以像这样使用

long[] waitBeforeCancel = { 500,1500,2500,3500 };
for(long l: waitBeforeCancel) {
    Future<?> f = setupForInterruption(executor ->
        CompletableFuture.supplyAsync(() -> {
                LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(1));
                if(Thread.interrupted()) throw new IllegalStateException();
                String s = "value1";
                System.out.println("return initial " + s);
                return s;
            },executor).thenApplyAsync(s -> {
                LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(1));
                if(Thread.interrupted()) throw new IllegalStateException();
                s = s.toUpperCase();
                System.out.println("return transformed " + s);
                return s;
            },executor).thenAcceptAsync(s -> {
                System.out.println("starting last stage");
                LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(1));
                if(Thread.interrupted()) throw new IllegalStateException();
                System.out.println("processed " + s);
            },executor));
    LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(l));
    System.out.println("Trying to cancel");
    f.cancel(true);
    ForkJoinPool.commonPool().awaitQuiescence(1,TimeUnit.DAYS);
    System.out.println();
}
Trying to cancel

return initial value1
Trying to cancel

return initial value1
return transformed VALUE1
starting last stage
Trying to cancel

return initial value1
return transformed VALUE1
starting last stage
processed VALUE1
Trying to cancel

由于 API 使用了 SupplierFunctionConsumer,它们都不允许抛出 InterruptedException,因此示例代码对中断进行了显式测试并抛出 { {1}} 代替。这也是它使用 IllegalStateException 的原因,它首先在中断时立即返回而不是 parkNanos。在实际应用场景中,您可能会调用中断敏感方法,并且必须捕获 Thread.sleepInterruptedExceptionInterruptedIOException(等)并将它们转换为未经检查的异常。

请注意,上述方法总是会因中断而取消,因为 InterruptedNamingException 不会说明取消是否有中断。如果你想得到这个参数的值,你需要一个前端的CompletableFuture实现来反映最后一个阶段的结果,将取消转发给它,但是将Future的值传递给当前正在运行的作业。

mayInterruptIfRunning
,

你可以想象一个 CompletableFuture 只是一个异步子进程的(类型化的)句柄(当然它还有更多的好处)。但它不是异步子进程的控制器。您甚至无法直接访问它(或执行它的线程)。

为了实现 abort(),我是这样做的,扩展了 CompletableFuture

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Supplier;
import x.y.MyEnum;

/**
 * Provides an interruptible CompletableFuture implementation for one time usage.
 */
public final class InteruptibleCompletableFuture extends CompletableFuture<MyEnum>
{
    /**
     * the embedded executor service
     */
    private ExecutorService myExecutorService;
    
    /**
     * Executes the given supplier asynchronously and returns the future to wait for.
     * 
     * @param aSupplier the supplier to be executed
     * @return the CompletableFuture to wait for
     */
    public static InteruptibleCompletableFuture execute(Supplier<MyEnum> aSupplier)
    {
        InteruptibleCompletableFuture result = new InteruptibleCompletableFuture();
        
        result.myExecutorService.submit( () -> 
        {
            try
            {
                MyEnum res = aSupplier.get(); 
                result.complete( res);
            }
            catch ( Throwable ex )
            {
                result.completeExceptionally( ex);
            }
        });
    
        return result;
    }

    private InteruptibleCompletableFuture()
    {
        super();
        myExecutorService = Executors.newSingleThreadExecutor();
    }

    @Override
    public boolean complete( MyEnum value)
    {
        if ( isDone() )
        {
            // Future already completed
            return false;
        }
        
        shutdownExecutorService();
        return super.complete( value);
    }

    @Override
    public boolean completeExceptionally( Throwable ex)
    {
        if ( isDone() )
        {
            // Future already completed
            return false;
        }
        
        shutdownExecutorService();
        return super.completeExceptionally( ex);
    }
    
    private void shutdownExecutorService()
    {
        try
        {
            myExecutorService.shutdownNow();
        }
        catch ( Throwable e )
        {
            // log error
        }
    }
}

这个想法是这个实现对执行者有帮助。强制终止从主应用程序委托给调用 myExecutorService.shutdownNow()。仔细阅读这个方法的javadoc。

请注意,java 没有合法的方法来杀死一个线程。调用 myThread.interrupt() 不会杀死它。它只是设置中断标志并唤醒正在监视器中等待的线程(抛出 InterruptedException)。如果您的实现在循环中执行批处理作业,则必须必须定期检查中断标志,并对 InterruptedException 做出正确反应。