我知道存在几个保留父分区的转换(如果它在之前设置 – 例如mapValues)和一些不保留它的转换(例如map).
我使用Spark 2.2的数据集API.我的问题是 – dropDuplicates转换是否保留了分区?想象一下这段代码:
case class Item(one: Int,two: Int,three: Int) import session.implicits._ val ds = session.createDataset(List(Item(1,2,3),Item(1,3))) val repart = ds.repartition('one,'two).cache() repart.dropDuplicates(List("one","two")) // will be partitioning preserved?
解决方法
通常,dropDuplicates会进行随机播放(因此不会保留分区),但在您的特殊情况下,它不会执行额外的随机播放,因为您已经以优化器考虑的合适形式对数据集进行了分区:
repart.dropDuplicates(List("one","two")).explain() == Physical Plan == *HashAggregate(keys=[one#3,two#4,three#5],functions=[]) +- *HashAggregate(keys=[one#3,functions=[]) +- InMemoryTableScan [one#3,three#5] +- InMemoryRelation [one#3,true,10000,StorageLevel(disk,memory,deserialized,1 replicas) +- Exchange hashpartitioning(one#3,200) +- LocalTableScan [one#3,three#5]
要查找的关键字是:Exchange
但请考虑以下代码,首先使用plain repartition()重新分区数据集:
val repart = ds.repartition(200).cache() repart.dropDuplicates(List("one","two")).explain()
这确实会引发额外的混乱(现在你有2个Exchange步骤):
== Physical Plan == *HashAggregate(keys=[one#3,two#4],functions=[first(three#5,false)]) +- Exchange hashpartitioning(one#3,200) +- *HashAggregate(keys=[one#3,functions=[partial_first(three#5,false)]) +- InMemoryTableScan [one#3,three#5] +- InMemoryRelation [one#3,1 replicas) +- Exchange RoundRobinPartitioning(200) +- LocalTableScan [one#3,three#5]
注意:我使用Spark 2.1检查了它,它在Spark 2.2中可能有所不同,因为Spark 2.2(基于成本的优化器)中的优化器已更改