SimpleSkewedGroupByTest中Spark生成偏斜数据的逻辑

问题描述

Spark 示例目录中有此示例用于生成偏斜数据 (https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/SimpleSkewedGroupByTest.scala)。我并不完全了解此代码片段将如何生成倾斜数据。对我来说,密钥似乎总是唯一的,因为我们使用的是 java.util.Random 中的方法。谁能解释一下。

我尝试从 spark-shell 运行此代码,但不明白其中的逻辑。有人可以解释一下吗?

scala> val args = Array("3","1000","1000")
args: Array[String] = Array(3,1000,1000)

scala> val numMappers = if (args.length > 0) args(0).toInt else 2
numMappers: Int = 3

scala> val numKVPairs = if (args.length > 1) args(1).toInt else 1000
numKVPairs: Int = 1000

scala> val valSize = if (args.length > 2) args(2).toInt else 1000
valSize: Int = 1000

scala> val numReducers = if (args.length > 3) args(3).toInt else numMappers
numReducers: Int = 3

scala> val ratio = if (args.length > 4) args(4).toInt else 5.0
ratio: Double = 5.0

val pairs1 = spark.sparkContext.parallelize(0 until numMappers,numMappers).flatMap { p =>
      val ranGen = new Random
      val result = new Array[(Int,Array[Byte])](numKVPairs)
      for (i <- 0 until numKVPairs) {
        val byteArr = new Array[Byte](valSize)
        ranGen.nextBytes(byteArr)
        val offset = ranGen.nextInt(1000) * numReducers
        if (ranGen.nextDouble < ratio / (numReducers + ratio - 1)) {
          // give ratio times higher chance of generating key 0 (for reducer 0)
          // println("p:"+p+"offset:"+offset)
          result(i) = (offset,byteArr)
        } else {
          // generate a key for one of the other reducers
          val key = 1 + ranGen.nextInt(numReducers-1) + offset
          // println("p:"+p+"key:"+key)
          result(i) = (key,byteArr)
        }
      }
      result
    }


scala>  pairs1.count
res11: Long = 3000

scala> println(s"RESULT: ${pairs1.groupByKey(numReducers).count}")
RESULT: 1618

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)