pyspark将数据写入hdfs时无法覆盖特定的分区数据

问题描述

我有一个customer_table,该表基于以下三列进行了分区。因此,在hdfs中,其外观如下:

date=1901 > cus_id=A > cus_type=online > file
          > cus_id=B > cus_type=online > file
          > cus_id=C > cus_type=online > file
date=1902 > cus_id=A > cus_type=online > file
          > cus_id=B > cus_type=online > file
          > cus_id=C > cus_type=online > file
date=1903 > cus_id=A > cus_type=online > file
          > cus_id=B > cus_type=online > file
          > cus_id=C > cus_type=online > file
      

现在,我已经按输入数据进行了过滤,仅考虑了cus_id = A

的数据
df_filtered = df_input.filter(df.cus_id == "A")
df_filtered {dataframe has data from 1901,1902 and 1903}

我已经完成了数据框操作,并且新计算的数据框df_filter_updated必须覆盖到customer_table中

因此,在所有date = **** hdfs文件夹中只需要替换cus_id=A文件夹数据。

我们正在执行以下操作:

df_filter_updated.write.option("compression","snappy").mode("overwrite")
.partitionby("date","cus_id","cus_type").parquet(hdfs_path)

但是,它将覆盖整个表而不是特定的分区文件夹。 我们如何实现这种覆盖方式?

实际上,我执行此操作的原因是为customer_table中存在的所有旧数据计算新列。

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)