如何在Spark中对数据集的窗口进行子采样?

问题描述

假设我有一个DataSet,看起来像这样:

Name    | Grade
---------------
Josh    | 94
Josh    | 87
Amanda  | 96
Karen   | 78
Amanda  | 90
Josh    | 88

我想创建一个新的DataSet,其中每个名称有3行,其中其他行(如果有的话)是从相同名称的行中采样的(例如,Karen将有3个相同的行) )。

如何做到这一点而又不用遍历每个名​​称?

解决方法

数据准备:

 val df = Seq(("Josh",94),("Josh",87),("Amanda",96),("Karen",78),90),88)).toDF("Name","Grade")

仅当您的数据为skewed的{​​{1}}时,才执行以下操作: 添加一个随机数,然后为每个Name过滤前3个随机数。

Name

现在,汇总每个val df2 = df.withColumn("random",round(rand()*10)) import org.apache.spark.sql.expressions.Window val windowSpec = Window.partitionBy("Name").orderBy("random") val df3 = df2.withColumn("row_number",row_number.over(windowSpec)) .filter($"row_number" <= 3) 的值并重复3次,以确保我们每个Name至少拥有3条记录。然后,最后取1st 3个值,和Name

explode

注释:

  • 由于上述代码在df4.groupBy("Name").agg(collect_list("Grade") as "grade_list") .withColumn("temp_list",slice( flatten(array_repeat($"grade_list",3)),1,3)) .select($"Name",explode($"temp_list") as "Grade").show 中最多具有3个值,因此将其复制3次不会有任何危害。
  • 如果您不使用grade_list步骤,则可以结合使用Window来消除不必要的重复操作。

相关问答

Selenium Web驱动程序和Java。元素在(x,y)点处不可单击。其...
Python-如何使用点“。” 访问字典成员?
Java 字符串是不可变的。到底是什么意思?
Java中的“ final”关键字如何工作?(我仍然可以修改对象。...
“loop:”在Java代码中。这是什么,为什么要编译?
java.lang.ClassNotFoundException:sun.jdbc.odbc.JdbcOdbc...