如何从 Kotlin 协程流中的几次发出中仅触发一个结果?

问题描述

我有这样的流程:

fun createrawDataFlow() = callbackFlow<String> {
    SensorProProvider.getInstance(this@MainActivity).registerDataCallback { sensorProDeviceInfo,bytes ->
        val dataString = bytes.map { it.toString() }.reduce { acc,s -> "$acc,$s" }
        val hexString = HEXUtils.bytetoHex(bytes)
        Log.e("onDataReceived","deviceInfo: ${sensorProDeviceInfo.deviceidentify},dataSize:${bytes.size},data:$dataString")
        offer(hexString)
    }
    awaitClose {  }
}


GlobalScope.launch(dispatchers.IO) {
    createrawDataFlow()
        .map {
            Log.e("onDataReceived","map2: ${Thread.currentThread().name}")
            // what I want is collecting 10 emits of sensor's data,and return a list of them
            // arraylistof<String>(/* 10 hexStrings here*/)
        }
        .flowOn(dispatchers.IO)
        .collect {
            Log.e("onDataReceived","thread: ${Thread.currentThread().name},hexData:$it")
        }
}

就像代码中的注释一样。我想从流中收集 10 个十六进制字符串,因为这些字符串来自同一时间段,然后将它们打包在一个数组列表中以供返回。我怎样才能做到这一点?是否有任何类似于 map 的运算符来执行此操作?顺便说一句,原谅我糟糕的英语。

解决方法

如果您需要批量收集,并且不想取消原始流,您可以调整您的发射流函数,使其保存值的缓存。

/*
 * Returns a list of at least [batchSize] integers.
 */
fun aFlow(cacheSize: Int = 10): Flow<List<Int>> {
  var counter: Int = 0
  val cache: MutableList<Int> = mutableListOf()

  return flow {
    while(currentCoroutineContext().isActive) {
      cache.add(counter++)

      if (cache.size >= cacheSize) {
        emit(cache)
        cache.clear()
      }

      delay(500L) // the delay is just to simulate incoming sensor data
    }
  }
}

通用解决方案

为了使它更通用,我在流上创建了一个通用扩展函数,您可以将其应用于您想要返回批处理列表的任何流。

考虑我们有一个 infiniteFlow 的整数:

fun infiniteFlow(): Flow<Int> {
    var counter: Int = 0

    return flow {
        while (currentCoroutineContext().isActive) {
            emit(counter++)
            delay(250L) // the delay is just to simulate incoming sensor data
        }
    }
}

还有这个 batch 扩展函数:

/**
 * Maps the Flow<T> to Flow<List<T>>. The list size is at least [batchSize]
 */
fun <T> Flow<T>.batch(batchSize: Int = 10): Flow<List<T>> {
    val cache: MutableList<T> = mutableListOf()

    return map {
        cache.apply { add(it) }
    }.filter { it.size >= batchSize }
        .map {
            mutableListOf<T>().apply { // copy the list and clears the cache
                addAll(cache)
                cache.clear()
            }
        }
}

注意:这只是一个例子。它没有针对边缘情况进行优化或测试!

然后你可以像这样使用这个函数:

infiniteFlow().batch(batchSize = 12).collect { println(it) }
,

首先你申请take

fun <T> Flow<T>.take(count: Int): Flow<T>

返回包含第一个 count 元素的流。当 count 元素被消耗时,原始流被取消。如果计数不是正数,则抛出 IllegalArgumentException。

然后你申请toCollection

suspend fun <T,C : MutableCollection<in T>> Flow<T>.toCollection(destination: C): C

将给定的流收集到目的地

如果需要,您可以在 taketoCollection 之间放置其他操作。这就是它的整体外观:

val hexList: List<String> = createRawDataFlow().take(10) // take first 10 items
  .map { ... } // do some (optional) mapping
  .toCollection(mutableListOf())

// after the flow is cancelled (all items consumed) you have the elements in hexList

文档