使用RxJava的多个POST请求

问题描述

我有一个图书清单。

open class Book(
    @PrimaryKey
    var id: String? = null,var title: String? = null,var author: String? = null
): RealmObject()

在进行循环时,我过滤了一些书并使用过滤后的书创建了一个Observable。我将每个可观察对象添加到数组中。

val listInserts = ArrayList<Observable<Book>>()
for loop(..) {
  if (localBook condition) {
    val postObservable = networkApiAdapter.insert(localBook)
    listInserts.add(postObservable)
  }
}

我合并(或合并)可观察对象,希望有一些顺序的POST请求。

        Observable.concat(listInserts)
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(
                { s ->
                    println(s)
                },{ err ->
                    println(err)
                },{ println("onComplete") }
            )

在我的MongoDb + Flask服务器上始终仅收到一个POST请求,我也遇到此错误

com.google.gson.JsonSyntaxException: java.lang.IllegalStateException: Expected BEGIN_OBJECT but was NUMBER at line 1 column 4 path $

networkApiAdapter功能改造

class NetworkAPIAdapter private constructor() {
    fun insert(dto: Book): Observable<Book> {
        println(dto.toString())
        return bookService.insert(dto.title!!,dto.author!!)
    }

    interface BooksService {
        @FormUrlEncoded
        @POST(URL_ORDERS_ALL)
        fun insert(
            @Field("title") title: String,@Field("author") author: String
        ): Observable<Book>
    }
}

任何帮助都将受到欢迎。我不知道如何进行多次请求。我已经尝试了使用zip,repeatUntil,flatMap的许多解决方案,但没有一个奏效。

解决此问题后,我必须删除所有本地书籍并执行GET请求。一切都可以使用RxJava进行。

解决方法

您的BooksService.insert被定义为一个POST请求,该请求期望服务器以Book JSON字符串(即Observable<Book>)的形式响应服务器。我认为这不是您的服务器在POST请求通过后返回的内容。检查您的POST请求返回的内容,我认为它返回的是一些Int(可能是响应状态)而不是Book,这可以解释您的错误:

com.google.gson.JsonSyntaxException: java.lang.IllegalStateException: Expected BEGIN_OBJECT but was NUMBER at line 1 column 4 path $

如果是这种情况,则串联的Observable会在引发错误后立即终止,即在您的第一个POST请求之后立即终止,因此其余请求将不会执行。

如果我的假设正确,请将您的服务更改为:

interface BooksService {
        @FormUrlEncoded
        @POST(URL_ORDERS_ALL)
        fun insert(
            @Field("title") title: String,@Field("author") author: String
        ): Observable<Int>
    }

此外,如果您希望其他请求即使前一个抛出错误也要执行,请执行以下操作:

val postObservable = networkApiAdapter.insert(localBook).onErrorReturn(_ -> SOME_INT_FLAG)

更新

要在所有POST请求发送后执行所需的逻辑,请尝试:

    Observable.concat(listInserts)
        // complete this observable after all POST requests have been sent
        // not that it does not necesarily mean that all responses were 200,you should implement yourself what happens to items that were not successfully `POST`ed
        .take(listInserts.size)
        .doOnComplete { 
            // called when this observable completes,i.e. when all POST requests have been sent
            executeFinalActions()
        }
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(
            { s->
                println(s)
            },{ err ->
                println(err)
            },{ println("onComplete") }
        )

并实现executeFinalActions()函数:

private fun executeFinalActions() {
    realm.executeTransaction(Realm::deleteAll)
    networkApiAdapter.fetchAll()
    .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(
            { books ->
                // todo: store books
            },{ err ->
                println(err)
            }
        )
}

PS:您可能想看看Observable.take()的作用:http://reactivex.io/documentation/operators/take.html