火花saveAsTable的位置在s3存储桶的根本原因处NullPointerException

问题描述

我正在使用Spark 3.0.1,我的分区表存储在s3中。请在此处找到问题的描述。

创建表格

Create table root_table_test_spark_3_0_1 (
    id string,name string
)
USING PARQUET
PARTITIONED BY (id)
LOCATION  's3a://MY_BUCKET_NAME/'

在第二次运行时导致NullPointerException的代码

Seq(MinimalObject("id_1","name_1"),MinimalObject("id_2","name_2"))
      .toDS()
      .write
      .partitionBy("id")
      .mode(SaveMode.Append)
      .saveAsTable("root_table_test_spark_3_0_1")

当Hive元存储库为空时,一切都很好,但是当spark尝试在getCustomPartitionLocations阶段执行InsertIntoHadoopFsRelationCommand时,问题就出现了。 (例如第二轮)

实际上,它调用以下方法:from({org.apache.hadoop.fs.Path

/** Adds a suffix to the final name in the path.*/
public Path suffix(String suffix) {
    return new Path(getParent(),getName()+suffix);
}

但是getParent()将在我们处于根目录时返回null,从而导致NullPointerException。我目前唯一的选择是重写此方法以执行类似的操作:

/** Adds a suffix to the final name in the path.*/
public Path suffix(String suffix) {
    return (isRoot()) ? new Path(uri.getScheme(),uri.getAuthority(),suffix) : new Path(getParent(),getName()+suffix);
}

当Spark Hive表的LOCATION处于根级别时,有人遇到问题吗?任何解决方法?是否存在任何已知问题?

我的运行时不允许我重写Path类并修复suffix方法,并且我无法将数据从存储桶的根目录移出,因为它存在已有两年了。

发生此问题是因为我正在从Spark 2.1.0迁移到Spark 3.0.1,并且检查自定义分区的行为出现在Spark 2.2.0(https://github.com/apache/spark/pull/16460

整个上下文有助于理解问题,但基本上您可以轻松重现它

val path: Path = new Path("s3a://MY_BUCKET_NAME/")
println(path.suffix("/id=id"))

仅供参考。 hadoop的通用版本是2.7.4,请在此处找到完整的stacktrace

NullPointerException
at org.apache.hadoop.fs.Path.<init>(Path.java:104)
    at org.apache.hadoop.fs.Path.<init>(Path.java:93)
    at org.apache.hadoop.fs.Path.suffix(Path.java:361)
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.$anonfun$getCustomPartitionLocations$1(InsertIntoHadoopFsRelationCommand.scala:262)
    at scala.collection.TraversableLike.$anonfun$flatMap$1(TraversableLike.scala:245)
    at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
    at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
    at scala.collection.TraversableLike.flatMap(TraversableLike.scala:245)
    at scala.collection.TraversableLike.flatMap$(TraversableLike.scala:242)
    at scala.collection.AbstractTraversable.flatMap(Traversable.scala:108)
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.getCustomPartitionLocations(InsertIntoHadoopFsRelationCommand.scala:260)
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:107)
    at org.apache.spark.sql.execution.datasources.DataSource.writeAndRead(DataSource.scala:575)
    at org.apache.spark.sql.execution.command.CreateDataSourceTableAsSelectCommand.saveDataIntoTable(createDataSourceTables.scala:218)
    at org.apache.spark.sql.execution.command.CreateDataSourceTableAsSelectCommand.run(createDataSourceTables.scala:166)

谢谢

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)