如何正确使用 Flow.onStart {} 重新获取缓存内容?

问题描述

我有一个获取某物的方法,为了简单起见,让我们将其设为字符串。该方法应该返回一个最初发出缓存字符串的流,然后在查询我的 API 后发出“新鲜”值。

幸运的是,每当给定表更新时,Room 都会发出新数据,因此部分逻辑可以开箱即用。我也有刷新/重新获取工作。但是当我尝试使用 .onStart{}(恕我直言看起来更简洁)时,功能和我的理解都崩溃了:/

这是一个概念证明,它应该在 IntelliJ 或 Android Studio 中运行,没有太多异常依赖:

// Room automatically emits new values on dbFlow when the relevant table is updated
val dbFlow = MutableStateFlow("cachedValue")

// refresh simulates fetchSomethingFromApi().also { someDao.updateData(it) }
val refresh = suspend {
    delay(1000) // simulate API delay
    stream.value = "freshValueFromAPI"
}

suspend fun doesNotWork(): Flow<String> = dbFlow
    .onStart {
        coroutinescope {
            launch {
                refresh()
            }
        }
    }

suspend fun thisWorks(): Flow<String> = flow {
    coroutinescope {
        launch {
            refresh()
        }
        dbFlow.collect {
            emit(it)
        }
    }
}


如何测试:

runBlocking {
    thisWorks().take(2).collect {
        println(it)
    }
}

或:

runBlocking {
    doesNotWork().take(2).collect {
        println(it)
    }
}

我希望两者产生相同的结果,然而带有 .onStart {} 的那个永远不会发出缓存值,所以 .take(2) 最终会超时(因为它只发出一次) .

这里发生了什么?

解决方法

这种行为的原因是
a) onStart { ... } 在收集流之前执行。
举个简单的例子:

flow {
    emit("foo")  
}.onStart {
    println("bar")
}.collect {
    println(it)
}

生产

bar
foo

和 b) coroutineScope {...} 等待,直到块内启动的所有子协程都完成
另一个例子:

suspend fun foo() {
    coroutineScope {
        launch {
            delay(1000)
        }
    }
}

调用这个函数需要大约 1000 毫秒,因为 coroutineScope 会等到内部子协程完成

现在到你的例子

suspend fun doesNotWork(): Flow<String> = dbFlow
    .onStart {
        coroutineScope {
            launch {
                refresh()
            }
        }
    }

根据 b),这与

具有相同的行为
suspend fun doesNotWork(): Flow<String> = dbFlow
    .onStart {
        refresh()
    }

由于onStart{...}是在收集流之前执行的,所以这和写是一样的

suspend fun doesNotWork(): Flow<String> = flow {
    refresh()
    // could be simplified  to emitAll(dbFlow)
    dbFlow.collect {
        emit(it)
    }
}

现在您看到这与您的工作示例有何不同。 您首先从 api 刷新,然后开始从数据库发出值。 虽然您的工作示例启动了一个新的协程,该协程从您的 api 异步刷新并立即开始从您的数据库发出值。