问题描述
我有这样的流程:
fun createrawDataFlow() = callbackFlow<String> {
SensorProProvider.getInstance(this@MainActivity).registerDataCallback { sensorProDeviceInfo,bytes ->
val dataString = bytes.map { it.toString() }.reduce { acc,s -> "$acc,$s" }
val hexString = HEXUtils.bytetoHex(bytes)
Log.e("onDataReceived","deviceInfo: ${sensorProDeviceInfo.deviceidentify},dataSize:${bytes.size},data:$dataString")
offer(hexString)
}
awaitClose { }
}
GlobalScope.launch(dispatchers.IO) {
createrawDataFlow()
.map {
Log.e("onDataReceived","map2: ${Thread.currentThread().name}")
// what I want is collecting 10 emits of sensor's data,and return a list of them
// arraylistof<String>(/* 10 hexStrings here*/)
}
.flowOn(dispatchers.IO)
.collect {
Log.e("onDataReceived","thread: ${Thread.currentThread().name},hexData:$it")
}
}
就像代码中的注释一样。我想从流中收集 10 个十六进制字符串,因为这些字符串来自同一时间段,然后将它们打包在一个数组列表中以供返回。我怎样才能做到这一点?是否有任何类似于 map 的运算符来执行此操作?顺便说一句,原谅我糟糕的英语。
解决方法
如果您需要批量收集,并且不想取消原始流,您可以调整您的发射流函数,使其保存值的缓存。
/*
* Returns a list of at least [batchSize] integers.
*/
fun aFlow(cacheSize: Int = 10): Flow<List<Int>> {
var counter: Int = 0
val cache: MutableList<Int> = mutableListOf()
return flow {
while(currentCoroutineContext().isActive) {
cache.add(counter++)
if (cache.size >= cacheSize) {
emit(cache)
cache.clear()
}
delay(500L) // the delay is just to simulate incoming sensor data
}
}
}
通用解决方案
为了使它更通用,我在流上创建了一个通用扩展函数,您可以将其应用于您想要返回批处理列表的任何流。
考虑我们有一个 infiniteFlow
的整数:
fun infiniteFlow(): Flow<Int> {
var counter: Int = 0
return flow {
while (currentCoroutineContext().isActive) {
emit(counter++)
delay(250L) // the delay is just to simulate incoming sensor data
}
}
}
还有这个 batch
扩展函数:
/**
* Maps the Flow<T> to Flow<List<T>>. The list size is at least [batchSize]
*/
fun <T> Flow<T>.batch(batchSize: Int = 10): Flow<List<T>> {
val cache: MutableList<T> = mutableListOf()
return map {
cache.apply { add(it) }
}.filter { it.size >= batchSize }
.map {
mutableListOf<T>().apply { // copy the list and clears the cache
addAll(cache)
cache.clear()
}
}
}
注意:这只是一个例子。它没有针对边缘情况进行优化或测试!
然后你可以像这样使用这个函数:
infiniteFlow().batch(batchSize = 12).collect { println(it) }
,
首先你申请take
fun <T> Flow<T>.take(count: Int): Flow<T>
返回包含第一个 count 元素的流。当 count 元素被消耗时,原始流被取消。如果计数不是正数,则抛出 IllegalArgumentException。
然后你申请toCollection
suspend fun <T,C : MutableCollection<in T>> Flow<T>.toCollection(destination: C): C
将给定的流收集到目的地
如果需要,您可以在 take
和 toCollection
之间放置其他操作。这就是它的整体外观:
val hexList: List<String> = createRawDataFlow().take(10) // take first 10 items
.map { ... } // do some (optional) mapping
.toCollection(mutableListOf())
// after the flow is cancelled (all items consumed) you have the elements in hexList
文档