问题描述
让我们考虑一个数据集:
姓名 | 年龄 |
---|---|
最大 | 33 |
亚当 | 32 |
Zim | 41 |
穆勒 | 62 |
现在,如果我们在数据集 x
上运行这个查询:
x.as("a").join(x.as("b")).where(
$"b.age" - $"a.age" <= 10 and
$"a.age" > $"b.age").show()
姓名 | 年龄 | 姓名 | 年龄 |
---|---|---|---|
最大 | 33 | Zim | 41 |
亚当 | 32 | 最大 | 33 |
亚当 | 32 | Zim | 41 |
这是我想要的结果。
现在,从概念上讲,如果我有一个非常大的数据集,我可能想使用分桶来减少搜索空间。
因此,使用以下方法进行分桶:
val buck_x = x.withColumn("buc_age",floor($"age"/ 10))
这给了我:
姓名 | 年龄 | buck_age |
---|---|---|
最大 | 33 | 3 |
亚当 | 32 | 3 |
Zim | 41 | 4 |
穆勒 | 62 | 6 |
爆炸后,我得到以下结果:
val exp_x = buck_x.withColumn("buc_age",explode(array($"buc_age" -1,$"buc_age",$"buc_age" + 1)))
姓名 | 年龄 | buck_age |
---|---|---|
最大 | 33 | 2 |
最大 | 33 | 3 |
最大 | 33 | 4 |
亚当 | 32 | 2 |
亚当 | 32 | 3 |
亚当 | 32 | 4 |
Zim | 41 | 3 |
Zim | 41 | 4 |
Zim | 41 | 5 |
穆勒 | 62 | 5 |
穆勒 | 62 | 6 |
穆勒 | 62 | 7 |
现在,在最终查询之后,
exp_x.as("a").join(exp_x.as("b")).where(
$"a.buc_age" === $"b.buc_age" and
$"b.age" - $"a.age" <= 10 and
$"b.age" > $"a.age").show()
我得到以下结果。
姓名 | 年龄 | buc_age | 姓名 | 年龄 | buc_age |
---|---|---|---|---|---|
最大 | 33 | 3 | Zim | 41 | 3 |
最大 | 33 | 4 | Zim | 41 | 4 |
亚当 | 32 | 2 | 最大 | 33 | 2 |
亚当 | 32 | 3 | Zim | 41 | 3 |
亚当 | 32 | 3 | 最大 | 33 | 3 |
亚当 | 32 | 4 | Zim | 41 | 4 |
亚当 | 32 | 4 | 最大 | 33 | 4 |
显然,这与我的预期不同,我得到的行数比预期的多。使用bucket时如何解决这个问题?
解决方法
删除分桶列,然后选择不同的行,基本上消除了爆炸引起的重复:
<div>
<div v-if="active == 0">
<ingresar-informacion-general></ingresar-informacion-general>
</div>
<div v-if="active == 1">
<ingresar-proveedor></ingresar-proveedor>
</div>
<div v-if="active == 2">
<ingresar-condiciones></ingresar-condiciones>
</div>
</div>
<el-steps :active="active" finish-status="success" simple>
<el-step title="Informacion General"></el-step>
<el-step title="Proveedor"></el-step>
<el-step title="Condiciones"></el-step>
</el-steps>
,
真的没有必要爆炸。
相反,这种方法联合了两个内部自连接。两个连接查找以下情况:
- A 和 B 在同一个桶中,而 B 更老
- B 多一桶,但不超过 10 岁
这应该比使用爆炸效果更好,因为执行的比较更少(因为这里连接的集合是爆炸尺寸的三分之一)。
val namesDF = Seq(("Max",33),("Adam",32),("Zim",41),("Muller",62)).toDF("name","age")
val buck_x = namesDF.withColumn("buc_age",floor($"age" / 10))
// same bucket where b is still older
val same = buck_x.as("a").join(buck_x.as("b"),($"a.buc_age" === $"b.buc_age" && $"b.age" > $"a.age"),"inner")
// different buckets -- b is one bucket higher but still no more than 10 ages different
val diff = buck_x.as("a").join(buck_x.as("b"),($"a.buc_age" + 1 === $"b.buc_age" && $"b.age" <= $"a.age" + 10),"inner")
val result = same.union(diff)
结果(您可以执行 drop
以删除多余的列,如 Charlie 的答案):
result.show(false)
+----+---+-------+----+---+-------+
|name|age|buc_age|name|age|buc_age|
+----+---+-------+----+---+-------+
|Adam|32 |3 |Max |33 |3 |
|Max |33 |3 |Zim |41 |4 |
|Adam|32 |3 |Zim |41 |4 |
+----+---+-------+----+---+-------+