问题描述
我的代码如下:
object DataTypeValidation extends Logging {
def main(args: Array[String]) {
val spark = SparkSession.builder()
.appName("SparkProjectforDataTypeValidation")
.master("local")
.getorCreate();
spark.sparkContext.setLogLevel("ERROR")
try {
breakable {
val format = new SimpleDateFormat("d-M-y hh:mm:ss.SSSSS")
println("*********Data Type Validation Started*************** " + format.format(Calendar.getInstance().getTime()))
val data = Seq(Row(873131558,"ABC22"),Row(29000000,99.00),Row(27000000,2.34))
val schema = StructType(Array(
StructField("oldcl",IntegerType,nullable = true),StructField("newcl",DoubleType,nullable = true))
)
val ONE = 1
var erroredRecordRow = new scala.collection.mutable.ListBuffer[Row]()
val newSchema = schema.fields.map({
case StructField(name,_: IntegerType,nullorNotnull,_) => StructField(name,StringType,nullorNotnull)
case StructField(name,_: DoubleType,nullorNotnull)
case fields => fields
}).dropRight(ONE)
val newStructType = StructType { newSchema }
val df = spark.createDataFrame(spark.sparkContext.parallelize(data),schema)
df.show()
print(df.schema)
}
} catch {
case exception: Exception =>
println("exception caught in Data Type Mismatch In Schema Validation: " + exception.toString())
exception.printstacktrace();
}
spark.stop()
}
}
exception caught in Data Type Mismatch In Schema Validation: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 Failed 1 times,most recent failure: Lost task 0.0 in stage 0.0 (TID 0,localhost,executor driver): java.lang.RuntimeException: Error while encoding: java.lang.RuntimeException: java.lang.String is not a valid external type for schema of double
if (assertnotnull(input[0,org.apache.spark.sql.Row,true]).isNullAt) null else validateexternaltype(getexternalrowfield(assertnotnull(input[0,true]),oldcl),IntegerType) AS oldcl#0
if (assertnotnull(input[0,1,newcl),DoubleType) AS newcl#1
at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.toRow(ExpressionEncoder.scala:292)
解决方法
@AnkitTomar,
此错误是由于字符串值ABC22
映射到DoubleType
造成的。
请更新以下几行
val data = Seq(Row(873131558,"ABC22"),Row(29000000,99.00),Row(27000000,2.34))
val schema = StructType(Array(
StructField("oldcl",IntegerType,nullable = true),StructField("newcl",DoubleType,nullable = true))
)
使用
val data = Seq(Row(873131558,"99.00"),"2.34"))
val schema = StructType(Array(
StructField("oldcl",StringType,nullable = true))
)
以便您可以检索预期的结果,
val df = spark.createDataFrame(spark.sparkContext.parallelize(data),schema)
df.show()
/*
+---------+-----+
| oldcl|newcl|
+---------+-----+
|873131558|ABC22|
| 29000000|99.00|
| 27000000| 2.34|
+---------+-----+
*/
注意:我在您的代码中找不到newSchema的用法,如果您采用任何其他方法,请发表评论
val ONE = 1
var erroredRecordRow = new scala.collection.mutable.ListBuffer[Row]()
val newSchema = schema.fields.map({
case StructField(name,_: IntegerType,nullorNotnull,_) => StructField(name,nullorNotnull)
case StructField(name,_: DoubleType,nullorNotnull)
case fields => fields
}).dropRight(ONE)
val newStructType = StructType { newSchema }