Kotlin SharedFlow 合并操作在特定情况下具有 zip 行为


我正在组合两个 SharedFlows,然后执行长时间的工作操作。




val _numbers = MutableSharedFlow<Int>(replay = 0,extraBufferCapacity = 1,onBufferOverflow = BufferOverflow.DROP_OLDEST)
val numbers: SharedFlow<Int> = _numbers
val _strings = MutableSharedFlow<String>(replay = 0,onBufferOverflow = BufferOverflow.DROP_OLDEST)
val strings: SharedFlow<String> = _strings

combine(numbers,strings) { (number,strings) ->
    println("values $number - $strings. Starting to perform a long working job")

runBlocking {

    // This is the initial values. I always kNow this at start.

    // Depending of user action,number or string is emitted.


    // In a specific situation both values need to change but I only want to trigger the long working job once


values 0 - a. Starting to perform a long working job
values 1 - a. Starting to perform a long working job
values 2 - a. Starting to perform a long working job
values 3 - a. Starting to perform a long working job
values 4 - a. Starting to perform a long working job
values 4 - b. Starting to perform a long working job
values 4 - c. Starting to perform a long working job
values 4 - d. Starting to perform a long working job
values 4 - e. Starting to perform a long working job
values 10 - e. Starting to perform a long working job
values 10 - Z. Starting to perform a long working job


values 0 - a. Starting to perform a long working job
values 1 - a. Starting to perform a long working job
values 2 - a. Starting to perform a long working job
values 3 - a. Starting to perform a long working job
values 4 - a. Starting to perform a long working job
values 4 - b. Starting to perform a long working job
values 4 - c. Starting to perform a long working job
values 4 - d. Starting to perform a long working job
values 4 - e. Starting to perform a long working job
values 10 - Z. Starting to perform a long working job

由于缓冲区溢出,有时我可以实现我想要的(最新的),但在其他情况下,我对 values 10 - e. Starting to perform a long working job 不感兴趣。




如果你想保持 2 个流,单事件和双事件之间的区别必须是基于时间的。您将无法区分字符串然后数字的快速更新和“双重更新”。

如果基于时间对您来说没问题,那么在长时间处理之前使用 debounce 应该是可行的方法:

combine(numbers,strings) { (number,string) -> number to string }
    .onEach { (number,string) ->
        println("values $number - $string. Starting to perform a long working job")

此处,combine 仅从 2 个流构建对,但仍获取所有事件,然后 debounce 忽略快速连续事件并仅发送快速系列中的最新事件。这也带来了轻微的延迟,但这完全取决于您想要实现的目标。

如果基于时间的区分不适合您,您需要一种让生产者以不同于 2 个单事件的方式发送双事件的方法。为此,您可以使用单个事件流,例如,您可以像这样定义事件:

sealed class Event {
    data class SingleNumberUpdate(val value: Int): Event()
    data class SingleStringUpdate(val value: String): Event()
    data class DoubleUpdate(val num: Int,val str: String): Event()


flow {
    var num = 0
    var str = "a"
    emit(num to str)
    events.collect { e ->
        when (e) {
            is Event.SingleNumberUpdate -> {
                num = e.value
            is Event.SingleStringUpdate -> {
                str = e.value
            is Event.DoubleUpdate -> {
                num = e.num
                str = e.str
        emit(num to str)
.onEach { (number,strings) ->
    println("values $number - $strings. Starting to perform a long working job")