问题描述
如何仅针对某些特定列而不是全部将其平铺在火花中?
def flattenDataframe(df: DataFrame): DataFrame = {
val fields = df.schema.fields
val fieldNames = fields.map(x => x.name)
val length = fields.length
for(i <- 0 to fields.length-1){
val field = fields(i)
val fieldtype = field.dataType
val fieldName = field.name
fieldtype match {
case arrayType: ArrayType =>
val fieldNamesExcludingArray = fieldNames.filter(_!=fieldName)
val fieldNamesAndExplode = fieldNamesExcludingArray ++ Array(s"explode_outer($fieldName) as $fieldName")
// val fieldNamesToSelect = (fieldNamesExcludingArray ++ Array(s"$fieldName.*"))
val explodedDf = df.selectExpr(fieldNamesAndExplode:_*)
return flattenDataframe(explodedDf)
case structType: StructType =>
val childFieldnames = structType.fieldNames.map(childname => fieldName +"."+childname)
val newfieldNames = fieldNames.filter(_!= fieldName) ++ childFieldnames
val renamedcols = newfieldNames.map(x => (col(x.toString()).as(x.toString().replace(".","_").replace("$","").replace(" ","_").replace("-","_"))))
val explodedf = df.select(renamedcols:_*)
return flattenDataframe(explodedf)
case _ =>
}
}
df
}
我正在使用上面的代码,但是它使所有列扁平化。
解决方法
暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!
如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。
小编邮箱:dio#foxmail.com (将#修改为@)