Question

Let's say I have the following raw source data, separated by commas, but some fields have a very custom format. For simplicity, I've minimized this example to 3 fields/columns. In this case, the custom field is address, which has a special format (key/value pairs wrapped in square brackets). There may be other fields with completely different formats.
Bob,35,[street:75917;city:new york city;state:ny;zip:10000]
...
Roger,75,[street:81659;city:los angeles;state:ca;zip:99999]
Case classes:
case class Person(name: String, age: Int, address: Address)
case class Address(street: String, city: String, state: String, zip: Int)
What is the most efficient way to process the source data, including parsing the address field, into a Dataset[Person]?
At the moment, I have come up with two options:
Option 1 - perform the conversion manually, row by row:
val df = spark.read.csv(source)
val dataset = df.map(row =>
  Person(row.getAs[String]("_c0"), row.getAs[String]("_c1").toInt, getAddress(row.getAs[String]("_c2")))
).as[Person]
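Here getAddress is assumed to be a helper that parses the bracketed key/value string into an Address. The question does not show its body; a minimal sketch, assuming exactly the format in the sample data (the parsing logic below is an assumption, not part of the original question):

```scala
// Repeats the question's Address case class so this sketch is self-contained.
case class Address(street: String, city: String, state: String, zip: Int)

// Assumed implementation of the question's getAddress helper:
// strips the surrounding brackets, splits "key:value" pairs on ';',
// then splits each pair on the first ':' and looks the fields up by key.
def getAddress(raw: String): Address = {
  val pairs = raw.stripPrefix("[").stripSuffix("]")
    .split(";")
    .map { kv =>
      val Array(k, v) = kv.split(":", 2)
      k -> v
    }
    .toMap
  Address(pairs("street"), pairs("city"), pairs("state"), pairs("zip").toInt)
}
```

Splitting on the first ':' only (the `2` limit) keeps any later colons inside the value intact.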
Option 2 - use a UDF (user-defined function) for the custom-format column, together with withColumn and withColumnRenamed:
val udfAddress: UserDefinedFunction = udf((address: String) => toAddressObject(address))
var df = spark.read.csv(source)
df = df.withColumnRenamed("_c0", "name").withColumn("name", col("name").cast(StringType))
  .withColumnRenamed("_c1", "age").withColumn("age", col("age").cast(IntegerType))
  .withColumnRenamed("_c2", "address").withColumn("address", udfAddress(col("address")))
val dataset = df.as[Person]
In general, between Option 1 and Option 2, which is more efficient, and why? And if there is yet another option that is more efficient at processing/parsing the custom-format fields, I am open to that as well. Is there a better choice, such as manually composing a StructType with StructFields? Thanks!
Answer
One possible approach is the following.
Please note that I have not done any performance testing.
Load the test data
val data =
"""
|Bob,35,[street:75917;city:new york city;state:ny;zip:10000]
|Roger,75,[street:81659;city:los angeles;state:ca;zip:99999]
""".stripMargin
// Trim whitespace around each comma-separated field, then re-join with "|"
// so the lines can be parsed with spark.read.csv using sep = "|"
val stringDS = data.split(System.lineSeparator())
  .map(_.split("\\,").map(_.replaceAll("""^[ \t]+|[ \t]+$""", "")).mkString("|"))
  .toSeq.toDS()
val df = spark.read
  .option("sep", "|")
  .option("inferSchema", "true")
  // .option("header", "true")
  // .option("nullValue", "null")
  .csv(stringDS)
df.show(false)
df.printSchema()
/**
* +-----+---+----------------------------------------------------+
* |_c0 |_c1|_c2 |
* +-----+---+----------------------------------------------------+
* |Bob |35 |[street:75917;city:new york city;state:ny;zip:10000]|
* |Roger|75 |[street:81659;city:los angeles;state:ca;zip:99999] |
* +-----+---+----------------------------------------------------+
*
* root
* |-- _c0: string (nullable = true)
* |-- _c1: integer (nullable = true)
* |-- _c2: string (nullable = true)
*/
Convert the DataFrame of rows to Dataset[Person]
import org.apache.spark.sql.catalyst.ScalaReflection
import org.apache.spark.sql.types.StructType

// Derive the column names ("name", "age", "address") from the Person case class
val person = ScalaReflection.schemaFor[Person].dataType.asInstanceOf[StructType]
val toAddr = udf((map: Map[String, String]) => Address(map("street"), map("city"), map("state"), map("zip").toInt))
val p = df.withColumn("_c2", translate($"_c2", "[]", ""))
  .withColumn("_c2", expr("str_to_map(_c2, ';', ':')"))
  .withColumn("_c2", toAddr($"_c2"))
  .toDF(person.map(_.name): _*)
  .as[Person]
p.show(false)
p.printSchema()
/**
* +-----+---+---------------------------------+
* |name |age|address |
* +-----+---+---------------------------------+
* |Bob  |35 |[75917,new york city,ny,10000]   |
* |Roger|75 |[81659,los angeles,ca,99999]     |
* +-----+---+---------------------------------+
*
* root
* |-- name: string (nullable = true)
* |-- age: integer (nullable = true)
* |-- address: struct (nullable = true)
* | |-- street: string (nullable = true)
* | |-- city: string (nullable = true)
* | |-- state: string (nullable = true)
* | |-- zip: integer (nullable = false)
*/
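If you would rather avoid the Scala UDF entirely (a UDF is opaque to the Catalyst optimizer), the map produced by str_to_map can also be assembled into the address struct with built-in SQL functions only. A sketch, again untested for performance, reusing the same df and the Person/Address case classes from the question; translate, str_to_map, named_struct, and cast are all standard Spark SQL built-ins, and the column names in toDF are hardcoded here for brevity:

```scala
// Same pipeline, but UDF-free: named_struct builds the Address struct
// directly from the map, so the whole plan stays in native expressions.
val p2 = df
  .withColumn("_c2", expr("str_to_map(translate(_c2, '[]', ''), ';', ':')"))
  .withColumn("_c2", expr(
    """named_struct(
      |  'street', _c2['street'],
      |  'city',   _c2['city'],
      |  'state',  _c2['state'],
      |  'zip',    cast(_c2['zip'] as int))""".stripMargin))
  .toDF("name", "age", "address")
  .as[Person]
```

The struct's field names and order must match the Address case class exactly, otherwise the .as[Person] conversion will fail.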