Using head(1) on a DataFrame causes a TimeoutException in Spark

Problem description

I am running a simple Spark Scala snippet:

val df = spark.read.json("/home/files/data/date_20200811.json")
df.persist()

if (!df.head(1).isEmpty) {
  // "OLD" is a string literal, so it must be quoted inside the SQL expression.
  val validDF = df.where("status = 'OLD'")
  validDF.write.json("/home/files/result")
} else {
  println("No data found")
}

When I run this code, it gives me an exception:

java.util.concurrent.TimeoutException: Futures timed out after [300 seconds]
        at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
        at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
        at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
        at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
        at scala.concurrent.Await$.result(package.scala:107)
        at org.apache.spark.sql.execution.joins.BroadcastHashJoin.doExecute(BroadcastHashJoin.scala:107)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:132)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:130)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
        at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:130)
        at org.apache.spark.sql.execution.Project.doExecute(basicOperators.scala:46)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:132)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:130)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
        at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:130)
        at org.apache.spark.sql.execution.columnar.InMemoryRelation.buildBuffers(InMemoryColumnarTableScan.scala:100)
        at org.apache.spark.sql.execution.columnar.InMemoryRelation.<init>(InMemoryColumnarTableScan.scala:89)
        at org.apache.spark.sql.execution.columnar.InMemoryRelation$.apply(InMemoryColumnarTableScan.scala:41)
        at org.apache.spark.sql.execution.CacheManager$$anonfun$cacheQuery$1.apply(CacheManager.scala:93)
        at org.apache.spark.sql.execution.CacheManager.writeLock(CacheManager.scala:60)
        at org.apache.spark.sql.execution.CacheManager.cacheQuery(CacheManager.scala:84)
        at org.apache.spark.sql.DataFrame.persist(DataFrame.scala:1596)

But if I replace df.head(1).isEmpty with df.count > 0, it works fine.
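
For reference, a minimal sketch of the count-based variant described above, reusing the same paths and filter as in the question:

// Same logic as the original snippet, but using count() to test for emptiness.
if (df.count > 0) {
  val validDF = df.where("status = 'OLD'")
  validDF.write.json("/home/files/result")
} else {
  println("No data found")
}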

Solution

It might just be a coincidence. Are you sure this snippet is the real culprit behind the error? I think something is missing from the picture.

Please read line 7 of the error stack trace: it is at org.apache.spark.sql.execution.joins.BroadcastHashJoin.doExecute(BroadcastHashJoin.scala:107).

This means that somewhere a DataFrame is being broadcast for a join, and that broadcast did not finish within 300 seconds, which is the default value of spark.sql.broadcastTimeout.
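
If a broadcast join really is the bottleneck, two common mitigations (not spelled out in the answer above, so treat them as suggestions rather than the accepted fix) are raising spark.sql.broadcastTimeout or disabling automatic broadcast joins altogether. A minimal Scala sketch:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("broadcast-timeout-demo")
  .getOrCreate()

// Raise the broadcast timeout from the 300-second default (value is in seconds).
spark.conf.set("spark.sql.broadcastTimeout", "600")

// Or disable automatic broadcast joins entirely, so Spark falls back to a
// sort-merge join instead of broadcasting one side of the join.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")

Raising the timeout keeps the (usually faster) broadcast join; disabling it is the safer choice when the broadcast side is too large to ship to every executor in time.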