将并行数组流简化为单个数组

问题描述

我正在尝试将并行数组Stream 简化为单个数组ArrayList,因此 我对累加器和合并器使用了reduce方法,如下所示:-

    public static void main(String [] args) {
        ArrayList<String> l1 = new ArrayList<>();
        l1.add("a1");
        l1.add("a2");
        
        List<String> l2 = new ArrayList<>();
        l2.add("a3");
        l2.add("a4");
                
        List<List<String>> l = new ArrayList<>();
        l.add(l1);
        l.add(l2);
        
        Stream<List<String>> stream = l.stream();
        join(stream).forEach(System.out::println);
}

private  static  <T> List<T> join(Stream<List<T>> stream) {
        return stream.parallel().reduce(new ArrayList<>(),(total,element) -> {
            System.out.println("total: " + total);
            System.out.println("element: " + element);
            total.addAll(element);
            return total;
        },(total1,total2) -> {
            System.out.println("total1: " + total1);
            System.out.println("total2: " + total2);
            total1.addAll(total2);
            return total1;
        });
}

我知道组合器用于组合并行流。但是它没有按我预期的那样工作, 当我得到重复的结果如下:-

total: []
element: [a3,a4]
total: []
element: [a1,a2]
total1: [a3,a4,a1,a2]
total2: [a3,a2]
a3
a4
a1
a2
a3
a4
a1
a2

那么为什么重复结果呢?在累加器中使用数组列表也是线程安全的吗?

解决方法

您应该只使用flatMap

返回一个流,该流包括将流中的每个元素替换为通过将提供的映射函数应用于每个元素而生成的映射流的内容而得到的结果。将每个映射流的内容放入此流后,将其关闭。 (如果映射的流为null,则使用空流。)

这是一个中间操作。

l.stream().flatMap(x -> x.stream()).collect(Collectors.toList()); // is [a1,a2,a3,a4]

l.stream().flatMap(List::stream).collect(Collectors.toList());

您的代码存在的问题是您将功能样式的代码与副作用混合在一起。那不是一个好兆头。如果消除了副作用,则输出将达到预期效果:

    private static <T> List<T> join(Stream<List<T>> stream) {
        return stream.parallel().reduce(new ArrayList<>(),(total,element) -> {
            System.out.println("total: " + total);
            System.out.println("element: " + element);
            //total.addAll(element);
            //return total;
            var list = new ArrayList<T>(total);
            list.addAll(element);
            return list;
        },(total1,total2) -> {
            System.out.println("total1: " + total1);
            System.out.println("total2: " + total2);
            //total1.addAll(total2);
            //return total1;
            var list = new ArrayList<T>(total1);
            list.addAll(total2);
            return list;
        });
    }

除非有明确的客观原因,否则还应避免使用parallel()。并行化是一项开销,只有在要做大量工作的情况下,并行化才会变得更有成效。否则,同步开销将比任何收益都要大。