对火花查询正确执行分桶

问题描述

让我们考虑一个数据集:

姓名 年龄
最大 33
亚当 32
Zim 41
穆勒 62

现在,如果我们在数据集 x 上运行这个查询

x.as("a").join(x.as("b")).where(
          $"b.age" - $"a.age" <= 10 and
          $"a.age" > $"b.age").show()
姓名 年龄 姓名 年龄
最大 33 Zim 41
亚当 32 最大 33
亚当 32 Zim 41

这是我想要的结果。

现在,从概念上讲,如果我有一个非常大的数据集,我可能想使用分桶来减少搜索空间。

因此,使用以下方法进行分桶:

val buck_x = x.withColumn("buc_age",floor($"age"/ 10))

这给了我:

姓名 年龄 buck_age
最大 33 3
亚当 32 3
Zim 41 4
穆勒 62 6

爆炸后,我得到以下结果:

val exp_x = buck_x.withColumn("buc_age",explode(array($"buc_age" -1,$"buc_age",$"buc_age" + 1)))
姓名 年龄 buck_age
最大 33 2
最大 33 3
最大 33 4
亚当 32 2
亚当 32 3
亚当 32 4
Zim 41 3
Zim 41 4
Zim 41 5
穆勒 62 5
穆勒 62 6
穆勒 62 7

现在,在最终查询之后,

exp_x.as("a").join(exp_x.as("b")).where(
          $"a.buc_age" === $"b.buc_age" and
          $"b.age" - $"a.age" <= 10 and
          $"b.age" > $"a.age").show()

我得到以下结果。

姓名 年龄 buc_age 姓名 年龄 buc_age
最大 33 3 Zim 41 3
最大 33 4 Zim 41 4
亚当 32 2 最大 33 2
亚当 32 3 Zim 41 3
亚当 32 3 最大 33 3
亚当 32 4 Zim 41 4
亚当 32 4 最大 33 4

显然,这与我的预期不同,我得到的行数比预期的多。使用bucket时如何解决这个问题?

解决方法

删除分桶列,然后选择不同的行,基本上消除了爆炸引起的重复:

<div>
    <div v-if="active == 0">
        <ingresar-informacion-general></ingresar-informacion-general>
    </div>
    <div v-if="active == 1">
        <ingresar-proveedor></ingresar-proveedor>
    </div>
    <div v-if="active == 2">
        <ingresar-condiciones></ingresar-condiciones>
    </div>
</div>

<el-steps :active="active" finish-status="success" simple>
    <el-step title="Informacion General"></el-step>
    <el-step title="Proveedor"></el-step>
    <el-step title="Condiciones"></el-step>
</el-steps>
,

真的没有必要爆炸。

相反,这种方法联合了两个内部自连接。两个连接查找以下情况:

  • A 和 B 在同一个桶中,而 B 更老
  • B 多一桶,但不超过 10 岁

这应该比使用爆炸效果更好,因为执行的比较更少(因为这里连接的集合是爆炸尺寸的三分之一)。

val namesDF = Seq(("Max",33),("Adam",32),("Zim",41),("Muller",62)).toDF("name","age")

val buck_x =  namesDF.withColumn("buc_age",floor($"age" / 10))

// same bucket where b is still older
val same = buck_x.as("a").join(buck_x.as("b"),($"a.buc_age" === $"b.buc_age" && $"b.age" > $"a.age"),"inner")

// different buckets -- b is one bucket higher but still no more than 10 ages different
val diff = buck_x.as("a").join(buck_x.as("b"),($"a.buc_age" + 1 === $"b.buc_age" && $"b.age" <= $"a.age" + 10),"inner")

val result = same.union(diff)

结果(您可以执行 drop 以删除多余的列,如 Charlie 的答案):

result.show(false)
+----+---+-------+----+---+-------+
|name|age|buc_age|name|age|buc_age|
+----+---+-------+----+---+-------+
|Adam|32 |3      |Max |33 |3      |
|Max |33 |3      |Zim |41 |4      |
|Adam|32 |3      |Zim |41 |4      |
+----+---+-------+----+---+-------+