问题描述
我们在火花中消耗了运动学流,并且流中有多个碎片。按照示例中的代码
// Create the Kinesis DStreams
List<JavaDStream<byte[]>> streamsList = new ArrayList<>(numStreams);
for (int i = 0; i < numStreams; i++) {
streamsList.add(JavaDStream.fromdstream(
KinesisInputDStream.builder()
.streamingContext(jssc)
.checkpointAppName(kinesisAppName)
.streamName(streamName)
.endpointUrl(endpointUrl)
.regionName(regionName)
.initialPosition(new KinesisInitialPositions.Latest())
.checkpointInterval(kinesisCheckpointInterval)
.storageLevel(StorageLevel.MEMORY_AND_disK_2())
.build(),classtag$.MODULE$.apply(byte[].class)
));
}
// Union all the streams if there is more than 1 stream
JavaDStream<byte[]> unionStreams;
if (streamsList.size() > 1) {
unionStreams = jssc.union(streamsList.toArray(new JavaDStream[0]));
} else {
// Otherwise,just use the 1 stream
unionStreams = streamsList.get(0);
}
最初,我们创建的dstream和分片一样多。
我们发现,我们能够以更少的dstream(反过来注册为接收者)使用所有分片中的事件。
https://spark.apache.org/docs/latest/streaming-kinesis-integration.html
由于每个接收器在运行时(https://spark.apache.org/docs/latest/streaming-programming-guide.html#input-dstreams-and-receivers)都占用一个内核,因此对于我们来说,接收器的数量少于分片的数量,这对我们来说是很有意义的。我们可以立即消耗掉所有负载并进行处理而无需排队。
我的问题:
解决方法
暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!
如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。
小编邮箱:dio#foxmail.com (将#修改为@)