如何创建地图列以计算不带udaf的事件

问题描述

我想创建一个Map列来计算出现次数

例如:

+---+----+
|  b|   a|
+---+----+
|  1|   b|
|  2|null|
|  1|   a|
|  1|   a|
+---+----+

会导致

+---+--------------------+
|  b|                 res|
+---+--------------------+
|  1|[a -> 2.0,b -> 1.0]|
|  2|                  []|
+---+--------------------+

目前,在Spark 2.4.6中,我能够使用udaf做到这一点。

碰到Spark3时,我想知道是否可以摆脱这个udaf(我尝试使用新方法aggregate失败了)

有没有一种有效的方法? (对于效率方面,我可以轻松进行测试)

解决方法

这里是Spark 3解决方案:

import org.apache.spark.sql.functions._

df.groupBy($"b",$"a").count()
  .groupBy($"b")
  .agg(
    map_from_entries(
      collect_list(
        when($"a".isNotNull,struct($"a",$"count"))
      )
    ).as("res")
  )
  .show()

给予:

+---+----------------+
|  b|             res|
+---+----------------+
|  1|[b -> 1,a -> 2]|
|  2|              []|
+---+----------------+

这里使用Aggregator解决方案:

import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.expressions.Aggregator
import org.apache.spark.sql.functions._
import org.apache.spark.sql.Encoder

val countOcc = new Aggregator[String,Map[String,Int],Int]] with Serializable {
    def zero: Map[String,Int] = Map.empty.withDefaultValue(0)
    def reduce(b: Map[String,a: String) = if(a!=null) b + (a -> (b(a) + 1)) else b
    def merge(b1: Map[String,b2: Map[String,Int]) = {
      val keys = b1.keys.toSet.union(b2.keys.toSet)
      keys.map{ k => (k -> (b1(k) + b2(k))) }.toMap
    }
    def finish(b: Map[String,Int]) = b
    def bufferEncoder: Encoder[Map[String,Int]] = implicitly(ExpressionEncoder[Map[String,Int]])
    def outputEncoder: Encoder[Map[String,Int]])
}

val countOccUDAF = udaf(countOcc)

df
  .groupBy($"b")
  .agg(countOccUDAF($"a").as("res"))
  .show()

给予:

+---+----------------+
|  b|             res|
+---+----------------+
|  1|[b -> 1,a -> 2]|
|  2|              []|
+---+----------------+
,

我们可以实现的是Spark 2.4

//GET THE COUNTS
val groupedCountDf = originalDf.groupBy("b","a").count

//CREATE MAPS FOR EVERY COUNT | EMPTY MAP FOR NULL KEY
//AGGREGATE THEM AS ARRAY 

val dfWithArrayOfMaps =  groupedCountDf
.withColumn("newMap",when($"a".isNotNull,map($"a",$"count")).otherwise(map()))
.groupBy("b").agg(collect_list($"newMap") as "multimap")

//EXPRESSION TO CONVERT ARRAY[MAP] -> MAP

val mapConcatExpr = expr("aggregate(multimap,map(),(k,v) -> map_concat(k,v))")

val finalDf = dfWithArrayOfMaps.select($"b",mapConcatExpr.as("merged_data"))
,

这里有一个带有单个groupBy和一个稍微复杂的sql表达式的解决方案。此解决方案适用于Spark 2.4 +

df.groupBy("b")
  .agg(expr("sort_array(collect_set(a)) as set"),expr("sort_array(collect_list(a)) as list"))
  .withColumn("res",expr("map_from_arrays(set,transform(set,x -> size(filter(list,y -> y=x))))"))
  .show()

输出:

+---+------+---------+----------------+
|  b|   set|     list|             res|
+---+------+---------+----------------+
|  1|[a,b]|[a,a,b]|[a -> 2,b -> 1]|
|  2|    []|       []|              []|
+---+------+---------+----------------+

这个想法是两次从a列中收集数据:一次进入一组,一次进入一个列表。然后借助transform对集合中的每个元素进行计数,以计算列表中特定元素的出现次数。最后,将元素集和数量与map_from_arrays组合。

但是我不能说这种方法是否真的比UDAF更快。

,

您始终可以将JBOSS_HOME与UDF一起使用,但前提是您的分组不太合理:

collect_list

给予:

val udf_histo = udf((x:Seq[String]) => x.groupBy(identity).mapValues(_.size))

df.groupBy($"b")
  .agg(
    collect_list($"a").as("as")
  )
  .select($"b",udf_histo($"as").as("res"))
  .show()

这应该比UDAF快:Spark custom aggregation : collect_list+UDF vs UDAF