迭代 GraphFrames AggregateMessages 达到内存限制

问题描述

我正在使用 GraphFrame 的 aggregateMessages 功能来构建自定义聚类算法。我在一个小样本数据集(约 100 个项目)上测试了这个算法,并验证了它的工作原理。但是当我在包含 50k 个项目的真实数据集上运行它时,在大约 10 次迭代后出现 OOM 错误。有趣的是,前几次迭代在几分钟内处理完毕,而 mem 是正常范围。在第 6 次迭代之后,内存使用量攀升至约 30GB 并最终爆炸。我在 16 核 32GB 的 2 节点集群上运行它。

由于这是一个迭代算法,而且每次迭代后的内存只会增加,我想知道我是否需要以某种方式释放内存。我在循环的末尾添加了 unpersist 块,但这并没有帮助。

我可以使用其他任何效率吗?在迭代设置中使用 GraphFrames 是否有最佳实践?

我注意到的另一件事是,在执行程序页面的 spark UI 上,使用的“存储内存”大约为 300MB,但 spark 进程实际上占用了大约 30GB。不确定这是否是内存泄漏!

enter image description here

while ( true ) {
    
    System.out.println("["+new Date()+"] Running " + i);
    Dataset<Row> lastRoutesDs = groups;
    Dataset<Row> groupUnwind = groups.withColumn("id",explode(col("routeItems")));

    GraphFrame gf = new GraphFrame(groupUnwind,edgesDs);
    
    Dataset<Row> lvl1 = gf.aggregateMessages()
            .sendToSrc(when(
                    callUDF("contains_in_array_str",AggregateMessages.dst().getField("routeItems"),AggregateMessages.src().getField("id")).equalTo(false),struct(AggregateMessages.dst().getField("routeItems").as("routeItems"),AggregateMessages.dst().getField("routeScores").as("routeScores"),AggregateMessages.dst().getField("grpId").as("grpId"),AggregateMessages.dst().getField("grpScore").as("grpScore"),AggregateMessages.edge().getField("score").as("edgeScore"))))
            .agg(collect_set(AggregateMessages.msg()).as("incomings"))
            .withColumn("inItem",explode(col("incomings")))
            .groupBy("id","inItem.grpId")
            .agg(first("inItem.routeItems").as("routeItems"),first("inItem.routeScores").as("routeScores"),first("inItem.grpScore").as("grpScore"),collect_list("inItem.edgeScore").as("inScores"))
            .groupBy("grpId")
            .agg(bestRouteAgg.apply(col("routeItems"),col("routeScores"),col("inScores"),col("grpScore"),col("id"),col("grpScore")).as("best"))
            .withColumn("newScore",callUDF("calcRouteScores",expr("size(best.routeItems)+1"),col("best.routeScores"),col("best.inScores")))
            .withColumn("edgeCount",expr("size(best.routeScores)"))
            .persist(StorageLevel.MEMORY_AND_DISK());
    
    lvl1
            .filter("newScore > " + groupMaxScore)
            .withColumn("itr",lit(i))
            .select("grpId","best.routeItems","best.routeScores","best.grpScore","edgeCount","itr")
            .write()
            .mode(SaveMode.Append)
            .json(workspaceDir + "clusters-rank-collect");

    if (lvl1.count() == 0) {
        System.out.println("****** End reached " + i);
        break;
    }

    
    Dataset<Row> newGroups = lvl1.filter("newScore <= " + groupMaxScore)
            .withColumn("routeItems_new",callUDF("merge2Array",col("best.routeItems"),array(col("best.newNode"))))
            .withColumn("routeScores_new",callUDF("merge2ArrayDouble",col("best.inScores")))
            .select(col("grpId"),col("routeItems_new").as("routeItems"),col("routeScores_new").as("routeScores"),col("newScore").as("grpScore"));
    
    if (i > 0 && (i % 2) == 0) {
        newGroups = newGroups
                .checkpoint();
    }
    
    newGroups = newGroups
            .persist(StorageLevel.DISK_ONLY());
    System.out.println( newGroups.count() );
    
    groups.unpersist();
    lastRoutesDs.unpersist();
    groupUnwind.unpersist();
    lvl1.unpersist();
    
    groups = newGroups;
    
    i++;
}
    

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)

相关问答

错误1:Request method ‘DELETE‘ not supported 错误还原:...
错误1:启动docker镜像时报错:Error response from daemon:...
错误1:private field ‘xxx‘ is never assigned 按Alt...
报错如下,通过源不能下载,最后警告pip需升级版本 Requirem...