问题描述
我正在尝试从S3存储桶加载所有传入的镶木地板文件,并使用delta-lake处理它们。我要例外了。
val df = spark.readStream().parquet("s3a://$bucketName/")
df.select("unit") //filter data!
.writeStream()
.format("delta")
.outputMode("append")
.option("checkpointLocation",checkpointFolder)
.start(bucketProcessed) //output goes in another bucket
.awaitTermination()
它抛出一个异常,因为“ unit”是模棱两可的。
我尝试调试它。由于某种原因,它两次找到“单位”。
这是怎么回事?可能是编码问题吗?
编辑: 这就是我创建Spark会话的方式:
val spark = SparkSession.builder()
.appName("streaming")
.master("local")
.config("spark.hadoop.fs.s3a.endpoint",endpoint)
.config("spark.hadoop.fs.s3a.access.key",accessKey)
.config("spark.hadoop.fs.s3a.secret.key",secretKey)
.config("spark.hadoop.fs.s3a.path.style.access",true)
.config("spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version",2)
.config("spark.hadoop.mapreduce.fileoutputcommitter.cleanup-failures.ignored",true)
.config("spark.sql.caseSensitive",true)
.config("spark.sql.streaming.schemaInference",true)
.config("spark.sql.parquet.mergeSchema",true)
.orCreate
edit2: df.printSchema()的输出
2020-10-21 13:15:33,962 [main] WARN org.apache.spark.sql.execution.datasources.DataSource - Found duplicate column(s) in the data schema and the partition schema: `unit`;
root
|-- unit: string (nullable = true)
|-- unit: string (nullable = true)
解决方法
正在读取相同的数据...
val df = spark.readStream().parquet("s3a://$bucketName/*")
...解决了问题。无论出于什么原因。我想知道为什么...:(