在 Java 中从 C++ 复制延迟/异步启动策略

问题描述

在 C++ 中,您可以使用延迟或异步启动策略启动线程。有没有办法在 Java 中复制此功能

auto T1 = std::async(std::launch::deferred,doSomething());
auto T2 = std::async(std::launch::async,doSomething()); 

每个的描述--

异步:

如果设置了 async 标志,则 async 在一个新的执行线程上执行可调用对象 f(所有线程局部初始化),除非函数 f 返回一个值或抛出异常,它将存储在可通过 async 返回给调用者的 std::future 访问共享状态。

延期:

如果设置了延迟标志,则异步转换 f 和 args...与 std::thread 构造函数相同,但不会产生新的执行线程。相反,执行惰性求值:第一次调用 std::future 上的非定时等待函数,异步返回给调用者将导致 f 的副本与 args 的副本一起被调用(作为右值)。 . (也作为右值传递)在当前线程(它不必是最初调用 std::async 的线程)。结果或异常被置于与未来关联的共享状态中,然后才准备就绪。对同一个 std::future 的所有进一步访问将立即返回结果。

有关详细信息,请参阅 documentation

解决方法

未来

首先,我们必须观察到 std::async 是一个执行给定任务并返回一个 std::future 对象的工具,该对象在计算结果可用时保存。

例如我们可以调用 result.get() 来阻塞并等待结果到达。此外,当计算遇到异常时,只要调用 result.get(),它就会被存储并重新抛出给我们。

Java 提供了类似的类,接口是 Future,最相关的实现是 CompletableFuture

std::future#get 大致翻译为 Future#get。甚至异常行为也非常相似。虽然 C++ 在调用 get 时重新抛出异常,但 Java 将抛出一个 ExecutionException,它的原始异常设置为 cause


如何获得Future?

在 C++ 中,您可以使用 std::async 创建未来的对象。在 Java 中,您可以使用 CompletableFuture 中的众多静态辅助方法之一。就您而言,最相关的是

因此,为了创建仅打印 Hello World! 的未来,您可以例如执行

CompletableFuture<Void> task = CompletableFuture.runAsync(() -> System.out.println("Hello World!"));
/*...*/
task.get();

Java 不仅有 lambda,还有方法引用。假设您有一个计算繁重数学任务的方法:

class MyMath {
    static int compute() {
        // Very heavy,duh
        return (int) Math.pow(2,5);
    }
}

然后你可以创建一个未来,一旦结果可用

CompletableFuture<Integer> task = CompletableFuture.runAsync(MyMath::compute);
/*...*/
Integer result = task.get();

异步与延迟

在 C++ 中,您可以选择指定一个启动策略,它规定了任务的线程行为。让我们把 C++ 对内存的承诺放在一边,因为在 Java 中你没有那么多的内存控制。

不同之处在于 async 将立即安排线程的创建并在该线程中执行任务。结果将在某个时候可用,并在您可以继续执行主要任务时进行计算。具体是新线程还是缓存线程取决于编译器,未指定。

deferred 的行为与此完全不同。当您调用 std::async 时基本上没有任何反应,不会创建额外的线程并且不会计算任务。在此期间根本不会提供结果。但是,只要您调用 get,就会在您当前的线程中计算任务并返回结果。基本上就像您自己直接调用该方法一样,根本没有任何 async 实用程序。


std::launch::async 在 Java 中

也就是说,让我们关注如何将这种行为转换为 Java。让我们从 async 开始。

这是一个简单的方法,因为它基本上是 CompletableFuture 中提供的默认和预期行为。因此,您只需执行 runAsyncsupplyAsync,具体取决于您的方法是否返回结果。让我再次展示前面的例子:

// without result
CompletableFuture<Void> task = CompletableFuture.runAsync(() -> System.out.println("Hello World!"));
/*...*/ // the task is computed in the meantime in a different thread
task.get();

// with result
CompletableFuture<Integer> task = CompletableFuture.supplyAsync(MyMath::compute);
/*...*/
Integer result = task.get();

请注意,除了 Executor 之外的方法还有重载,如果您有自己的线程池并希望 CompletableFuture 使用它而不是它自己的(有关详细信息,请参阅 here)。


std::launch::deferred 在 Java 中

我尝试了很多用 CompletableFuture 来模拟这种行为,但似乎不可能不创建自己的实现(如果我错了,请纠正我)。无论如何,它要么在创建时直接执行,要么根本不执行。

所以我建议直接使用您提供给 CompletableFuture 的底层任务接口,例如 RunnableSupplier。在我们的例子中,我们也可能使用 IntSupplier 来避免自动装箱。

这里是两个代码示例,但这次具有延迟行为:

// without result
Runnable task = () -> System.out.println("Hello World!");
/*...*/ // the task is not computed in the meantime,no threads involved
task.run(); // the task is computed now

// with result
IntSupplier task = MyMath::compute;
/*...*/
int result = task.getAsInt();

Java 中的现代多线程

作为最后一点,我想让您更好地了解当今 Java 中通常如何使用多线程。提供的工具比 C++ 默认提供的要丰富得多。

理想情况下,您的系统设计方式应该不必关心如此小的线程细节。您使用 Executors 创建一个自动管理的动态线程池,然后针对它启动初始任务(或使用 CompletableFuture 提供的默认执行程序服务)。之后,你只需在future对象上设置一个操作管道,类似于Stream API,然后等待最终的future对象。

例如,让我们假设您有一个文件名列表 List<String> fileNames 并且您想要

  1. 读取文件
  2. 验证其内容,如果无效则跳过
  3. 压缩文件
  4. 将文件上传到某个网络服务器
  5. 检查响应状态代码

并计算无效不成功成功的数量。假设你有一些方法,比如

class FileUploader {
    static byte[] readFile(String name) { /*...*/ }
    static byte[] requireValid(byte[] content) throws IllegalStateException { /*...*/ }
    static byte[] compressContent(byte[] content) { /*...*/ }
    static int uploadContent(byte[] content) { /*...*/ }
}

那么我们可以很容易地做到这一点

AtomicInteger successfull = new AtomicInteger();
AtomicInteger notSuccessfull = new AtomicInteger();
AtomicInteger invalid = new AtomicInteger();

// Setup the pipeline
List<CompletableFuture<Void>> tasks = fileNames.stream()
    .map(name -> CompletableFuture
        .completedFuture(name)
        .thenApplyAsync(FileUploader::readFile)
        .thenApplyAsync(FileUploader::requireValid)
        .thenApplyAsync(FileUploader::compressContent)
        .thenApplyAsync(FileUploader::uploadContent)
        .handleAsync((statusCode,exception) -> {
            AtomicInteger counter;
            if (exception == null) {
                counter = statusCode == 200 ? successfull : notSuccessfull;
            } else {
                counter = invalid;
            }
            counter.incrementAndGet();
        })
    ).collect(Collectors.toList());

// Wait until all tasks are done
tasks.forEach(CompletableFuture::join);

// Print the results
System.out.printf("Successfull %d,not successfull %d,invalid %d%n",successfull.get(),notSuccessfull.get(),invalid.get());

这样做的巨大好处是它将达到最大吞吐量并使用系统提供的所有硬件容量。所有任务都是完全动态和独立执行的,由自动线程池管理。而你只需等到一切都完成。

,

对于线程的异步启动,在现代 Java 中更喜欢使用高级 java.util.concurrent.ExecutorService

获得 ExecutorService 的一种方法是通过 java.util.concurrent.Executors。 ExecutorServices 有不同的行为; Executors 类提供了一些常见情况的方法。

一旦你有了一个 ExecutorService,你就可以向它提交 Runnables 和 Callables。

Future<MyReturnValue> myFuture = myExecutorService.submit(myTask);
,

如果我理解正确的话,可能是这样的:

@Canonical
class Greeter {
    String name
}

然后像这样使用它们:

private static CompletableFuture<Void> deferred(Runnable run) {
   CompletableFuture<Void> future = new CompletableFuture<>();
   future.thenRun(run);
   return future;
}

private static CompletableFuture<Void> async(Runnable run) {
    return CompletableFuture.runAsync(run);
}
,

要获得诸如延迟线程之类的东西,您可以尝试以降低的优先级运行线程。

首先,在 Java 中,首先使用 Runnable 来创建任务通常是惯用的。您还可以使用 Callable<T> 接口,该接口允许线程返回值(Runnable 不能)。

public class MyTask implements Runnable {
  @Override
  public void run() {
    System.out.println( "hello thread." );
  }
}

然后创建一个线程。在 Java 中,线程通常包装它们执行的任务。

MyTask myTask = new MyTask();
Thread t = new Tread( myTask );
t.setPriority( Thread.currentThread().getPriority()-1 );
t.start();

这个应该在有可用的内核之前不运行,这意味着它不应该运行,直到当前线程被阻塞或无事可做。但是,您在这里受操作系统调度程序的支配,因此无法保证特定的操作。大多数操作系统会保证所有线程最终都会运行,所以如果当前线程在没有阻塞的情况下需要很长时间,操作系统无论如何都会开始执行。

如果不允许您设置线程的优先级(不常见但可能),

setPriority() 可能会引发安全异常。所以请注意这些轻微的不便。

对于带有 Future 的异步任务,我会使用执行程序服务。类 Executors 中的辅助方法是执行此操作的便捷方法。

首先像以前一样完成你的任务。

public class MyCallable implements Callable<String> {
  @Override
  public String call() {
    return "hello future thread.";
  }
}

然后使用一个执行器服务来运行它:

MyCallable myCallable = new MyCallable();
ExecutorService es = Executors.newCachedThreadPool();
Future<String> f = es.submit( myCallable );

您可以使用 Future 对象查询线程,判断其运行状态并获取其返回值。在退出 JVM 之前,您需要关闭执行程序以停止其所有线程。

es.shutdown();

我尝试尽可能简单地编写此代码,没有使用 lambda 表达式或巧妙地使用泛型。上面应该向您展示这些 lambda 表达式实际实现的内容。然而,通常认为在编写代码时更复杂一点(并且不那么冗长)更好,因此一旦你觉得你理解了上述内容,你应该调查其他语法。