问题描述
我正在尝试使用 sharedFlow
使用 kotlin 协程创建一种轮询机制,并希望在没有订阅者时停止并在至少有一个订阅者时处于活动状态。我的问题是,在这种情况下,sharedFlow
是正确的选择还是我应该使用 channel
。我尝试使用 channelFlow
但我不知道如何关闭块体外部的通道(而不是 cancel
作业)。有人可以帮忙吗?这是片段。
fun poll(id: String) = channelFlow {
while (!isClosedForSend) {
try {
send(repository.getDetails(id))
delay(MIN_REFRESH_TIME_MS)
} catch (throwable: Throwable) {
Timber.e("error -> ${throwable.message}")
}
invokeOnClose { Timber.e("channel flow closed.") }
}
}
解决方法
首先,当你调用channelFlow(block)时,不需要手动关闭通道。块执行完成后通道会自动关闭。
我认为“生产”协程构建器功能可能是您所需要的。但不幸的是,它仍然是一个实验性的 api。
fun poll(id: String) = someScope.produce {
invokeOnClose { Timber.e("channel flow closed.") }
while (true) {
try {
send(repository.getDetails(id))
// delay(MIN_REFRESH_TIME_MS) //no need
} catch (throwable: Throwable) {
Timber.e("error -> ${throwable.message}")
}
}
}
fun main() = runBlocking {
val channel = poll("hello")
channel.receive()
channel.cancel()
}
如果不调用返回的channel的receive()方法,produce函数就会挂起,所以不需要延迟。
更新:使用 broadcast
在多个 ReceiveChannel 之间共享值。
fun poll(id: String) = someScope.broadcast {
invokeOnClose { Timber.e("channel flow closed.") }
while (true) {
try {
send(repository.getDetails(id))
// delay(MIN_REFRESH_TIME_MS) //no need
} catch (throwable: Throwable) {
Timber.e("error -> ${throwable.message}")
}
}
}
fun main() = runBlocking {
val broadcast = poll("hello")
val channel1 = broadcast.openSubscription()
val channel2 = broadcast.openSubscription()
channel1.receive()
channel2.receive()
broadcast.cancel()
}
,
您可以使用 SharedFlow,它以广播方式发出值(在所有收集器都使用前一个值之前不会发出新值)。
val sharedFlow = MutableSharedFlow<String>()
val scope = CoroutineScope(Job() + Dispatchers.IO)
var producer: Job()
scope.launch {
val producer = launch() {
sharedFlow.emit(...)
}
sharedFlow.subscriptionCount
.map {count -> count > 0}
.distinctUntilChanged()
.collect { isActive -> if (isActive) stopProducing() else startProducing()
}
fun CoroutineScope.startProducing() {
producer = launch() {
sharedFlow.emit(...)
}
}
fun stopProducing() {
producer.cancel()
}