用自定义名称将零件文件缝合到一个

问题描述

如果在GCS Bucket中同步,数据融合管道会在输出时为我们提供一个或多个零件文件。我的问题是我们如何将这些零件文件合并为一个,并给它们取一个有意义的名称

解决方法

Data Fusion转换在执行DataprocSpark作业的MapReduce集群中运行。由于作业基于HDFS分区对数据进行分区(这是Spark / Hadoop的默认行为),因此最终输出将分成多个文件。

编写Spark脚本时,您可以操纵此默认行为并产生不同的输出。但是,Data Fusion旨在抽象化代码层,并为您提供使用完全托管的数据集成器的体验。使用分割文件应该不是问题,但是如果您确实需要合并它们,我建议您使用以下方法:

  1. 在Pipeline Studio顶部单击 Hub -> 插件,搜索 Dynamic Spark Plugin ,然后单击部署,然后在完成中(您可以忽略JAR文件)

  2. 返回到管道,在接收器部分中选择 Spark

  3. GCS 插件替换为 Spark 插件

  4. 在您的Spark插件中,将在部署时编译设置为 false ,然后将代码替换为符合您需要的一些Spark代码。例如,下面的代码是经过硬编码的,但是可以正常工作:

     def sink(df: DataFrame) : Unit = {
       new_df = df.coalesce(1)
       new_df.write.format("csv").save("gs://your/path/")
     }
    

    此功能以 Dataframe 的形式从管道中接收数据。 coalesce 函数将分区数减少到1,最后一行将其写入GCS。

  5. 部署管道,即可开始运行