如何为Kotlin Coroutines Flow编写扩展功能/包装?

问题描述

我有协同程序代码,它使用的是callbackFlow

fun getUniqueEventAsFlow(receiverId: String): Flow<Any> = callbackFlow {
    RxEventBus().register(
        receiverId,FirstUniqueEvent::class.java,false
    ) { amEvent ->
        offer(amEvent)
    }
    // Suspend until either onCompleted or external cancellation are invoked
    awaitClose {
        unsubscribeFromEventBus(receiverId)
        cancel()
    }
}.flowOn(dispatchers.Default)
    .catch { throwable ->
        reportError(throwable as Exception)
    }

我想做的是包装以下内容,以便可以自动调用它,因为我在代码中有许多类似的功能

        // Suspend until either onCompleted or external cancellation are invoked
        awaitClose {
            unsubscribeFromEventBus(receiverId)
            cancel()
        }
    }.flowOn(dispatchers.Default)
        .catch { throwable ->
            reportError(throwable as Exception)
        }

我想包装 awaitClose flowOn 一次,而不必为每个 callbackFlow 编写它。您知道我可以使用哪种Kotlin高阶构造来实现这一目标吗?

谢谢你, 伊戈尔(Igor)

解决方法

以下是包装awaitClosehandleErrors的解决方案:

/**
 * Utility function which suspends a coroutine until it is completed or closed.
 * Unsubscribes from Rx Bus event and ensures that the scope is cancelled upon completion.
 */
suspend fun finalizeFlow(scope: ProducerScope<Any>,receiverId: String) {
    scope.awaitClose {
        unsubscribeFromEventBus(receiverId)
        scope.cancel()
    }

    scope.invokeOnClose {
        Logger.debug(
            javaClass.canonicalName,"Closed Flow channel for receiverId $receiverId"
        )
    }
}

 /**
  * Extension function which does error handling on [Flow].
  */
 fun <T> Flow<T>.handleErrors(): Flow<T> = catch { throwable ->
     reportError(throwable as Exception)
 }