问题描述
我正在尝试使用 CompletableFuture 并行执行 for
循环。在循环中,我使用 supplyAsync
调用 doSomething
来获取输出字符串,然后将其放入 HashMap 中:
...
ConcurrentHashMap<Integer,String> map = new ConcurrentHashMap<>();
CompletableFuture<?> CompletableFuture = null;
for ( int i = 0; i < numberOfRecords; i++ ) {
final int finalI = i;
CompletableFuture = CompletableFuture
.supplyAsync( () -> doSomething( data,finalI ) )
.thenAccept( str -> map.put( finalI,str ) );
}
CompletableFuture.join();
private String doSomething(HashMap<String,String> data,int finalI ) ) {
ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue<>();
for ( int k = 0; k < data.size(); k++ ) {
//process data and add it in queue
}
String result = processQueueAndReturnString(queue);
return result;
问题是当 for
循环快完成时(当 i
接近 numberOfRecords
时),for
方法中的另一个 doSomething
循环跳过了一些迭代,例如如果 k=5
它只能运行循环直到 k=2 or 3
并且在这种情况下 supplyAsync( () -> doSomething( data,finalI ) )
返回 null
。所以看起来我的 for
循环与 CompletableFuture
完成,直到一些迭代完全完成。
解决方法
所以看起来我的 for
循环与 CompletableFuture
完成 [before] 一些迭代完全完成。
示例代码中的每个循环迭代都会创建一个 CompletableFuture
。如果您想等待所有工作完成,您需要加入所有,而不仅仅是最后一次迭代创建的。
这样的东西(风格更正!):
ConcurrentHashMap<Integer,String> map = new ConcurrentHashMap<>();
CompletableFuture<Void>[] futures = new CompletableFuture<Void>[nosRecords];
for (int i = 0; i < nosRecords; i++) {
final int finalI = i;
futures[i] = CompletableFuture
.supplyAsync(() -> doSomething(data,finalI))
.thenAccept(str -> map.put(finalI,str));
}
CompletableFuture.allOf(futures);
请注意,您需要将 CompletableFuture<?>
更改为 CompletableFuture<Void>
,因为 allOf()
(javadoc) 的声明需要这样做。幸运的是,thenAccept(...)
调用已经返回了 CompletableFuture<Void>
。
HashMap
data
不是线程安全的,应该是吗?我只是在方法 doSomething
中使用它来获取基于索引 finalI
的条目值。我不处理那个 HashMap
。我刚读完。
在 supplyAsync
调用和对其 lambda 参数的调用之间将有一个 发生在之前。因此,只要 data
在执行任何 doSomething
调用期间不发生变化,它们都会在 data
映射中看到正确的值。
假设事情如您所说(并保持这种状态),可以在那里使用非同步 HashMap
。
Answer by Stephen C 看起来是正确的,适用于今天的 Java。但在未来(啊哈,看看我在那里做了什么?),Java 可能会提供一种更简单、更快的方法,使用虚拟线程。
项目织机
Project Loom 即将加入 Java,初步 builds available now 建立在早期的 Java 16 之上。
一个主要功能是虚拟线程 (fibers)。这些是轻量级线程。当任何虚拟线程中的控制流阻塞时,Java 会检测到该阻塞并切换到另一个虚拟线程中以保持 CPU 内核忙碌。这可以大大加快经常阻塞的线程代码(与视频编码等严格受 CPU 限制的任务相反)。
请注意,根据 Ron Pressler(Loom 项目的工作人员之一)的说法,对 CompletableFuture
上的大多数方法的需求随着虚拟线程消失了。您可能只会调用 get
。查看他的演示文稿,最新的是 2020-11-11、2020-09-17 和 2020-07-28。
虽然我没有捕捉到您业务逻辑的所有细微差别,但我想我已经掌握了要点。与 Stephen C 类似,我收集所有返回的 CompletableFuture
对象。然后我检查它们是否成功完成。
在 Project Loom 中,ExecutorService
现在是 AutoCloseable
。所以我们可以使用 try-with-resources 语法。 try-block 的结尾将阻塞,直到所有提交的任务都完成。这种自然阻塞取代了 Stephen C 在解决方案中看到的 CompletableFuture.allOf(futures);
。
示例代码
这是我们任务的一个类,一个返回 Callable
对象的 UUID
。我们还让每项任务休眠一秒钟,以演示一项冗长的任务。我们的任务还将其结果记录在我们传递给其构造函数的 ConcurrentMap
中。
package work.basil.example;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.*;
public class DoSomething implements Callable < UUID >
{
private Integer identifier;
private ConcurrentMap < Integer,UUID > results;
// Constructor
public DoSomething ( Integer identifier,ConcurrentMap < Integer,UUID > resultsMap )
{
this.identifier = identifier;
this.results = resultsMap;
}
@Override
public UUID call ( ) throws Exception
{
Thread.sleep( Duration.ofSeconds( 1 ) );
UUID uuid = UUID.randomUUID();
this.results.put( this.identifier,uuid );
return uuid;
}
}
这是实例化和运行这些任务的代码。
public static void main ( String[] args )
{
System.out.println( "INFO - Java version: " + Runtime.version() );
System.out.println( "INFO - Host OS: " + System.getProperty( "os.name" ) + " version " + System.getProperty( "os.version" ) );
System.out.println( "INFO - arch: " + System.getProperty( "os.arch" ) + " | Available processors (cores): " + Runtime.getRuntime().availableProcessors() );
long maxMemory = Runtime.getRuntime().maxMemory();
System.out.println( "INFO - Maximum memory (bytes): " + String.format( Locale.getDefault(),"%,d",( maxMemory == Long.MAX_VALUE ? "no limit" : maxMemory ) ) );
System.out.println( "----------------------------------------------" );
long start = System.nanoTime();
ConcurrentMap < Integer,UUID > results = new ConcurrentSkipListMap <>();
int countTasks = 1_000_000;
System.out.println( "INFO - Starting a run of " + countTasks + ". " + Instant.now() );
List < CompletableFuture < UUID > > futures = new ArrayList <>( countTasks );
try (
ExecutorService executorService = Executors.newVirtualThreadExecutor() ;
)
{
for ( int nthTask = 0 ; nthTask < countTasks ; nthTask++ )
{
executorService.submit( new DoSomething( nthTask,results ) );
}
}
// At this point,flow-of-control blocks until all submitted tasks finish (are done,or are cancelled).
List < CompletableFuture < UUID > > canceled = new ArrayList <>();
List < CompletableFuture < UUID > > completedExceptionally = new ArrayList <>();
for ( CompletableFuture < UUID > future : futures )
{
if ( future.isCancelled() )
{
canceled.add( future );
} else if ( future.isCompletedExceptionally() )
{
completedExceptionally.add( future );
} else if ( ! future.isDone() )
{
throw new IllegalStateException( "All tasks should be done at this point,normally or interrupted." );
} else
{
throw new IllegalStateException( "Should not be able to reach this point." );
}
}
Duration duration = Duration.ofNanos( System.nanoTime() - start );
System.out.println( "Done at " + Instant.now() + ". Took: " + duration );
System.out.println( "Problems… canceled size: " + canceled.size() + " | completedExceptionally size: " + completedExceptionally.size() );
System.out.println( "Results size = " + String.format( Locale.getDefault(),results.size() ) );
}
INFO - Java version: 16-loom+9-316
INFO - Host OS: Mac OS X version 10.14.6
INFO - arch: x86_64 | Available processors (cores): 6
INFO - Maximum memory (bytes): 8,589,934,592
----------------------------------------------
INFO - Starting a run of 10000000. 2021-01-01T05:40:28.564019Z
Done at 2021-01-01T05:41:11.567852Z. Took: PT43.006895236S
Problems… canceled size: 0 | completedExceptionally size: 0
Results size = 10,000,000
运行一百万个这样的任务需要几秒钟。跑一千万只需要不到一分钟。
因此您可以看到休眠一秒钟的阻塞线程显然没有在内核上占用时间。如果他们在内核上花费时间,我们会等待很长时间:10,000 个任务 * 每个 1 秒/6 个内核 = 1,666,666 秒 = 462 小时。