问题描述
给定一个使用 Stream API 的简单 Apache Storm Topology,有两种初始化流的方法:
版本 1 - 隐式声明
StreamBuilder builder = new StreamBuilder();
builder
.newStream(new Intspout(),new ValueMapper<Integer>(0),1)
.filter(x -> x > 5)
.print();
结果:这按预期工作,它只打印大于 5 的整数。
版本 2 - 显式声明
Stream<Integer> integerStream = builder.newStream(new Intspout(),1);
integerStream.filter(x -> x > 5);
integerStream.print();
问题:为什么这个显式声明不能正常工作以及如何解决这个问题?
拓扑在本地集群上运行,其中 Intspout
只是一个简单的 spout,它使用以下命令发出随机整数:
StormTopology topo = builder.build();
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("test",new HashMap<>(),topo);
解决方法
那是因为 integerStream.filter(x -> x > 5);
返回一个您忽略的新流。
这有效:
Stream<Integer> integerStream = builder.newStream(new IntSpout(),new ValueMapper<Integer>(0),1);
Stream<Integer> filteredStream = integerStream.filter(x -> x > 5);
filteredStream.print();
您的第一个示例中也存在语法错误。在第四行的末尾有一个额外的分号。
StreamBuilder builder = new StreamBuilder();
builder
.newStream(new IntSpout(),1)
.filter(x -> x > 5) // <= there was a semicolon here
.print();