问题描述
val channel = broadcastChannel<Int>(Channel.CONFLATED)
val flowJob = channel.asFlow().buffer(Channel.UNLIMITED).onEach {
println(it)
}.launchIn(GlobalScope)
for (i in 0..100) {
channel.offer(i*i)
}
flowJob.join()
可以在playground中运行代码。
但是,由于Flow是在单独的调度线程中启动的,并且将值发送到Channel,并且由于Flow具有无限的缓冲区,因此它应该接收每个元素,直到调用onEach为止。但是,为什么只有最后一个元素能够被接收?
这是预期的行为还是某些错误?如果它具有预期的行为,那么有人将如何尝试仅将最新元素推入该流,但是所有具有特定缓冲区的流都可以接收该元素。
解决方法
这里的问题是Channel.CONFLATED
。取自文档:
Channel that buffers at most one element and conflates all subsequent `send` and `offer` invocations,so that the receiver always gets the most recently sent element.
Back-to-send sent elements are _conflated_ -- only the the most recently sent element is received,while previously sent elements **are lost**.
Sender to this channel never suspends and [offer] always returns `true`.
This channel is created by `Channel(Channel.CONFLATED)` factory function invocation.
This implementation is fully lock-free.
所以这就是为什么您仅获得最新(最后)元素的原因。我会改用UNLIMITED
Channel
:
val channel = Channel<Int>(Channel.UNLIMITED)
val flowJob = channel.consumeAsFlow().onEach {
println(it)
}.launchIn(GlobalScope)
for (i in 0..100) {
channel.offer(i*i)
}
flowJob.join()
,
正如一些评论所述,使用Channel.CONFLATED
仅存储最后一个值,即使您的流程有缓冲区,您也愿意提供channel
。
join()
也将暂停,直到Job
未完成为止(在您的情况下,这是无限的原因),这就是您需要超时的原因。
val channel = Channel<Int>(Channel.RENDEZVOUS)
val flowJob = channel.consumeAsFlow().onEach {
println(it)
}.launchIn(GlobalScope)
GlobalScope.launch{
for (i in 0..100) {
channel.send(i * i)
}
channel.close()
}
flowJob.join()
请查看此解决方案(playground link),使用Channel.RENDEZVOUS
,您的频道仅在其他元素已被使用时才接受新元素。
这就是为什么我们必须使用send
而不是offer
的原因,send
挂起直到可以发送元素,而offer
返回一个布尔值,指示send是否成功。
最后,我们必须close
频道,以使join()
直到永恒都不会暂停。