Flatten schema

Problem description

How can I flatten only certain specific columns in Spark, rather than all of them?

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.{ArrayType, StructType}

// Recursively flattens EVERY array and struct column in the DataFrame.
def flattenDataframe(df: DataFrame): DataFrame = {
  val fields = df.schema.fields
  val fieldNames = fields.map(x => x.name)

  for (field <- fields) {
    val fieldName = field.name
    field.dataType match {
      case _: ArrayType =>
        // One row per array element; explode_outer keeps rows whose array is null or empty.
        // The exploded column is moved to the end of the column list.
        val fieldNamesExcludingArray = fieldNames.filter(_ != fieldName)
        val fieldNamesAndExplode = fieldNamesExcludingArray ++ Array(s"explode_outer($fieldName) as $fieldName")
        val explodedDf = df.selectExpr(fieldNamesAndExplode: _*)
        return flattenDataframe(explodedDf)
      case structType: StructType =>
        // Promote each child field to a top-level column named parent_child,
        // stripping or replacing characters that are awkward in column names.
        val childFieldnames = structType.fieldNames.map(childname => fieldName + "." + childname)
        val newfieldNames = fieldNames.filter(_ != fieldName) ++ childFieldnames
        val renamedcols = newfieldNames.map(x =>
          col(x).as(x.replace(".", "_").replace("$", "").replace(" ", "_").replace("-", "_")))
        val explodedf = df.select(renamedcols: _*)
        return flattenDataframe(explodedf)
      case _ => // primitive column: leave it as-is
    }
  }
  // No array or struct columns remain.
  df
}
```

I'm using the code above, but it flattens all the columns.
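One possible way to limit the flattening is sketched below, under the assumption that "specific columns" means top-level column names you pass in. Only array or struct columns whose names are in the given set are expanded; columns created while expanding them (for example `a_x` from struct `a`) are added to the set, so nested levels underneath a chosen column are flattened too, and every other column passes through untouched. `SelectiveFlatten` and `flattenColumns` are hypothetical names for illustration, not part of Spark's API. As in the question's code, exploding an array multiplies the rows of all other columns.

```scala
import org.apache.spark.sql.{Column, DataFrame}
import org.apache.spark.sql.functions.{col, explode_outer}
import org.apache.spark.sql.types.{ArrayType, StructType}

// Hypothetical helper (not a Spark API): flattens only the columns named in `toFlatten`.
object SelectiveFlatten {
  // Backtick-quote names so dots or spaces are not parsed as nested field access.
  private def quoted(name: String): Column = col(s"`$name`")

  def flattenColumns(df: DataFrame, toFlatten: Set[String]): DataFrame = {
    // Find the first requested column that is still an array or a struct.
    val target = df.schema.fields.find { f =>
      toFlatten.contains(f.name) && (f.dataType match {
        case _: ArrayType | _: StructType => true
        case _                            => false
      })
    }

    target match {
      case None => df // nothing left to flatten among the requested columns
      case Some(field) =>
        field.dataType match {
          case _: ArrayType =>
            // One row per element; explode_outer keeps rows with a null or empty array.
            val cols = df.columns.map { c =>
              if (c == field.name) explode_outer(quoted(c)).as(c) else quoted(c)
            }
            // The column keeps its name, so an array of structs is expanded on the next pass.
            flattenColumns(df.select(cols: _*), toFlatten)
          case st: StructType =>
            // Replace the struct with one column per child, named parent_child.
            val childNames = st.fieldNames.map(child => s"${field.name}_$child")
            val cols = df.columns.flatMap { c =>
              if (c == field.name)
                st.fieldNames.map(child => quoted(c).getField(child).as(s"${c}_$child"))
              else Array(quoted(c))
            }
            // Children inherit "flatten me", so deeper structs and arrays are expanded too.
            flattenColumns(df.select(cols: _*), (toFlatten - field.name) ++ childNames.toSet)
          case _ => df
        }
    }
  }
}
```

For example, `SelectiveFlatten.flattenColumns(df, Set("address", "orders"))` (example column names) would expand only `address` and `orders`, plus anything nested inside them. Unlike the question's code, the exploded column stays in its original position instead of moving to the end, and names are not sanitised; the same `replace` chain can be added to the `.as(...)` call if that is needed.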
