问题描述
我正在使用spark并行处理一百万个任务。例如,训练一百万个单独的模型。
我需要确保尽可能多的成功,但失败要少。
在火花中,如果只有一种模型找不到最佳解决方案,则它可能会被吊死并永远运行。在这种情况下,spark作业将永远无法完成,并且杀死该作业也无法将其他999,999型号保存到hdfs。
这个问题真的很痛苦。
我搜索了一下,但没有找到有用的东西:
-
spark.task.maxFailures
:没有失败,所以这不会生效。 -
spark.network.timeout
:没有网络问题。 -
spark.executor.heartbeatInterval
:没有亲戚。
核心训练代码,主要使用rdd.map进行训练
df1 = (df.rdd
.map(lambda r: r.asDict())
.map(lambda d: transform_data(d))
.map(lambda d: create_model(d))
.map(lambda d: model_fit(d))
.map(lambda d: pickle_model(d))
)
如何为Spark任务设置超时时间?还是有什么好东西?
解决方法
我认为这不能成为配置级别的控制器。您可能只想将其应用于Spark任务的一个子集。 SparkListener
可以提供帮助,因为您可以挂接到任务,阶段,工作级别,然后使用sparkContenxt
决定取消任务。
/**
* Called when a task starts
*/
def onTaskStart(taskStart: SparkListenerTaskStart): Unit
在上面,您可以实现超时逻辑。
使用def cancelStage(stageId: Int)
您可以从侦听器事件中获取特定的ID