如何将超时设置为spark任务或map操作? 或跳过长时间运行的任务

问题描述

我正在使用spark并行处理一百万个任务。例如,训练一百万个单独的模型。

我需要确保尽可能多的成功,但失败要少。 在火花中,如果只有一种模型找不到最佳解决方案,则它可能会被吊死并永远运行。在这种情况下,spark作业将永远无法完成,并且杀死该作业也无法将其他999,999型号保存到hdfs。
这个问题真的很痛苦。

我搜索了一下,但没有找到有用的东西:

  • spark.task.maxFailures:没有失败,所以这不会生效。
  • spark.network.timeout:没有网络问题。
  • spark.executor.heartbeatInterval:没有亲戚。

核心训练代码,主要使用rdd.map进行训练

df1 = (df.rdd
      .map(lambda r: r.asDict())
      .map(lambda d: transform_data(d))
      .map(lambda d: create_model(d))
      .map(lambda d: model_fit(d))
      .map(lambda d: pickle_model(d))
)

如何为Spark任务设置超时时间?还是有什么好东西?

解决方法

我认为这不能成为配置级别的控制器。您可能只想将其应用于Spark任务的一个子集。 SparkListener可以提供帮助,因为您可以挂接到任务,阶段,工作级别,然后使用sparkContenxt决定取消任务。

 /**
   * Called when a task starts
   */
  def onTaskStart(taskStart: SparkListenerTaskStart): Unit

在上面,您可以实现超时逻辑。

使用def cancelStage(stageId: Int)

使用sparkContext可以完成特定任务

您可以从侦听器事件中获取特定的ID

相关问答

错误1:Request method ‘DELETE‘ not supported 错误还原:...
错误1:启动docker镜像时报错:Error response from daemon:...
错误1:private field ‘xxx‘ is never assigned 按Alt...
报错如下,通过源不能下载,最后警告pip需升级版本 Requirem...