问题描述
我试图从两个潜在无限流中召唤一个笛卡尔积,然后我通过 limit()
对其进行限制。
到目前为止(大约)这是我的策略:
@Test
void flatMapIsLazy() {
Stream.of("a","b","c")
.flatMap(s -> Stream.of("x","y")
.flatMap(sd -> IntStream.rangeClosed(0,Integer.MAX_VALUE)
.mapToObj(sd::repeat)))
.map(s -> s + "u")
.limit(20)
.forEach(System.out::println);
}
这不起作用。
显然,我的第二个流在第一次在管道上使用时就被当场进行了最终评估。它不会产生我可以按照自己的节奏使用的惰性流。
我认为 .forEach
的这段代码中的 ReferencePipeline#flatMap
是罪魁祸首:
@Override
public void accept(P_OUT u) {
try (Stream<? extends R> result = mapper.apply(u)) {
if (result != null) {
if (!cancellationRequestedCalled) {
result.sequential().forEach(downstream);
}
else {
var s = result.sequential().spliterator();
do { } while (!downstream.cancellationRequested() && s.tryAdvance(downstream));
}
}
}
}
我希望上面的代码返回 20 个元素,如下所示:
a
ax
axx
axxx
axxxx
...
axxxxxxxxxxxxxxxxxxx
但它却以 OutOfMemoryError
崩溃,因为嵌套 Stream
的很长 flatMap
被急切地求值 (??) 并用重复的不必要副本填满我的记忆字符串。如果不是 Integer.MAX_VALUE
,而是提供值 3,保持相同的限制为 20,则预期输出将改为:
a
ax
axx
axxx
a
ay
ayy
ayyy
b
bx
bxx
bxxx
...
(up until 20 lines)
编辑:此时我刚刚使用惰性迭代器推出了我自己的实现。不过,我认为应该有一种方法可以用纯 Streams 做到这一点。
编辑 2:这已被承认为 Java https://bugs.java.com/bugdatabase/view_bug.do?bug_id=JDK-8267758%20
解决方法
正如您已经写的那样,这已被视为错误。也许,它会在未来的 Java 版本中解决。
但即使现在也可能有解决方案。它不是很优雅,只有在外部流中的元素数量和限制足够小时才可能可行。但它会在这些限制下工作。
让我首先通过将外部 flatMap
转换为两个操作(具有标识的 map
和 flatMap
,仅执行展平)来稍微修改您的示例:
Stream.of("a","b","c")
.map(s -> Stream.of("x","y")
.flatMap(sd -> IntStream.rangeClosed(0,Integer.MAX_VALUE)
.mapToObj(sd::repeat)))
.flatMap(s -> s)
.map(s -> s + "u")
.limit(20)
.forEach(System.out::println);
我们可以很容易地看到,我们需要的每个内部流不超过 20 个元素。所以我们可以将每个流限制为这个数量的元素。这将起作用(您应该使用变量或常量作为限制):
Stream.of("a",Integer.MAX_VALUE)
.mapToObj(sd::repeat)))
.flatMap(s -> s.limit(20)) // limit each inner stream
.map(s -> s + "u")
.limit(20)
.forEach(System.out::println);
当然这样还是会产生过多的中间结果,不过在上面的限制下可能问题不大。