问题描述
我想在 Palantir Foundry 中联合多个数据集,数据集的名称是动态的,因此我无法静态地在 transforms_df(..) 中给出数据集名称。有没有一种方法可以在 transforms_df 中动态获取多个输入并联合所有这些数据帧?
我尝试遍历数据集,例如:
li = ['dataset1_path','dataset2_path']
union_df = None
for p in li:
@transforms_df(
my_input = Input(p),Output(p+"_output")
)
def my_compute_function(my_input):
return my_input
if union_df is None:
union_df = my_compute_function
else:
union_df = union_df.union(my_compute_function)
如有任何帮助,将不胜感激,谢谢。
解决方法
通过一些更改,这应该可以为您工作,这是一个带有 json 文件的动态数据集的示例,您的情况可能只有一点不同。这是您可以执行动态 json 输入数据集的通用方法,该数据集应该适用于您可以指定的任何类型的动态输入文件类型或代工厂内部数据集。这个通用示例正在处理上传到平台中数据集节点的一组 json 文件。这应该是完全动态的。在这之后做工会应该是一件简单的事情。
这里也有一些奖励日志记录。
希望能帮到你
from transforms.api import Input,Output,transform
from pyspark.sql import functions as F
import json
import logging
def transform_generator():
transforms = []
transf_dict = {## enter your dynamic mappings here ##}
for value in transf_dict:
@transform(
out=Output(' path to your output here '.format(val=value)),inpt=Input(" path to input here ".format(val=value)),)
def update_set(ctx,inpt,out):
spark = ctx.spark_session
sc = spark.sparkContext
filesystem = list(inpt.filesystem().ls())
file_dates = []
for files in filesystem:
with inpt.filesystem().open(files.path) as fi:
data = json.load(fi)
file_dates.append(data)
logging.info('info logs:')
logging.info(file_dates)
json_object = json.dumps(file_dates)
df_2 = spark.read.option("multiline","true").json(sc.parallelize([json_object]))
df_2 = df_2.withColumn('upload_date',F.current_date())
df_2.drop_duplicates()
out.write_dataframe(df_2)
transforms.append(update_logs)
return transforms
TRANSFORMS = transform_generator()
,
所以这个问题分为两个问题。
如何使用程序化输入路径处理转换
要处理程序化输入的转换,重要的是要记住两件事:
1st - Transforms 将在 CI 时间确定您的输入和输出。这意味着您可以拥有生成转换的 Python 代码,但您无法从数据集中读取路径,需要将它们硬编码到生成转换的 Python 代码中。
2nd - 您的转换将在 CI 执行期间创建一次。这意味着无论何时构建数据集,您都无法使用增量或特殊逻辑来生成不同的路径。
有了这两个前提,就像你的例子或@jeremy-david-gamet's(回复的ty,给你一个+1),你可以拥有在CI时生成路径的python代码。
dataset_paths = ['dataset1_path','dataset2_path']
for path in dataset_paths:
@transforms_df(
my_input = Input(path),Output(f"{path}_output")
)
def my_compute_function(my_input):
return my_input
但是要合并它们,您需要第二次转换来执行合并,您需要传递多个输入,因此您可以为此使用 *args
或 **kwargs
:
dataset_paths = ['dataset1_path','dataset2_path']
all_args = [Input(path) for path in dataset_paths]
all_args.append(Output("path/to/unioned_dataset"))
@transforms_df(*all_args)
def my_compute_function(*args):
input_dfs = []
for arg in args:
# there are other arguments like ctx in the args list,so we need to check for type. You can also use kwargs for more determinism.
if isinstance(arg,pyspark.sql.DataFrame):
input_dfs.append(arg)
# now that you have your dfs in a list you can union them
# Note I didn't test this code,but it should be something like this
...
如何合并具有不同模式的数据集。
对于这一部分,有很多关于如何在 spark 中合并不同数据帧的问答。这是从 https://stackoverflow.com/a/55461824/26004
复制的简短代码示例from pyspark.sql import SparkSession,HiveContext
from pyspark.sql.functions import lit
from pyspark.sql import Row
def customUnion(df1,df2):
cols1 = df1.columns
cols2 = df2.columns
total_cols = sorted(cols1 + list(set(cols2) - set(cols1)))
def expr(mycols,allcols):
def processCols(colname):
if colname in mycols:
return colname
else:
return lit(None).alias(colname)
cols = map(processCols,allcols)
return list(cols)
appended = df1.select(expr(cols1,total_cols)).union(df2.select(expr(cols2,total_cols)))
return appended
,
由于输入和输出是在 CI 时间确定的,我们无法形成真正的动态输入。我们将不得不以某种方式指向代码中的特定数据集。假设数据集的路径共享相同的根,以下似乎需要最少的维护:
from transforms.api import transform_df,Input,Output
from functools import reduce
datasets = [
'dataset1','dataset2','dataset3',]
inputs = {f'inp{i}': Input(f'input/folder/path/{x}') for i,x in enumerate(datasets)}
kwargs = {
**{'output': Output('output/folder/path/unioned_dataset')},**inputs
}
@transform_df(**kwargs)
def my_compute_function(**inputs):
unioned_df = reduce(lambda df1,df2: df1.unionByName(df2),inputs.values())
return unioned_df
关于不同模式的联合,since Spark 3.1 one can use this:
df1.unionByName(df2,allowMissingColumns=True)