Problem description
fun getUniqueEventAsFlow(receiverId: String): Flow<Any> = callbackFlow {
    RxEventBus().register(
        receiverId, FirstUniqueEvent::class.java, false
    ) { amEvent ->
        // trySend replaces offer, which is deprecated in current kotlinx.coroutines
        trySend(amEvent)
    }
    // Suspend until either onCompleted or external cancellation is invoked
    awaitClose {
        unsubscribeFromEventBus(receiverId)
        cancel()
    }
}.flowOn(dispatchers.Default)
    .catch { throwable ->
        reportError(throwable as Exception)
    }
What I would like to do is wrap the following so that it is invoked automatically, since I have many similar functions in my code:
    // Suspend until either onCompleted or external cancellation is invoked
    awaitClose {
        unsubscribeFromEventBus(receiverId)
        cancel()
    }
}.flowOn(dispatchers.Default)
    .catch { throwable ->
        reportError(throwable as Exception)
    }
I would like to wrap awaitClose and flowOn once instead of writing them for every callbackFlow. Do you know which Kotlin higher-order construct I could use to achieve this?
Thank you, Igor
Solution
Here is a solution that wraps awaitClose in finalizeFlow and the error handling in handleErrors:
/**
 * Utility function which suspends a coroutine until it is completed or closed.
 * Unsubscribes from Rx Bus event and ensures that the scope is cancelled upon completion.
 */
suspend fun finalizeFlow(scope: ProducerScope<*>, receiverId: String) {
    scope.awaitClose {
        unsubscribeFromEventBus(receiverId)
        scope.cancel()
        // Log here instead of in a separate invokeOnClose call: awaitClose only
        // returns once the channel is closed, and a channel accepts a single
        // close handler, which awaitClose already uses.
        Logger.debug(
            javaClass.canonicalName, "Closed Flow channel for receiverId $receiverId"
        )
    }
}
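/**
 * Extension function which does error handling on [Flow].
 */
fun <T> Flow<T>.handleErrors(): Flow<T> = catch { throwable ->
    // Avoid an unsafe cast: rethrow anything that is not an Exception (e.g. an Error).
    if (throwable is Exception) reportError(throwable) else throw throwable
}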
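
The two helpers above still have to be called from every callbackFlow. One way to go a step further is a single higher-order function that owns the callbackFlow, awaitClose, flowOn and catch calls and takes the subscribe and unsubscribe steps as lambdas. The following is only a minimal sketch under stated assumptions: eventFlow, FakeEventBus and eventsFor are hypothetical names invented for this illustration, and FakeEventBus stands in for the RxEventBus from the question, whose real API is not shown here.

```kotlin
import java.util.concurrent.ConcurrentHashMap
import kotlinx.coroutines.CoroutineDispatcher
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.channels.ProducerScope
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.callbackFlow
import kotlinx.coroutines.flow.catch
import kotlinx.coroutines.flow.flowOn
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical helper (not part of the answer above): wraps the callbackFlow
// boilerplate once. `subscribe` registers the listener, `unsubscribe` undoes it.
fun <T> eventFlow(
    unsubscribe: () -> Unit,
    dispatcher: CoroutineDispatcher = Dispatchers.Default,
    onError: (Throwable) -> Unit = { throw it }, // default: do not swallow errors
    subscribe: ProducerScope<T>.() -> Unit
): Flow<T> = callbackFlow<T> {
    subscribe()
    // Suspends until the flow is closed or its collector is cancelled.
    awaitClose { unsubscribe() }
}.flowOn(dispatcher)
    .catch { throwable -> onError(throwable) }

// Hypothetical stand-in for the event bus in the question; not a real library API.
class FakeEventBus {
    private val listeners = ConcurrentHashMap<String, (Any) -> Unit>()

    fun register(receiverId: String, listener: (Any) -> Unit) {
        listeners[receiverId] = listener
    }

    fun unregister(receiverId: String) {
        listeners.remove(receiverId)
    }

    fun post(receiverId: String, event: Any) {
        listeners[receiverId]?.invoke(event)
    }

    fun isRegistered(receiverId: String): Boolean = listeners.containsKey(receiverId)
}

// Each event source now only states how to subscribe and how to unsubscribe.
fun FakeEventBus.eventsFor(receiverId: String): Flow<Any> = eventFlow(
    unsubscribe = { unregister(receiverId) }
) {
    register(receiverId) { event -> trySend(event) }
}

fun main() = runBlocking {
    val bus = FakeEventBus()
    launch {
        // Wait until the flow has registered its listener, then publish two events.
        while (!bus.isRegistered("receiver-1")) delay(10)
        bus.post("receiver-1", "first")
        bus.post("receiver-1", "second")
    }
    // take(2) cancels the flow after two events, which triggers the unsubscribe step.
    val received = bus.eventsFor("receiver-1").take(2).toList()
    println(received)
    println(bus.isRegistered("receiver-1"))
}
```

With a helper of this shape, getUniqueEventAsFlow would shrink to the register call plus an unsubscribe lambda, and reportError could be passed as onError. Whether that fits depends on how RxEventBus and unsubscribeFromEventBus behave in the real code base, which this sketch does not assume.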