问题描述
如果在GCS Bucket中同步,数据融合管道会在输出时为我们提供一个或多个零件文件。我的问题是我们如何将这些零件文件合并为一个,并给它们取一个有意义的名称?
解决方法
Data Fusion
转换在执行Dataproc
或Spark
作业的MapReduce
集群中运行。由于作业基于HDFS分区对数据进行分区(这是Spark / Hadoop的默认行为),因此最终输出将分成多个文件。
编写Spark脚本时,您可以操纵此默认行为并产生不同的输出。但是,Data Fusion旨在抽象化代码层,并为您提供使用完全托管的数据集成器的体验。使用分割文件应该不是问题,但是如果您确实需要合并它们,我建议您使用以下方法:
-
在Pipeline Studio顶部单击 Hub -> 插件,搜索 Dynamic Spark Plugin ,然后单击部署,然后在完成中(您可以忽略JAR文件)
-
返回到管道,在接收器部分中选择 Spark 。
-
将 GCS 插件替换为 Spark 插件
-
在您的Spark插件中,将在部署时编译设置为 false ,然后将代码替换为符合您需要的一些Spark代码。例如,下面的代码是经过硬编码的,但是可以正常工作:
def sink(df: DataFrame) : Unit = { new_df = df.coalesce(1) new_df.write.format("csv").save("gs://your/path/") }
此功能以 Dataframe 的形式从管道中接收数据。 coalesce 函数将分区数减少到1,最后一行将其写入GCS。
-
部署管道,即可开始运行