将callbackflow转换为sharedflow

问题描述

我只是从协程/流(通常是kotlin)的使用开始,我正在努力将callbackFlow转换为sharedFlow。

我整理了以下简单示例,只是为了展示我尝试过的方法,但没有成功。我的代码更加复杂,但是我相信这个示例反映了我要解决的问题。

fun main() = runBlocking {

    getMySharedFlow().collect{
        println("collector 1 value: $it")
    }

    getMySharedFlow().collect{
        println("collector 2 value: $it")
    }

}

val sharedFlow = MutableSharedFlow<Int>()

suspend fun getMySharedFlow(): SharedFlow<Int> {
    println("inside sharedflow")
    getMyCallbackFlow().collect{
        println("emitting to sharedflow value: $it")
        sharedFlow.emit(it)
    }
    return sharedFlow
}

fun getMyCallbackFlow(): Flow<Int> = callbackFlow<Int> {
    println("inside callbackflow producer")
    fetchSomethingContinuously {
        println("fetched something")
        offer(1)
        offer(2)
        offer(3)
    }
    awaitClose()
}

fun fetchSomethingContinuously(myCallBack: ()->Unit) {
    println("fetching something...")
    myCallBack()
}

这个想法是fetchSomethingContinuously仅被调用一次,与sharedFlow的收集器数量无关。但是,从输出中可以看到,收集器永远不会获取值:

inside sharedflow
inside callbackflow producer
fetching something...
fetched something
emitting to sharedflow value: 1
emitting to sharedflow value: 2
emitting to sharedflow value: 3

我查看了shareIn运算符,但不确定如何正确使用它。

我如何实现这样的目标?任何提示将不胜感激。

解决方法

因此,您在这里所缺少的是以下事实:对collectemit()awaitClose()的调用正在挂起,并且只有在完成相应的操作后才能结束。

函数getMySharedFlow()甚至没有返回以在其上应用收集,因为它正在收集callbackFlowcallbackFlow停留在对awaitClose()的调用上反过来并没有完成,因为fetchSomethingContinuously没有用close()函数结束回调。

您需要意识到,您必须在这里创建一些显式的并行性,而不是生活在悬浮的世界中。您的示例代码的有效变体为:

val sharedFlow = MutableSharedFlow<Int>()

suspend fun startSharedFlow() {
    println("Starting Shared Flow callback collection")

    getMyCallbackFlow().collect {
        println("emitting to sharedflow value: $it")
        sharedFlow.emit(it)
    }
}

fun main() = runBlocking<Unit> {

    launch {
        startSharedFlow()
    }

    launch {
        sharedFlow.collect {
            println("collector 1 value: $it")
        }
    }

    launch {
        sharedFlow.collect {
            println("collector 2 value: $it")
        }
    }

}


fun getMyCallbackFlow(): Flow<Int> = callbackFlow<Int> {
    println("inside callbackflow producer")
    fetchSomethingContinuously {
        println("fetched something")
        offer(1)
        offer(2)
        offer(3)
        //close() - call close here if you need to signal that this callback is done sending elements
    }
    awaitClose()
}

fun fetchSomethingContinuously(myCallBack: () -> Unit) {
    println("fetching something...")
    myCallBack()
}

launch的调用允许异步执行发射和收集值。

此外,关于shareIn()运算符,它只是从指定的上游创建了一个SharedFlow,就像您想要的那样。另外,您可以使用started参数指定何时开始共享。有关here的更多信息。

这是您在示例中使用它的方式:

fun main() = runBlocking<Unit> {

    val sharedFlow = getMyCallbackFlow().shareIn(this,started = SharingStarted.Eagerly)

    launch {
        sharedFlow.collect {
            println("collector 1 value: $it")
        }
    }

    launch {
        sharedFlow.collect {
            println("collector 2 value: $it")
        }
    }

}


fun getMyCallbackFlow(): Flow<Int> = callbackFlow<Int> {
    println("inside callbackflow producer")
    fetchSomethingContinuously {
        println("fetched something")
        offer(1)
        offer(2)
        offer(3)
        //close() - call close here if you need to signal that this callback is done sending elements
    }
    awaitClose()
}

fun fetchSomethingContinuously(myCallBack: () -> Unit) {
    println("fetching something...")
    myCallBack()
}