根据对象中的字段之一将 JavaRDD 的每个对象存储到 S3

问题描述

我有一个 Java 对象,如下所示:

public class Obj {
   String id;
   String name;
   String date;
}

现在,我有一个 Obj 的 RDD,我们称之为 objRDD。我想将此RDD存储到s3。执行 objRDD.saveAsTextFile(s3path) 将整个输出存储到同一文件夹中。但是,我想根据 date 存储每个 RDD 对象。所以我的问题是如何根据 timestamp 为每个 RDD 对象设置路径。因此,带有日期的对象(例如 2021-07-27)位于一个文件夹中,其他文件夹也类似。

所以基本上 s3 结构应该看起来像

bucket/objFolder/date=2021-07-27/part-0000,part-0001
bucket/objFolder/date=2021-07-28/part-0000,part-0001,part-00002

解决方法

我将首先按 date 聚合行并收集日期。然后迭代这些日期,过滤并写入循环内。

我不熟悉 Java RDD。所以我用 PySpark SQL 写成伪代码,但概念应该是相似的:

df = df.cache()

rows = df.groupBy("date").count().collect()
dates = [r["date"] for r in rows]

for date in dates:
    df.filter(f"date = {date}").write.parquet(f"s3://xxx/xxx/date={date}")