动态生成具有过滤器的代码,具有ColumnRenamed和合并条件Scala Spark

问题描述

我有一段代码要动态生成。我想以列表或序列的形式在下面的列中使用filtercoalescedrop语句执行withColumnRenamed操作。

这是我要动态接受的列的列表(此处为字符串)。

val cols = "a|tmp_a,b|tmp_b"

代码看起来像这样:

val df1 = df2.filter(!(coalesce(col("a"),lit(0)) === coalesce(col("tmp_a"),lit(0))) || !(upper(col("b")) === upper(col("tmp_b"))))
  .drop("a")
  .drop("b")
  .withColumnRenamed("tmp_a","a")
  .withColumnRenamed("tmp_b","b")

如果将更多列添加cols,该代码如何动态调整?新的列对应使用与上述“ b | tmp_b”相同的过滤条件。

解决方法

给输入一个带有成对列名称的输入,您可以创建两种类型的过滤条件(在第一个列对下面使用第一个过滤器模式,其余使用第二个过滤器模式)。过滤数据框后,可以使用drop来应用withColumnRenamedfoldLeft

val cols = "a|tmp_a,b|tmp_b,c|tmp_c".split(",").map(_.split("\\|"))

val filterCondHead = !(coalesce(col(cols.head(0)),lit(0)) === coalesce(col(cols.head(1)),lit(0)))
val filterCondTail = cols.tail.map(c => !(upper(col(c(0))) === upper(col(c(1))))).reduce(_ || _)

val df2 = df.filter(filterCondHead || filterCondTail)

val df3 = cols.foldLeft(df2){ case(df,c) => 
  df.drop(c(0)).withColumnRenamed(c(1),c(0))
}