带有 CompletableFuture

问题描述

我正在尝试使用 CompletableFuture 并行执行 for 循环。在循环中,我使用 supplyAsync 调用 doSomething获取输出字符串,然后将其放入 HashMap 中:

    ...
    ConcurrentHashMap<Integer,String> map = new ConcurrentHashMap<>();
    CompletableFuture<?> CompletableFuture = null;

    for ( int i = 0; i < numberOfRecords; i++ ) {
        final int finalI = i;
        CompletableFuture = CompletableFuture
                .supplyAsync( () -> doSomething( data,finalI ) )
                .thenAccept( str -> map.put( finalI,str ) );
    }
    CompletableFuture.join();

private String doSomething(HashMap<String,String> data,int finalI ) ) {
    ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue<>();

    for ( int k = 0; k < data.size(); k++ ) {
    //process data and add it in queue
    }
    String result = processQueueAndReturnString(queue);
    return result;
 

问题是当 for 循环快完成时(当 i 接近 numberOfRecords 时),for 方法中的另一个 doSomething 循环跳过了一些迭代,例如如果 k=5 它只能运行循环直到 k=2 or 3 并且在这种情况下 supplyAsync( () -> doSomething( data,finalI ) ) 返回 null。所以看起来我的 for 循环与 CompletableFuture 完成,直到一些迭代完全完成。

关于如何解决这个问题的任何建议或提示

解决方法

所以看起来我的 for 循环与 CompletableFuture 完成 [before] 一些迭代完全完成。

示例代码中的每个循环迭代都会创建一个 CompletableFuture。如果您想等待所有工作完成,您需要加入所有,而不仅仅是最后一次迭代创建的。

这样的东西(风格更正!):

ConcurrentHashMap<Integer,String> map = new ConcurrentHashMap<>();
CompletableFuture<Void>[] futures = new CompletableFuture<Void>[nosRecords];

for (int i = 0; i < nosRecords; i++) {
    final int finalI = i;
    futures[i] = CompletableFuture
            .supplyAsync(() -> doSomething(data,finalI))
            .thenAccept(str -> map.put(finalI,str));
}
CompletableFuture.allOf(futures);

请注意,您需要将 CompletableFuture<?> 更改为 CompletableFuture<Void>,因为 allOf() (javadoc) 的声明需要这样做。幸运的是,thenAccept(...) 调用已经返回了 CompletableFuture<Void>


HashMap data 不是线程安全的,应该是吗?我只是在方法 doSomething 中使用它来获取基于索引 finalI 的条目值。我不处理那个 HashMap。我刚读完。

supplyAsync 调用和对其 lambda 参数的调用之间将有一个 发生在之前。因此,只要 data 在执行任何 doSomething 调用期间不发生变化,它们都会在 data 映射中看到正确的值。

假设事情如您所说(并保持这种状态),可以在那里使用非同步 HashMap

,

Answer by Stephen C 看起来是正确的,适用于今天的 Java。但在未来(啊哈,看看我在那里做了什么?),Java 可能会提供一种更简单、更快的方法,使用虚拟线程。

项目织机

Project Loom 即将加入 Java,初步 builds available now 建立在早期的 Java 16 之上。

一个主要功能是虚拟线程 (fibers)。这些是轻量级线程。当任何虚拟线程中的控制流阻塞时,Java 会检测到该阻塞并切换到另一个虚拟线程中以保持 CPU 内核忙碌。这可以大大加快经常阻塞的线程代码(与视频编码等严格受 CPU 限制的任务相反)。

请注意,根据 Ron Pressler(Loom 项目的工作人员之一)的说法,对 CompletableFuture 上的大多数方法的需求随着虚拟线程消失了。您可能只会调用 get。查看他的演示文稿,最新的是 2020-11-112020-09-172020-07-28

虽然我没有捕捉到您业务逻辑的所有细微差别,但我想我已经掌握了要点。与 Stephen C 类似,我收集所有返回的 CompletableFuture 对象。然后我检查它们是否成功完成。

在 Project Loom 中,ExecutorService 现在是 AutoCloseable。所以我们可以使用 try-with-resources 语法。 try-block 的结尾将阻塞,直到所有提交的任务都完成。这种自然阻塞取代了 Stephen C 在解决方案中看到的 CompletableFuture.allOf(futures);

示例代码

这是我们任务的一个类,一个返回 Callable 对象的 UUID。我们还让每项任务休眠一秒钟,以演示一项冗长的任务。我们的任务还将其结果记录在我们传递给其构造函数的 ConcurrentMap 中。

package work.basil.example;

import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.*;

public class DoSomething implements Callable < UUID >
{
    private Integer identifier;
    private ConcurrentMap < Integer,UUID > results;

    // Constructor
    public DoSomething ( Integer identifier,ConcurrentMap < Integer,UUID > resultsMap )
    {
        this.identifier = identifier;
        this.results = resultsMap;
    }

    @Override
    public UUID call ( ) throws Exception
    {
        Thread.sleep( Duration.ofSeconds( 1 ) );
        UUID uuid = UUID.randomUUID();
        this.results.put( this.identifier,uuid );
        return uuid;
    }
}

这是实例化和运行这些任务的代码。

    public static void main ( String[] args )
    {
        System.out.println( "INFO - Java version: " + Runtime.version() );
        System.out.println( "INFO - Host OS: " + System.getProperty( "os.name" ) + " version " + System.getProperty( "os.version" ) );
        System.out.println( "INFO - arch: " + System.getProperty( "os.arch" ) + " | Available processors (cores): " + Runtime.getRuntime().availableProcessors() );
        long maxMemory = Runtime.getRuntime().maxMemory();
        System.out.println( "INFO - Maximum memory (bytes): " + String.format( Locale.getDefault(),"%,d",( maxMemory == Long.MAX_VALUE ? "no limit" : maxMemory ) ) );
        System.out.println( "----------------------------------------------" );
        long start = System.nanoTime();

        ConcurrentMap < Integer,UUID > results = new ConcurrentSkipListMap <>();
        int countTasks = 1_000_000;
        System.out.println( "INFO - Starting a run of " + countTasks + ". " + Instant.now() );
        List < CompletableFuture < UUID > > futures = new ArrayList <>( countTasks );
        try (
                ExecutorService executorService = Executors.newVirtualThreadExecutor() ;
        )
        {
            for ( int nthTask = 0 ; nthTask < countTasks ; nthTask++ )
            {
                executorService.submit( new DoSomething( nthTask,results ) );
            }
        }
        // At this point,flow-of-control blocks until all submitted tasks finish (are done,or are cancelled).
        List < CompletableFuture < UUID > > canceled = new ArrayList <>();
        List < CompletableFuture < UUID > > completedExceptionally = new ArrayList <>();
        for ( CompletableFuture < UUID > future : futures )
        {
            if ( future.isCancelled() )
            {
                canceled.add( future );
            } else if ( future.isCompletedExceptionally() )
            {
                completedExceptionally.add( future );
            } else if ( ! future.isDone() )
            {
                throw new IllegalStateException( "All tasks should be done at this point,normally or interrupted." );
            } else
            {
                throw new IllegalStateException( "Should not be able to reach this point." );
            }
        }

        Duration duration = Duration.ofNanos( System.nanoTime() - start );
        System.out.println( "Done at " + Instant.now() + ". Took: " + duration );
        System.out.println( "Problems… canceled size: " + canceled.size() + " | completedExceptionally size: " + completedExceptionally.size() );
        System.out.println( "Results size = " + String.format( Locale.getDefault(),results.size() ) );
    }
INFO - Java version: 16-loom+9-316
INFO - Host OS: Mac OS X version 10.14.6 
INFO - arch: x86_64 | Available processors (cores): 6
INFO - Maximum memory (bytes): 8,589,934,592
----------------------------------------------
INFO - Starting a run of 10000000. 2021-01-01T05:40:28.564019Z
Done at 2021-01-01T05:41:11.567852Z. Took: PT43.006895236S
Problems… canceled size: 0 | completedExceptionally size: 0
Results size = 10,000,000

运行一百万个这样的任务需要几秒钟。跑一千万只需要不到一分钟。

因此您可以看到休眠一秒钟的阻塞线程显然没有在内核上占用时间。如果他们在内核上花费时间,我们会等待很长时间:10,000 个任务 * 每个 1 秒/6 个内核 = 1,666,666 秒 = 462 小时。