Java Apache Spark:长转换链导致二次时间

我有一个使用Apache Spark的 Java程序.该程序中最有趣的部分如下所示:
long seed = System.nanoTime();

JavaRDD<AnnotatedDocument> annotated = documents
    .mapPartitionsWithIndex(new InitialAnnotater(seed),true);
annotated.cache();

for (int iter = 0; iter < 2000; iter++) {
    GlobalCounts counts = annotated
        .mapPartitions(new GlobalCounter())
        .reduce((a,b) -> a.sum(b)); // update overall counts (*)

    seed = System.nanoTime();

    // copy overall counts which CountChanger uses to compute a stochastic thing (**)
    annotated = annotated
        .mapPartitionsWithIndex(new CountChanger(counts,seed),true); 
    annotated.cache();

    // adding these lines causes constant time complexity like i want
    //List<AnnotatedDocument> ll = annotated.collect();
    //annotated = sc.parallelize(ll,8); 
}

因此,实际上,行(**)会生成带有表单的RDD

documents
    .mapPartitionsWithIndex(initial)
    .mapPartitionsWithIndex(nextIter)
    .mapPartitionsWithIndex(nextIter)
    .mapPartitionsWithIndex(nextIter)
    ... 2000 more

确实是一个很长的地图链.此外,由于需要更新计数,因此行(*)会在每次迭代时强制计算(非惰性).

我遇到的问题是,我得到的时间复杂度随着每次迭代而线性增加,因此总体上呈二次方式:

我认为这是因为Spark试图“记住”链中的每个RDD,以及容错算法或导致其增长的任何因素.但是,我真的不知道.

我真正想做的是在每次迭代时告诉Spark“崩溃”RDD,以便只有最后一个保存在内存中并继续工作.我认为这应该导致每次迭代的时间不变.这可能吗?还有其他解决方案吗?

谢谢!

解决方法

尝试使用rdd.checkpoint.这将把RDD保存到hdfs并清除沿袭.

每次转换RDD时,都会增加谱系,Spark必须跟踪可用内容和必须重新计算的内容.处理DAG是昂贵的,并且大型DAG倾向于非常快地杀死性能.通过“检查点”,您可以指示Spark计算并保存生成的RDD,并丢弃其创建方式的信息.这使得它类似于简单地保存RDD并将其读回,从而最大限度地减少DAG操作.

在旁注中,由于您遇到了这个问题,因此最好知道union也会通过添加步骤来影响RDD性能,并且由于沿袭信息的方式,也可能抛出StackOverflowError. See this post

This link有更多细节和漂亮的图表,主题也提到了in this SO post.

相关文章

最近看了一下学习资料,感觉进制转换其实还是挺有意思的,尤...
/*HashSet 基本操作 * --set:元素是无序的,存入和取出顺序不...
/*list 基本操作 * * List a=new List(); * 增 * a.add(inde...
/* * 内部类 * */ 1 class OutClass{ 2 //定义外部类的成员变...
集合的操作Iterator、Collection、Set和HashSet关系Iterator...
接口中常量的修饰关键字:public,static,final(常量)函数...