Why is apache-hudi creating a COPY_ON_WRITE table even though I specified MERGE_ON_READ?

Problem description

I am trying to create a simple Hudi table with the MERGE_ON_READ table type. After running the code, I still see hoodie.table.type=COPY_ON_WRITE in the hoodie.properties file.

Am I missing something here?

Jupyter notebook with the code: https://github.com/sannidhiteredesai/spark/blob/master/hudi_acct.ipynb

hudi_options = {
    "hoodie.table.name": "hudi_acct",
    "hoodie.table.type": "MERGE_ON_READ",
    "hoodie.datasource.write.operation": "upsert",
    "hoodie.datasource.write.recordkey.field": "acctid",
    "hoodie.datasource.write.precombine.field": "ts",
    "hoodie.datasource.write.partitionpath.field": "date",
    "hoodie.datasource.write.hive_style_partitioning": "true",
    "hoodie.upsert.shuffle.parallelism": 8,
    "hoodie.insert.shuffle.parallelism": 8,
}

input_df = spark.createDataFrame(
    [
        (100, "2015-01-01", "2015-01-01T13:51:39.340396Z", 10),
        (101, "2015-01-01", "2015-01-01T12:14:58.597216Z", 10),
        (102, "2015-01-01", "2015-01-01T13:51:40.417052Z", 10),
        (103, "2015-01-01", "2015-01-01T13:51:40.519832Z", 10),
        (104, "2015-01-02", "2015-01-01T12:15:00.512679Z", 20),
        (105, "2015-01-02", "2015-01-01T13:51:42.248818Z", 20),
    ],
    ("acctid", "date", "ts", "deposit"),
)

# INSERT
(
    input_df.write.format("org.apache.hudi")
    .options(**hudi_options)
    .mode("append")
    .save(hudi_dataset)
)


update_df = spark.createDataFrame(
    [(100, "2015-01-01", "2015-01-02T12:15:00.512679Z", 20)],
    ("acctid", "date", "ts", "deposit"),
)

# UPDATE
(
    update_df.write.format("org.apache.hudi")
    .options(**hudi_options)
    .mode("append")
    .save(hudi_dataset)
)

Edit: After running the code above, I see 2 parquet files created in the date=2015-01-01 partition. When reading the second parquet file I expected to get only the 1 updated record, but I can also see all the other records of that partition in it.
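This is expected once the table has ended up as COPY_ON_WRITE: on a COW table every upsert rewrites the whole file group, so the newest parquet file is a full copy of the group with the update applied, not a delta. A minimal sketch in plain Python (no Spark, records modeled as an acctid→deposit dict purely for illustration) of why the second file contains all records:

```python
def copy_on_write_upsert(current_records, updates):
    """Simulate a COW commit: the new file holds ALL existing
    records of the file group, with the updates applied."""
    merged = dict(current_records)
    merged.update(updates)
    return merged

# First commit: four records land in partition date=2015-01-01.
file_v1 = {100: 10, 101: 10, 102: 10, 103: 10}

# Second commit: upsert of acctid 100 rewrites the whole file group,
# so the new parquet file again contains all four records.
file_v2 = copy_on_write_upsert(file_v1, {100: 20})
```

On a real MERGE_ON_READ table the update would instead go to a log file, and a snapshot query would merge base and log files at read time.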

Answers

Could you first try using mode("overwrite") when loading the data into Hudi with insert, and see if that works?


The problem is the "hoodie.table.type": "MERGE_ON_READ" setting. You have to use hoodie.datasource.write.table.type instead. If you update the configuration as follows, it will work. I have tested it.

hudi_options = {
    "hoodie.table.name": "hudi_acct",
    "hoodie.datasource.write.table.type": "MERGE_ON_READ",
    "hoodie.datasource.write.operation": "upsert",
    "hoodie.datasource.write.recordkey.field": "acctid",
    "hoodie.datasource.write.precombine.field": "ts",
    "hoodie.datasource.write.partitionpath.field": "date",
    "hoodie.datasource.write.hive_style_partitioning": "true",
    "hoodie.upsert.shuffle.parallelism": 8,
    "hoodie.insert.shuffle.parallelism": 8,
    "hoodie.compact.inline": "true",
    "hoodie.compact.inline.max.delta.commits": 10,
}