ParallelStream 收集设置?

问题描述

我有一个程序,它可以从使用 parallelStream() 中受益匪浅(大数据集,具有一些涉及映射和过滤方案,但不依赖于外部变量/同步),但必须作为一个集合收集。

我对并行流有点陌生(这是我第一次接触并行流),尝试使用下面的代码却发现这导致并发修改非并发后端和幕后死锁情况.

此映射尝试使用 Linux 本机命令 sudo blockdev --getsize64 unmounted_device_here 获取卸载磁盘的文件大小(我不知道 Java 是否可以在 Linux 上获取卸载磁盘的完整大小,所以我只是使用本机方法,因为无论如何这只会在 Linux 系统上发布)

映射方法(死锁):

var mountPath = Paths.get("/dev");
            //Do NVME Drives First
            var list = new ArrayList<Path>(10);
            //Looks like nvme1n1
            //For reasons beyond my understanding replacing [0-9] with \\d does not work here
            try (var directoryStream = Files.newDirectoryStream(mountPath,"nvme[0-9]n[0-9]")) {
                for (Path path : directoryStream) {
                    list.add(path);
                }
            }
//Map to DrivePacket (path,long),note that blockdev return bytes -> GB
var nvmePackets = list.parallelStream().map((drive) -> new DrivePacket(drive,(Long.parseLong(runcommand("sudo","blockdev","--getsize64",drive.toAbsolutePath().toString())) / (1024 * 1024 * 1024))))
                    .collect(Collectors.toSet());

IoUtils 来自 Apache 实用程序类:

        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-lang3</artifactId>
            <version>3.12.0</version>
        </dependency>

runcommand(执行本机调用):

public static String runcommand(String... command) {
        try {
            if (DEBUG_MODE) {
                systemMessage("Running Command: " + Arrays.asList(command).stream().collect(Collectors.joining(" ")));
            }
            var builder = new ProcessBuilder(command);
            var result = IoUtils.toString(builder.start().getInputStream(),StandardCharsets.UTF_8).replaceAll("\n","");
            if (DEBUG_MODE) {
                System.out.println("Result: " + result);
            }
            return result;
        } catch (IOException ex) {
            throw new IllegalStateException(ex);

        }
    }

DrivePacket 类:

   /**
     * A record of the relevant information for a drive
     *
     * Path is the fully qualified /dev/DRIVE path
     */
    public record DrivePacket(Path drivePath,long driveSize) {}

既然操作受益于并发,那么有没有办法使用parallelStream来做到这一点?还是我必须使用其他技术?

当我使用调试器时,它总是在执行这行代码的停止处挂起,并在 ForkJoinTask.java 处的 externalAwaitDone(); 处永远等待。

不幸的是,我找不到 SettoConcurrentMap() 类似物。

如何在仍然获得为计算授予的并行性和最终结果为集合的同时避免这种死锁?

系统:JDK 16

EDIT 0:更新了可重现性代码

鉴于映射代码调用不共享数据的子例程,我不确定为什么这会导致死锁情况。

解决方法

也许您可以将其收集到 ConcurrentMap,然后获得 keySet。 (假设为映射对象定义了 equalshashcode 方法):

list.parallelStream()
    .map(x -> complicated_mapping_here)
    .collect(Collectors.toConcurrentMap(Function.identity(),x -> Boolean.TRUE /*dummy value*/ ))
    .keySet();