为什么在ConflatedBroadcastChannel上创建的Flow只能够接收最后一个元素?

问题描述

以下代码仅打印10000,即仅最后一个元素

val channel = broadcastChannel<Int>(Channel.CONFLATED)
val flowJob = channel.asFlow().buffer(Channel.UNLIMITED).onEach {
    println(it)
}.launchIn(GlobalScope)

for (i in 0..100) {
    channel.offer(i*i)
}
flowJob.join()

可以在playground中运行代码

但是,由于Flow是在单独的调度线程中启动的,并且将值发送到Channel,并且由于Flow具有无限的缓冲区,因此它应该接收每个元素,直到调用onEach为止。但是,为什么只有最后一个元素能够被接收?

这是预期的行为还是某些错误?如果它具有预期的行为,那么有人将如何尝试仅将最新元素推入该流,但是所有具有特定缓冲区的流都可以接收该元素。

解决方法

这里的问题是Channel.CONFLATED。取自文档:

Channel that buffers at most one element and conflates all subsequent `send` and `offer` invocations,so that the receiver always gets the most recently sent element.
Back-to-send sent elements are _conflated_ -- only the the most recently sent element is received,while previously sent elements **are lost**.
Sender to this channel never suspends and [offer] always returns `true`.

This channel is created by `Channel(Channel.CONFLATED)` factory function invocation.

This implementation is fully lock-free.

所以这就是为什么您仅获得最新(最后)元素的原因。我会改用UNLIMITED Channel

val channel = Channel<Int>(Channel.UNLIMITED)
val flowJob = channel.consumeAsFlow().onEach {
    println(it)
}.launchIn(GlobalScope)

for (i in 0..100) {
    channel.offer(i*i)
}
flowJob.join()
,

正如一些评论所述,使用Channel.CONFLATED仅存储最后一个值,即使您的流程有缓冲区,您也愿意提供channel

join()也将暂停,直到Job未完成为止(在您的情况下,这是无限的原因),这就是您需要超时的原因。

 val channel = Channel<Int>(Channel.RENDEZVOUS)
 val flowJob = channel.consumeAsFlow().onEach {
     println(it)
 }.launchIn(GlobalScope)

GlobalScope.launch{
    for (i in 0..100) {
        channel.send(i * i)
    }
    channel.close()
}
flowJob.join()

请查看此解决方案(playground link),使用Channel.RENDEZVOUS,您的频道仅在其他元素已被使用时才接受新元素。 这就是为什么我们必须使用send而不是offer的原因,send挂起直到可以发送元素,而offer返回一个布尔值,指示send是否成功。 最后,我们必须close频道,以使join()直到永恒都不会暂停。