以增量模式读取apache hudi MOR表时如何获取键的最后一次提交?

问题描述

我有一个键 = acctid 的 MOR 表,当我在同一个键上进行 3 次提交并尝试以增量模式读取时,我只看到第一次提交,无论如何要读取最后一次提交或所有提交使用增量模式给定密钥?

请查看以下详细信息:

我在第一次运行时在 mor 表中插入了以下数据

input_df = spark.createDataFrame(
    [
        (100,"2015-01-01","2015-01-01T01:01:01.010101Z",10),(101,(102,(103,(104,(105,],("acctid","date","ts","deposit"),)

hudi 选项是:

hudi_options=
{'hoodie.table.name': 'compaction','hoodie.datasource.write.table.type': 'MERGE_ON_READ','hoodie.datasource.write.operation': 'upsert','hoodie.datasource.write.recordkey.field': 'acctid','hoodie.datasource.write.partitionpath.field': 'date','hoodie.datasource.write.precombine.field': 'ts','hoodie.datasource.write.hive_style_partitioning': 'true','hoodie.upsert.shuffle.parallelism': 2,'hoodie.insert.shuffle.parallelism': 2,'hoodie.delete.shuffle.parallelism': 2}

之后,我通过传递 ts 和存款的 3 个不同值来运行密钥 100 的更新,以便在同一密钥上完成 3 次提交

# UPDATE deposit to **11** for key 100
update_df = spark.createDataFrame(
    [(100,"2015-01-01T11:01:01.000000Z",11)],"deposit"))
update_df.write.format("org.apache.hudi").options(**hudi_options).mode("append").save(hudi_dataset)
# UPDATE deposit to **12** for key 100
update_df = spark.createDataFrame(
    [(100,"2015-01-01T12:01:01.000000Z",12)],"deposit"))
update_df.write.format("org.apache.hudi").options(**hudi_options).mode("append").save(hudi_dataset)
# UPDATE deposit to **13** for key 100
update_df = spark.createDataFrame(
    [(100,"2015-01-01T13:01:01.000000Z",13)],"deposit"))
update_df.write.format("org.apache.hudi").options(**hudi_options).mode("append").save(hudi_dataset)
first_commit = '20210719234312' # As per this particular run

output_df = (spark.read
             .option("hoodie.datasource.query.type","incremental")
             .option("hoodie.datasource.read.begin.instanttime",first_commit)
             .format("org.apache.hudi")
             .load(hudi_dataset+"/*/*"))

output_df.show()

在这输出中,我看到存款 = 11,有没有办法在不使用压缩的情况下以增量模式获得存款 = 13?

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)