如何对 scala 数据集进行复杂的操作

问题描述

我对 scala 还很陌生,并且来自 sql 和 pandas 背景,scala 中的数据集对象给我带来了一些麻烦。

我有一个如下所示的数据集...

|car_num|      colour|
+-----------+---------+
|      145| c|
|      132| p|
|      104| u|
|      110| c|
|      110| f|
|      113| c|
|      115| c|
|       11| i|
|      117| s|
|      118| a|


我已使用如下所示的案例类将其加载为数据集

case class carDS(carNum: String,Colour: String)

每个 car_num 对一辆汽车来说是独一无二的,许多汽车有多个条目。颜色栏是指汽车被喷漆的颜色。

例如,我想知道如何添加一个列,该列给出一辆汽车在没有变绿 (g) 的情况下的油漆作业总数。

到目前为止,我已经尝试过了。

carDS
  .map(x => (x.carNum,x.Colour))
  .groupBy("_1")
  .count()
  .orderBy($"count".desc).show()

但我相信它只是为我提供了汽车喷漆次数的计数列。不是汽车被喷漆而不是绿色的最长连续次数

我想我可能需要在查询中使用如下函数

def colourrun(sq: String): Int = {
  println(sq)
  sq.mkString(" ")
    .split("g")
    .filter(_.nonEmpty)
    .map(_.trim)
    .map(s => s.split(" ").length)
    .max
}

但我不确定它应该去哪里。

最终如果汽车 102 被涂成 r,b,g,o,y,r,g 我希望计数列给出 4 作为答案。

我该怎么做? 谢谢

解决方法

这是一种方法,它涉及将给定汽车的喷漆作业分组为单调编号的组,由颜色为“g”的喷漆作业分隔,然后是几个 groupBy/agg 以获得最大喷漆作业数颜色为“g”的油漆作业。

(请注意,添加了 timestamp 列以确保数据集中行的确定性排序。)

val ds = Seq(
  ("102","r",1),("102","b",2),"g",3),4),"o",5),"y",6),7),8),("145","c",7)
).toDF("car_num","colour","timestamp").as[(String,String,Long)]

import org.apache.spark.sql.expressions.Window
val win = Window.partitionBy("car_num").orderBy("timestamp")

ds.
  withColumn("group",sum(when($"colour" === "g",1).otherwise(0)).over(win)).
  groupBy("car_num","group").agg(
    when($"group" === 0,count("group")).otherwise(count("group") - 1).as("count")
  ).
  groupBy("car_num").agg(max("count").as("max_between_g")).
  show
// +-------+-------------+
// |car_num|max_between_g|
// +-------+-------------+
// |    102|            4|
// |    145|            2|
// +-------+-------------+

使用 DataFrame API 的替代方法是将 groupByKey 应用到数据集,后跟 mapGroups,如下所示:

ds.
  map(c => (c.car_num,c.colour)).
  groupByKey(_._1).mapGroups{ case (k,iter) =>
    val maxTuple = iter.map(_._2).foldLeft((0,0)){ case ((cnt,mx),c) =>
      if (c == "g") (0,math.max(cnt,mx)) else (cnt + 1,mx)
    }
    (k,maxTuple._2)
  }.
  show
  // +---+---+
  // | _1| _2|
  // +---+---+
  // |102|  4|
  // |145|  2|
  // +---+---+