如何根据多个RxJava可完成结果的结果执行操作

问题描述

已经花了我一段时间了,我在管理必须在Kotlin中使用Rx的需求时迷失了。

让我解释一下。

有一组ID,其等价项需要根据服务器的成功从服务器中删除,并最终在本地删除

基本上

  1. 进行网络通话以删除单个id(受支持的网络通话返回Completable
  2. 如果收到complete(成功)回调,则将id存储在list(内存)中
  3. 对所有id进行删除,请执行第一步和第二步
  4. 每个网络呼叫完成后,将列表传递出去,以从本地数据库删除

因此这些功能可用,无法修改

  1. fun deleteId(id: String): Completable { networkCall.deleteId(id) }
  2. fun deleteIds(ids: List<String>): Unit { localDb.deleteId(ids) }

这是我尝试过的方法,但显然不完整且被卡住...

val deleted = copyOnWriteArrayList<String>()
val error = copyOnWriteArrayList<String>()
items?.filter { it.isChecked }
    ?.map { Pair(it.id,dataManager.deleteId(it.id)) }
    ?.forEach { (Id,deleteOp) ->
        deleteOp.subscribeOn(Schedulers.io())
                .subscribe(object: CompletableObserver {
                    override fun onComplete() { deleted.add(Id) }

                    override fun onSubscribe(d: disposable) { disposableManager += d }

                    override fun onError(e: Throwable) { error.add(Id) }

                })
    }

所以现在这里存在多个问题,其中之一是要求我无法找到一个地方来知道所有请求均已完成,从而启动localDb删除

是否有一种方法可以按照上述命令链的方式使用Flowable.fromIterable()zipmerge来实现上述方案?

解决方法

如果我正确理解了您的用例,则应该这样做:

// ids of items to delete,for illustration lets have some temp set
val ids = setOf<String>("1","2","3","4")
val deleteIdSingles = mutableListOf<Single<String>>()
ids.forEach { id ->
    deleteIdSingles.add(
        api.deleteId(id)
            // when request successfully completes,return its id wrapped in a Single,instead of Completable
            .toSingle<String> { id }
            // return a flag when this request fails,so that the stream is not closed and other requests would still be executed
            .onErrorReturn { "FAILED" }
    )
}

Single.merge(deleteIdSingles)
    // collect the results of the singles (i.e. the ids of successful deletes),and emit a set of those ids once all the singles has completed
    .collect(
        { mutableListOf() },{ deletedIds: MutableList<String>,id: String -> if (id != "FAILED") deletedIds.add(id) }
    )
    .observeOn(Schedulers.io())
    .subscribe(
        { deletedIds ->
                db.deleteIds(deletedIds)
        },{ error ->
            // todo: onError
        })