过滤后如何将数据帧写入S3

问题描述

我正在尝试使用下面的Scala代码在脚本编辑中以CVS格式过滤到S3后写入数据帧。

当前状态:

  • 运行后不显示任何错误,只是不写入S3。

  • 日志屏幕显示“开始”,但是看不到“打印结束”。

  • 没有表明该问题的特定错误消息。

  • 以温度计数停止。

环境条件:我对所有S3都具有管理员权限。

import com.amazonaws.services.glue.glueContext
import <others>

object glueApp {
  def main(sysArgs: Array[String]) {
    val spark: SparkContext = new SparkContext()
    val glueContext: glueContext = new glueContext(spark)
    // @params: [JOB_NAME]
    val args = glueArgParser.getResolvedOptions(sysArgs,Seq("JOB_NAME").toArray)
    Job.init(args("JOB_NAME"),glueContext,args.asJava)
    
    val datasource0 = glueContext.getCatalogSource(database = "db",tableName = "table",redshiftTmpDir = "",transformationContext = "datasource0").getDynamicFrame()
    val appymapping1 = datasource0.appyMapping(mapping=........)

    val temp=appymapping1.toDF.filter(some filtering rules)
    print("start")
    if (temp.count() <= 0) {
    temp.write.format("csv").option("sep",",").save("s3://directory/error.csv")
  }
    print("End")
     

解决方法

您正在使用if条件将Dataframe写入S3(If条件是检查数据帧是否具有一行或多行),但是If条件是反转的。仅当数据帧具有0(或更小)行时才是正确的。所以改变它。

高级:Spark始终将文件保存为“ part-”名称。因此将S3路径更改为 s3:// directory / 。并添加 .mode(“ overwrite”)

因此您的写df查询应该是

temp.write.format(“ csv”)。option(“ sep”,“,”).mode(“ overwrite”).save(“ s3:// directory”)