如何懒惰地评估嵌套的 flatMap

问题描述

我试图从两个潜在无限流中召唤一个笛卡尔积,然后我通过 limit() 对其进行限制。

到目前为止(大约)这是我的策略:

@Test
void flatMapIsLazy() {
        Stream.of("a","b","c")
            .flatMap(s -> Stream.of("x","y")
                .flatMap(sd -> IntStream.rangeClosed(0,Integer.MAX_VALUE)
                    .mapToObj(sd::repeat)))
            .map(s -> s + "u")
            .limit(20)
            .forEach(System.out::println);
}

这不起作用。

显然,我的第二个流在第一次在管道上使用时就被当场进行了最终评估。它不会产生我可以按照自己的节奏使用的惰性流。

我认为 .forEach 的这段代码中的 ReferencePipeline#flatMap 是罪魁祸首:

@Override
public void accept(P_OUT u) {
    try (Stream<? extends R> result = mapper.apply(u)) {
        if (result != null) {
            if (!cancellationRequestedCalled) {
               result.sequential().forEach(downstream);
            }
            else {
                var s = result.sequential().spliterator();
                do { } while (!downstream.cancellationRequested() && s.tryAdvance(downstream));
            }
        }
    }
}

我希望上面的代码返回 20 个元素,如下所示:

a
ax
axx
axxx
axxxx
...
axxxxxxxxxxxxxxxxxxx

但它却以 OutOfMemoryError 崩溃,因为嵌套 Stream 的很长 flatMap 被急切地求值 (??) 并用重复的不必要副本填满我的记忆字符串。如果不是 Integer.MAX_VALUE,而是提供值 3,保持相同的限制为 20,则预期输出将改为:

a
ax
axx
axxx
a
ay
ayy
ayyy
b
bx
bxx
bxxx
...
(up until 20 lines)

编辑:此时我刚刚使用惰性迭代器推出了我自己的实现。不过,我认为应该有一种方法可以用纯 Streams 做到这一点。

编辑 2:这已被承认为 Java https://bugs.java.com/bugdatabase/view_bug.do?bug_id=JDK-8267758%20

解决方法

正如您已经写的那样,这已被视为错误。也许,它会在未来的 Java 版本中解决。

但即使现在也可能有解决方案。它不是很优雅,只有在外部流中的元素数量和限制足够小时才可能可行。但它会在这些限制下工作。

让我首先通过将外部 flatMap 转换为两个操作(具有标识的 mapflatMap,仅执行展平)来稍微修改您的示例:

Stream.of("a","b","c")
      .map(s -> Stream.of("x","y")
            .flatMap(sd -> IntStream.rangeClosed(0,Integer.MAX_VALUE)
                  .mapToObj(sd::repeat)))
      .flatMap(s -> s)
      .map(s -> s + "u")
      .limit(20)
      .forEach(System.out::println);

我们可以很容易地看到,我们需要的每个内部流不超过 20 个元素。所以我们可以将每个流限制为这个数量的元素。这将起作用(您应该使用变量或常量作为限制):

Stream.of("a",Integer.MAX_VALUE)
                  .mapToObj(sd::repeat)))
      .flatMap(s -> s.limit(20))            // limit each inner stream
      .map(s -> s + "u")
      .limit(20)
      .forEach(System.out::println);

当然这样还是会产生过多的中间结果,不过在上面的限制下可能问题不大。

相关问答

Selenium Web驱动程序和Java。元素在(x,y)点处不可单击。其...
Python-如何使用点“。” 访问字典成员?
Java 字符串是不可变的。到底是什么意思?
Java中的“ final”关键字如何工作?(我仍然可以修改对象。...
“loop:”在Java代码中。这是什么,为什么要编译?
java.lang.ClassNotFoundException:sun.jdbc.odbc.JdbcOdbc...