scala – Apache Spark – dataset.dropDuplicates()是否保留分区?

我知道存在几个保留父分区的转换(如果它在之前设置 – 例如mapValues)和一些不保留它的转换(例如map).

我使用Spark 2.2的数据集API.我的问题是 – dropDuplicates转换是否保留了分区?想象一下这段代码

case class Item(one: Int,two: Int,three: Int)

import session.implicits._
val ds = session.createDataset(List(Item(1,2,3),Item(1,3)))

val repart = ds.repartition('one,'two).cache()

repart.dropDuplicates(List("one","two")) // will be partitioning preserved?

解决方法

通常,dropDuplicates会进行随机播放(因此不会保留分区),但在您的特殊情况下,它不会执行额外的随机播放,因为您已经以优化器考虑的合适形式对数据集进行了分区:

repart.dropDuplicates(List("one","two")).explain()

 == Physical Plan ==
*HashAggregate(keys=[one#3,two#4,three#5],functions=[])
+- *HashAggregate(keys=[one#3,functions=[])
   +- InMemoryTableScan [one#3,three#5]
         +- InMemoryRelation [one#3,true,10000,StorageLevel(disk,memory,deserialized,1 replicas)
               +- Exchange hashpartitioning(one#3,200)
                  +- LocalTableScan [one#3,three#5]

要查找的关键字是:Exchange

但请考虑以下代码,首先使用plain repartition()重新分区数据集:

val repart = ds.repartition(200).cache()
repart.dropDuplicates(List("one","two")).explain()

这确实会引发额外的混乱(现在你有2个Exchange步骤):

== Physical Plan ==
*HashAggregate(keys=[one#3,two#4],functions=[first(three#5,false)])
+- Exchange hashpartitioning(one#3,200)
   +- *HashAggregate(keys=[one#3,functions=[partial_first(three#5,false)])
      +- InMemoryTableScan [one#3,three#5]
            +- InMemoryRelation [one#3,1 replicas)
                  +- Exchange RoundRobinPartitioning(200)
                     +- LocalTableScan [one#3,three#5]

注意:我使用Spark 2.1检查了它,它在Spark 2.2中可能有所不同,因为Spark 2.2(基于成本的优化器)中的优化器已更改

相关文章

共收录Twitter的14款开源软件,第1页Twitter的Emoji表情 Tw...
Java和Scala中关于==的区别Java:==比较两个变量本身的值,即...
本篇内容主要讲解“Scala怎么使用”,感兴趣的朋友不妨来看看...
这篇文章主要介绍“Scala是一种什么语言”,在日常操作中,相...
这篇文章主要介绍“Scala Trait怎么使用”,在日常操作中,相...
这篇文章主要介绍“Scala类型检查与模式匹配怎么使用”,在日...