使用无止境的流量测试Kotlin Flow.combine

问题描述

我有以下测试

@Test
fun combineUnendingFlows() = runBlockingTest {

    val source1 = flow {
        emit("a")
        emit("b")
        suspendCancellableCoroutine { /* Never complete. */ }
    }

    val source2 = flowOf(1,2,3)

    val combinations = mutablelistof<String>()

    combine(source1,source2) { first,second -> "$first$second" }
        .onEach(::println)
        .onEach(combinations::add)
        .launchIn(this)

    advanceUntilIdle()

    assertthat(combinations).containsExactly("a1","b1","b2","b3")
}

断言成功,但是测试失败,但出现以下异常:

kotlinx.coroutines.test.UncompletedCoroutinesError: Test finished with active jobs

我知道这是人为的,我们可以通过确保source1完成来轻松通过此过程,但是我想知道为什么它失败了? runBlockingTest是测试永无止境的流程的错误方法吗?

(这与协程1.4.0一起使用)。

解决方法

问题的症结似乎是runBlockingTest(可能是正确的),它期望到完成时没有正在运行的作业。

如果我们可以明确取消runBlockingTest提供的范围,那很好,但是尝试在该范围上调用.cancel()会引发异常。

this GitHub issue中有关于该问题的更多讨论。

在这个人为设计的示例中,很容易获得对Job创建的launchIn()的引用,并在测试关闭之前将其取消。

更复杂的情况可能会捕获UncompletedCoroutinesError并忽略它,因为他们希望存在未完成的协程,或者可以创建一个新子CoroutineScope来明确取消。在此示例中,

@Test
fun combineUnendingFlows() = runBlockingTest {
    val job = Job()
    val childScope = CoroutineScope(this.coroutineContext + job)

    val source1 = flow {
        emit("a")
        emit("b")
        suspendCancellableCoroutine { /* Never complete. */ }
    }

    val source2 = flowOf(1,2,3)

    val combinations = mutableListOf<String>()

    combine(source1,source2) { first,second -> "$first$second" }
        .onEach(::println)
        .onEach(combinations::add)
        .launchIn(childScope)

    advanceUntilIdle()
    assertThat(combinations).containsExactly("a1","b1","b2","b3")
    job.cancel()
}