Unable to write a non-partitioned table with Apache Hudi

Problem description

I am using Apache Hudi to write a non-partitioned table to AWS S3 and sync it to Hive. These are the DataSourceWriteOptions in use:

val hudioptions: Map[String, String] = Map[String, String](
  DataSourceWriteOptions.TABLE_TYPE_OPT_KEY -> "MERGE_ON_READ",
  DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY -> "PERSON_ID",
  DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY -> "",
  DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY -> "UPDATED_DATE",
  DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY -> "",
  DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY -> classOf[NonPartitionedExtractor].getName,
  DataSourceWriteOptions.HIVE_STYLE_PARTITIONING_OPT_KEY -> "true",
  DataSourceWriteOptions.KEYGENERATOR_CLASS_OPT_KEY -> "org.apache.hudi.keygen.NonpartitionedKeyGenerator"
)

Writing a partitioned table succeeds, but I get an error when I try to write a non-partitioned table. This is the error output:

Caused by: java.lang.NullPointerException
        at org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.getTableMetaClientForBasePath(HoodieInputFormatUtils.java:283)
        at org.apache.hudi.hadoop.InputPathHandler.parseInputPaths(InputPathHandler.java:100)
        at org.apache.hudi.hadoop.InputPathHandler.<init>(InputPathHandler.java:60)
        at org.apache.hudi.hadoop.HoodieParquetInputFormat.listStatus(HoodieParquetInputFormat.java:81)
        at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:288)
        at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:204)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:273)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:269)
        at scala.Option.getOrElse(Option.scala:121)
        at org.apache.spark.rdd.RDD.partitions(RDD.scala:269)
        at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:49)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:273)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:269)
        at scala.Option.getOrElse(Option.scala:121)
        at org.apache.spark.rdd.RDD.partitions(RDD.scala:269)
        at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:49)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:273)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:269)
        at scala.Option.getOrElse(Option.scala:121)
        at org.apache.spark.rdd.RDD.partitions(RDD.scala:269)
        at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:49)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:273)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:269)
        at scala.Option.getOrElse(Option.scala:121)
        at org.apache.spark.rdd.RDD.partitions(RDD.scala:269)
        at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:49)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:273)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:269)
        at scala.Option.getOrElse(Option.scala:121)
        at org.apache.spark.rdd.RDD.partitions(RDD.scala:269)
        at org.apache.spark.rdd.RDD.getNumPartitions(RDD.scala:289)
        at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.mapOutputStatisticsFuture$lzycompute(ShuffleExchangeExec.scala:83)
        at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.mapOutputStatisticsFuture(ShuffleExchangeExec.scala:82)
        at org.apache.spark.sql.execution.adaptive.ShuffleQueryStageExec.cancel(QueryStageExec.scala:152)
        at org.apache.spark.sql.execution.adaptive.MaterializeExecutable.cancel(AdaptiveExecutable.scala:357)
        at org.apache.spark.sql.execution.adaptive.AdaptiveExecutorRuntime.fail(AdaptiveExecutor.scala:280)
        ... 41 more

This is the code of HoodieInputFormatUtils.getTableMetaClientForBasePath():

/**
   * Extract HoodieTableMetaClient from a partition path(not base path).
   * @param fs
   * @param dataPath
   * @return
   * @throws IOException
   */
  public static HoodieTableMetaClient getTableMetaClientForBasePath(FileSystem fs, Path dataPath) throws IOException {
    int levels = HoodieHiveUtils.DEFAULT_LEVELS_TO_BASEPATH;
    if (HoodiePartitionMetadata.hasPartitionMetadata(fs, dataPath)) {
      HoodiePartitionMetadata metadata = new HoodiePartitionMetadata(fs, dataPath);
      metadata.readFromFS();
      levels = metadata.getPartitionDepth();
    }
    Path baseDir = HoodieHiveUtils.getNthParent(dataPath, levels);
    LOG.info("Reading hoodie metadata from path " + baseDir.toString());
    return new HoodieTableMetaClient(fs.getConf(), baseDir.toString());
  }

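Line 283 is the LOG.info() call, and that is where the NullPointerException is thrown, presumably because baseDir is null when baseDir.toString() is evaluated. So it seems the configuration values provided for partitioning are being mishandled. The code is running on AWS EMR: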

Release label: emr-5.30.1
Hadoop distribution: Amazon 2.8.5
Applications: Hive 2.3.6, Spark 2.4.5
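
One possible reading of the trace, offered as an assumption rather than a confirmed diagnosis: if the data path has no partition metadata, levels stays at the default, and walking that many parents up from a path that sits directly in the table directory can run past the filesystem root and yield null. The sketch below illustrates only that mechanism. It uses java.nio.file.Path as a stand-in for Hadoop's Path, nthParent and resolveBasePath are hypothetical helpers (not Hudi APIs), and the default of 3 levels and the example paths are assumptions made for illustration.

```java
import java.nio.file.Path;
import java.nio.file.Paths;

// Illustration only: java.nio.file.Path stands in for Hadoop's Path, and
// nthParent is a hypothetical stand-in for HoodieHiveUtils.getNthParent.
class NthParentDemo {

    // Assumed default number of levels between a data path and the base path.
    static final int DEFAULT_LEVELS = 3;

    // Walk up `levels` parents; returns null once the walk passes the root.
    static Path nthParent(Path path, int levels) {
        Path parent = path;
        for (int i = 0; i < levels && parent != null; i++) {
            parent = parent.getParent();
        }
        return parent;
    }

    // Defensive variant: fail with a descriptive message instead of a bare NPE.
    static Path resolveBasePath(Path dataPath, int levels) {
        Path baseDir = nthParent(dataPath, levels);
        if (baseDir == null) {
            throw new IllegalStateException(
                "Cannot go up " + levels + " levels from " + dataPath);
        }
        return baseDir;
    }

    public static void main(String[] args) {
        // Partitioned layout: three partition levels below the table directory.
        System.out.println(nthParent(Paths.get("/bucket/table/2020/07/01"), DEFAULT_LEVELS));
        // Non-partitioned layout: data sits directly in the table directory,
        // so three levels up goes past the root and the result is null.
        System.out.println(nthParent(Paths.get("/bucket/table"), DEFAULT_LEVELS));
    }
}
```

If this is what happens in the real code, calling toString() on the null result would match the NullPointerException reported at the LOG.info() line. Whether the non-partitioned table actually lacks partition metadata at the path being read is something to check on the S3 side.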
