问题描述
我正在尝试使用下面的Scala代码在脚本编辑中以CVS格式过滤到S3后写入数据帧。
当前状态:
环境条件:我对所有S3都具有管理员权限。
import com.amazonaws.services.glue.glueContext
import <others>
object glueApp {
def main(sysArgs: Array[String]) {
val spark: SparkContext = new SparkContext()
val glueContext: glueContext = new glueContext(spark)
// @params: [JOB_NAME]
val args = glueArgParser.getResolvedOptions(sysArgs,Seq("JOB_NAME").toArray)
Job.init(args("JOB_NAME"),glueContext,args.asJava)
val datasource0 = glueContext.getCatalogSource(database = "db",tableName = "table",redshiftTmpDir = "",transformationContext = "datasource0").getDynamicFrame()
val appymapping1 = datasource0.appyMapping(mapping=........)
val temp=appymapping1.toDF.filter(some filtering rules)
print("start")
if (temp.count() <= 0) {
temp.write.format("csv").option("sep",",").save("s3://directory/error.csv")
}
print("End")
解决方法
您正在使用if条件将Dataframe写入S3(If条件是检查数据帧是否具有一行或多行),但是If条件是反转的。仅当数据帧具有0(或更小)行时才是正确的。所以改变它。
高级:Spark始终将文件保存为“ part-”名称。因此将S3路径更改为 s3:// directory / 。并添加 .mode(“ overwrite”)。
因此您的写df查询应该是
temp.write.format(“ csv”)。option(“ sep”,“,”).mode(“ overwrite”).save(“ s3:// directory”)