Neither select nor withColumn works with foldLeft

Problem description

I am trying to explode the given columns out of a nested schema by doing a foldLeft over the DataFrame.

I handle only two cases here:

  1. If the column type is struct, I flatten it through a select clause
  2. If the column type is array, I explode the data with withColumn and then flatten it through a select clause (both patterns are sketched below)
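In isolation, the two per-column transformations look like this (a minimal sketch on hypothetical columns s (struct) and a (array), with the imports from the snippet below in scope, not the full method):

// struct column: promote its fields to top-level columns
val withStructFields = df.selectExpr("*", "s.*")
// array column: one row per element first, then promote the element's fields
val withArrayFields = df.withColumn("a", explode(col("a"))).selectExpr("*", "a.*")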

Here is my schema:

import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._

val schema = StructType(Array(
    StructField("RootData", StructType(Seq(
        StructField("Rates", ArrayType(StructType(Array(
            StructField("Code", StringType, true),
            StructField("Rate", StringType, true),
            StructField("Type", StringType, true),
            StructField("TargetValue", StringType, true))), true), true),
        StructField("RecordCount", LongType, true))), true),
    StructField("CreationDate", StringType, true),
    StructField("SysID", StringType, true),
    StructField("ImportID", StringType, true)))


|-- RootData: struct (nullable = true)
|    |-- Rates: array (nullable = true)
|    |    |-- element: struct (containsNull = true)
|    |    |    |-- Code: string (nullable = true)
|    |    |    |-- Rate: string (nullable = true)
|    |    |    |-- Type: string (nullable = true)
|    |    |    |-- TargetValue: string (nullable = true)
|    |-- RecordCount: long (nullable = true)
|-- CreationDate: string (nullable = true)
|-- SysID: string (nullable = true)
|-- ImportID: string (nullable = true)
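The type dispatch in the snippet below keys off DataType.typeName, which on this schema resolves to "struct" and "array" (a quick check, assuming a DataFrame df carrying the schema above):

df.schema("RootData").dataType.typeName                            // "struct"
df.selectExpr("*", "RootData.*").schema("Rates").dataType.typeName // "array"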

Here is the code snippet:

 // sourceDf is the DataFrame carrying the nested schema above
 // exp_Cols lists the nested columns to be flattened
def execute(sourceDf: DataFrame,exp_Cols : Array[String]) = {
    var list = Array[String]()
    val df = exp_Cols.foldLeft(sourceDf){(df,colName) =>
        if ( df.columns.contains(colName) ) {
            val typeName = df.schema( colName ).dataType.typeName
            println("typeName " + typeName)
            if ( typeName == "struct" || typeName == "array") list = list :+ colName
            if (typeName == "struct") df.selectExpr("*",colName + ".*")
            else if (typeName == "array") df.withColumn(colName,explode(col(colName))).selectExpr("*",colName + ".*")
            else df 
        }
        df
    }
    println(list.toList)
    df.drop(list:_*)
}

However, when I run the statement below, it works as expected. It is the same thing I wrote with foldLeft.

 nestedDf.selectExpr("*","RootData.*").withColumn("Rates",explode($"Rates")).selectExpr("*","Rates.*").drop("RootData","Rates")

Am I making a mistake in the above method, or is there a better way to achieve this?

I am using Spark 2.3.0 and Scala 2.11.

Edit:

Please find the sample data below:

val jsonStr = """{"RootData":{"Rates":[
    {"Code":"USD","Rate":"2.007500000","Type":"Common","TargetValue":"BYR"},
    {"Code":"USD","Rate":"357.300000000","Type":"Common","TargetValue":"MRO"},
    {"Code":"USD","Rate":"21005.000000000","Type":"Common","TargetValue":"STD"},
    {"Code":"USD","Rate":"248520.960000000","Type":"Common","TargetValue":"VEF"},
    {"Code":"USD","Rate":"77.850000000","Type":"Common","TargetValue":"AFN"},
    {"Code":"USD","Rate":"475.150000000","Type":"Common","TargetValue":"AMD"},
    {"Code":"USD","Rate":"250.000000000","Type":"Common","TargetValue":"YER"},
    {"Code":"USD","Rate":"15.063500000","Type":"Common","TargetValue":"ZAR"},
    {"Code":"USD","Rate":"13.291500000","Type":"Common","TargetValue":"ZMW"},
    {"Code":"USD","Rate":"1.000000000","Type":"Common","TargetValue":"USD"}
],"RecordCount":10},"CreationDate":"2020-01-01","SysID":"987654321","ImportID":"123456789"}"""

val nestedDf = spark.read.json(Seq(jsonStr).toDS)
val exp_cols = Array("RootData","Rates")
execute(nestedDf,exp_cols)
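Tracing the fold with this input shows the symptom (my reading of the foldLeft code above, not verified output):

// Iteration 1: "RootData" is in sourceDf.columns, so it is appended to list,
//   but the flattened frame is discarded and the original df is returned.
// Iteration 2: "Rates" is not a top-level column of that unchanged frame,
//   so nothing happens.
// Result: nestedDf.drop("RootData") -- only CreationDate, SysID, ImportID survive.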

The temporary workaround I am using is below:

def forStructTypeCol(df: DataFrame, colName: String) = df.selectExpr("*", colName + ".*")
def forArrayTypeCol(df: DataFrame, colName: String) = df.withColumn(colName, explode(col(colName))).selectExpr("*", colName + ".*")

var t_nestedDf = nestedDf
exp_cols.foreach { colName =>
  t_nestedDf =
    if (t_nestedDf.columns.contains(colName)) {
      val typeName = t_nestedDf.schema(colName).dataType.typeName
      if (typeName == "struct") forStructTypeCol(t_nestedDf, colName)
      else if (typeName == "array") forArrayTypeCol(t_nestedDf, colName)
      else t_nestedDf
    } else t_nestedDf
}
val finaldf = t_nestedDf.drop(exp_cols: _*)
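With the workaround, both nested levels end up flattened and the container columns dropped; the final schema should look roughly like this (column order may differ, since spark.read.json sorts inferred fields alphabetically):

finaldf.printSchema()
// root
//  |-- CreationDate: string (nullable = true)
//  |-- ImportID: string (nullable = true)
//  |-- SysID: string (nullable = true)
//  |-- RecordCount: long (nullable = true)
//  |-- Code: string (nullable = true)
//  |-- Rate: string (nullable = true)
//  |-- Type: string (nullable = true)
//  |-- TargetValue: string (nullable = true)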

Solution

I think your code is wrong because you always return df instead of the df enriched with the additional columns (maybe you are missing the else clause?):

def execute(sourceDf: DataFrame,exp_Cols : Array[String]) = {
    var list = Array[String]()
    val df = exp_Cols.foldLeft(sourceDf){(df,colName) =>
        if ( df.columns.contains(colName) ) {
            val typeName = df.schema( colName ).dataType.typeName
            println("typeName " + typeName)
            if ( typeName == "struct" || typeName == "array") list = list :+ colName
            if (typeName == "struct") df.selectExpr("*",colName + ".*")
            else if (typeName == "array") df.withColumn(colName,explode(col(colName))).selectExpr("*",colName + ".*")
            else df 
        } else {
            df
        }
    }
    println(list.toList)
    df.drop(list:_*)
}
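A quick way to verify the fix against the sample data from the edit above (a sketch; nestedDf and exp_cols as defined there):

val flattened = execute(nestedDf, exp_cols)
flattened.printSchema()
// With the else branch in place, the fold threads the enriched frame through
// each step: "Rates" surfaces as a top-level column on the second iteration,
// so both "RootData" and "Rates" get flattened and then dropped.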