断言RDD未排序

问题描述

我有一个名为split方法,该方法接受RDD [T]和splitSize并返回Array [RDD [T]]。

现在,我为它编写的测试用例之一应该验证该功能是否也随机地扰乱了RDD。

因此,我创建了一个排序的RDD,然后查看结果:

  it should "randomize shuffle" in {
    val inputRDD = sc.parallelize((0 until 16))
    val result = RDDUtils.split(inputRDD,2)

    result.foreach(rdd => {
      rdd.collect.foreach(println)
    })

    // Asset result is not sorted
  }

如果结果是:

0 1个 2 3 .. 15

然后它没有按预期工作。

好的结果可能是这样的:

11 3 9 14 ... 1个 6

如何断言输出Array [RDD [T]]]未排序?

解决方法

您可以尝试这样的事情

std::chrono::duration

val resultOrder = result.sortBy(....)
assert(!resultOrder.sameElements(result))

重要的是要注意,关键是要知道如何对数组进行排序。对于Integer数据类型,这很容易,但是对于复杂数据类型,您可能需要隐式订购作为数据类型。例如:

val resultOrder = result.sortBy(....)
assert(!resultOrder.toList == result.toList)

确切的代码取决于您的数据类型。

作为一个完整的例子

implicit val ordering: Ordering[T] =
    Ordering.fromLessThan[T]((sa: T,sb: T) => sa < sb)

// OR

implicit val ordering: Ordering[MyClass] =
    Ordering.fromLessThan[MyClass]((sa: MyClass,sb: MyClass) => sa.field1 < sb.field1)