将操作应用于所有先前发出的项目

问题描述

我有一个项目队列,一旦服务器可达,它们就会发送:
val queueHistory: Observable<QueuedItem>

队列项是:
data class QueuedItem(val item: Item,val sent: Boolean = false)

queueHistory 永远不会完成,它只记录项目何时排队等待发送 onNext(QueuedItem(item1,false),然后记录它已发送 onNext(QueuedItem(item1,true)

我想要做的是获取当前有多少未发送项目的数量

我的问题的主要来源是列表没有完成,我最初想使用 collect 但需要一个完整的列表。

我正在尝试使用 scan 和类似
queueHistory.scan { items: ScannedItems,item -> ScannedItems(arrayOf(*items,item),0) }
到目前为止,我可以保留当前遇到的项目列表,但扫描希望所有内容都为相同类型。

我的另一个想法是

queueHistory
            .groupBy { it.item }
            .flatMapSingle { it.toList() }
            .map { it.size % 2 }

但是 toList() 需要一个有限列表。

任何想法将不胜感激!

解决方法

有一个 #scan 重载,它接受一个种子值和一个 lambda,它接受两个参数(prev,current)。 prev 参数与种子类型相同,current 参数与上游类型相同。

示例

class So65587608 {
    @Test
    fun `65587608`() {
        val producer = PublishSubject.create<QueuedItem>()

        val map = producer.scan(mutableListOf<QueuedItem>(),{ list,curr ->
            // when send = true -> remove
            if (curr.sent) {
                list.removeIf { it.item == curr.item }
            } else if (!list.any { it.item == curr.item }) {
                list.add(curr)
            }
            list.toMutableList()
        }).map { list -> list as List<QueuedItem> }

        map.subscribe {
            println(it)
        }

        val test = map.test()

        producer.onNext(QueuedItem("1"))
        producer.onNext(QueuedItem("1",true))
        producer.onNext(QueuedItem("2",true))
        producer.onNext(QueuedItem("3",true))
        producer.onNext(QueuedItem("4"))
        producer.onNext(QueuedItem("5"))
        producer.onNext(QueuedItem("4",true))
    }

    data class QueuedItem(val item: String,val sent: Boolean = false)
}

输出

[] // seed value
[QueuedItem(item=1,sent=false)]
[]
[]
[]
[QueuedItem(item=4,sent=false)]
[QueuedItem(item=4,sent=false),QueuedItem(item=5,sent=false)]
[QueuedItem(item=5,sent=false)]

注意

您必须在每次#scan 迭代时复制一份列表,或者使用持久数据集合中的不可变列表。

此外,这可能不是一个好的实现,因为列表是未绑定的,这可能会占用您所有的内存。如果列表足够大,线性搜索也可能需要一些时间,这可能不太好。人们应该思考如何更好地完成循环。