问题描述
我需要对具有嵌套结构的数据框的列进行转换。转换依赖于已经存在的功能。
假设数据如下
case class A(A: B)
case class B(B: String,C: String,D: Seq[C])
case class C(E: String,F: String)
val df = sc.parallelize(Seq(A(B("b","c",Seq(C("e1","f1"),C("e2","f2")))) )).toDF
df.printSchema
root
|-- A: struct (nullable = true)
| |-- B: string (nullable = true)
| |-- C: string (nullable = true)
| |-- D: array (nullable = true)
| | |-- element: struct (containsNull = true)
| | | |-- E: string (nullable = true)
| | | |-- F: string (nullable = true)
并假设该转换将字符串转换为大写字母
val upper: String => String = _.toupperCase
val upperUDF = udf(upper)
Here我发现了一种可以部分解决我的问题的方法。应用此处给出的代码
def mutate(df: DataFrame,fn: Column => Column): DataFrame = {
// Get a projection with fields mutated by `fn` and select it
// out of the original frame with the schema reassigned to the original
// frame (explained later)
df.sqlContext.createDataFrame(df.select(traverse(df.schema,fn):_*).rdd,df.schema)
}
def traverse(schema: StructType,fn: Column => Column,path: String = ""): Array[Column] = {
schema.fields.map(f => {
f.dataType match {
case s: StructType => struct(traverse(s,fn,path + f.name + "."): _*)
case _ => fn(col(path + f.name))
}
})
}
以下对我来说很好
val df2 = mutate(df,c => if (c.toString == "A.B" || c.toString == "A.C") upperUDF(c) else c)
但是,当涉及到嵌套数组D的列的转换时,它失败而没有错误。
val df3 = mutate(df,c => if (c.toString == "A.D.F") upperUDF(c) else c)
这是怎么回事?如上所述,如何转换嵌套数组的列?
解决方法
暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!
如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。
小编邮箱:dio#foxmail.com (将#修改为@)