问题描述
我正在处理一个非常大的数据集,其中包括 200 个压缩的 JSON 文件(每个约 8G 未压缩)。我创建了一个主数据框 largeDF
,以及几个额外的数据框来计算嵌套属性(结构数组)的聚合。我想执行一般统计数据计算(填充率和组计数)。
对整个数据集的每次处理需要大约 20 分钟(加载文件、解压缩和执行聚合)。对于 50 个字段,它需要很长时间,因为每次我更改我的条件并一次又一次地运行带有附加过滤器的查询。
我想依靠 PySpark 的惰性求值并避免多次加载数据,因此我可以创建一个复杂的聚合并将其应用于整个数据集一次,然后将所有结果转换为 Pandas。或者更好,如果我可以预先定义作业并要求 Spark 并行处理它们(加载一次,计算全部),然后分别返回每个作业的结果。
这些不是我的主要 ETL,但我正在尝试提取数据集的语义以编写实际的 ETL 管道。
计算 1:计算统计数据并找到所有字段的填充率:
stats = DF_large.describe().toPandas()
计算 2:处理带有分类数据的简单字段:
def group_count(df,col,limit,sort,skip_null):
"""This function groups data-set on based on provided column[s],and counts each group."""
if skip_null:
df = df.where(df[col].isNotNull())
if limit:
df = df.limit(limit)
df = df.groupBy(col).count()
if sort:
df = df.sort(col,ascending=False)
return df.toPandas()
aggregations = {}
for col in group_count_list_of_columns:
aggregations[col] = group_count(largeDF,limit=0,skip_null=True,sort=False)
计算 3:计算和计算嵌套字段的填充率:
def get_nested_fields(spDf,col : str,othercols : tuple,stats = True):
"""This function unwinds a nested array field out of data-set based on provided column,and either returns the whole or statistics of it."""
spDf = spDf.where(spDf[col].isNotNull())
df = spDf.select(F.explode(col),*othercols)
if limit:
df = df.limit(limit)
if stats:
res = df.describe().toPandas()
else:
res = df.toPandas()
return res
nested_fields_aggregate = {}
for col in nested_fields_lists:
nested_fields_aggregate[col] = get_nested_field(largeDF,limit=10**4,othercols =['name','id','timestamp'],stats = True)
这需要多次读取整个数据集。形状不一样所以我不能加入。理论上应该有办法减少时间,因为没有一个计算是相互依赖的。
解决方法
每次调用 pandas 时,您都会再次读取 DF_large 数据帧。为避免这种情况,您可以使用 DF_large = DF_large.cache()
缓存此数据帧。