Problem description
The Spark examples directory contains this example for generating skewed data (https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/SimpleSkewedGroupByTest.scala). I don't fully understand how this snippet generates skewed data. To me the keys seem to always be unique, since we are using methods from java.util.Random. Can anyone explain?
I tried running this code from spark-shell, but I don't follow the logic. Could someone walk me through it?
scala> val args = Array("3","1000","1000")
args: Array[String] = Array(3,1000,1000)
scala> val numMappers = if (args.length > 0) args(0).toInt else 2
numMappers: Int = 3
scala> val numKVPairs = if (args.length > 1) args(1).toInt else 1000
numKVPairs: Int = 1000
scala> val valSize = if (args.length > 2) args(2).toInt else 1000
valSize: Int = 1000
scala> val numReducers = if (args.length > 3) args(3).toInt else numMappers
numReducers: Int = 3
scala> val ratio = if (args.length > 4) args(4).toInt else 5.0
ratio: Double = 5.0
import java.util.Random

val pairs1 = spark.sparkContext.parallelize(0 until numMappers, numMappers).flatMap { p =>
  val ranGen = new Random
  val result = new Array[(Int, Array[Byte])](numKVPairs)
  for (i <- 0 until numKVPairs) {
    val byteArr = new Array[Byte](valSize)
    ranGen.nextBytes(byteArr)
    val offset = ranGen.nextInt(1000) * numReducers
    if (ranGen.nextDouble < ratio / (numReducers + ratio - 1)) {
      // give ratio times higher chance of generating key 0 (for reducer 0)
      // println("p:" + p + " offset:" + offset)
      result(i) = (offset, byteArr)
    } else {
      // generate a key for one of the other reducers
      val key = 1 + ranGen.nextInt(numReducers - 1) + offset
      // println("p:" + p + " key:" + key)
      result(i) = (key, byteArr)
    }
  }
  result
}
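For what it's worth, the skew comes from how the key is assembled: `offset = ranGen.nextInt(1000) * numReducers` is always a multiple of `numReducers`, so `key % numReducers` (which is how the hash partitioner routes an Int key) is 0 with probability `ratio / (numReducers + ratio - 1)`, and 1..`numReducers-1` otherwise. The plain-Scala sketch below (no Spark needed) reproduces just the key-generation logic with the same parameter values as the shell session above and counts how many pairs each reducer would receive. The fixed seed is my addition, and the value bytes are skipped, which changes the random stream but not the key distribution:

```scala
import java.util.Random

// Same parameters as the spark-shell session above.
val numMappers = 3
val numKVPairs = 1000
val numReducers = 3
val ratio = 5.0

val ranGen = new Random(42) // fixed seed, added for reproducibility

// Reproduce only the key-generation logic of the example (values omitted).
val keys = Seq.fill(numMappers * numKVPairs) {
  val offset = ranGen.nextInt(1000) * numReducers
  if (ranGen.nextDouble < ratio / (numReducers + ratio - 1)) {
    offset // multiple of numReducers -> lands on reducer 0
  } else {
    1 + ranGen.nextInt(numReducers - 1) + offset // lands on reducer 1..numReducers-1
  }
}

// A hash partitioner sends non-negative Int key k to partition k % numReducers.
val perReducer = keys.groupBy(_ % numReducers).map { case (r, ks) => (r, ks.size) }
println((0 until numReducers).map(r => r -> perReducer.getOrElse(r, 0)))
// reducer 0 receives roughly ratio = 5 times as many pairs as each other reducer
println(s"distinct keys: ${keys.distinct.size} of ${keys.size} pairs")
```

So the keys are not unique at all: they are drawn from a pool of at most 1000 * numReducers values, and a disproportionate share of the draws falls on the 1000 keys that map to reducer 0.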
scala> pairs1.count
res11: Long = 3000
scala> println(s"RESULT: ${pairs1.groupByKey(numReducers).count}")
RESULT: 1618
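Note that 1618 here is the number of distinct keys, well below the 3000 pairs generated: since keys come from a pool of at most 1000 * numReducers = 3000 possible values, and the reducer-0 subset of 1000 keys absorbs most of the draws, collisions are common. A back-of-the-envelope estimate using the standard occupancy formula E[distinct] = sum over keys of 1 - (1 - P(key))^n (a sketch, with the same parameter values as above) lands in the same ballpark as the observed 1618:

```scala
// Expected number of distinct keys among n = numMappers * numKVPairs draws.
val numMappers = 3
val numKVPairs = 1000
val numReducers = 3
val ratio = 5.0

val n = numMappers * numKVPairs
val pHot = ratio / (numReducers + ratio - 1)            // P(key maps to reducer 0) = 5/7
val p0 = pHot / 1000.0                                  // each of the 1000 reducer-0 keys
val pOther = (1 - pHot) / (1000.0 * (numReducers - 1))  // each of the 2000 other keys

// E[distinct] = sum over possible keys k of (1 - (1 - P(k))^n)
val expected =
  1000 * (1 - math.pow(1 - p0, n)) + 2000 * (1 - math.pow(1 - pOther, n))
println(f"expected distinct keys: $expected%.0f") // about 1580
```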