问题描述
我有一种情况,我想为多个设备调用相同的API,并在完成所有请求后显示结果。 我正在使用改造2。 我对RxJava知之甚少。我认为zip运算符将适合于此。如此实现。
ApiInterface
中的API:
@PUT(AppConstants.BASE_URL + AppConstants.PATH_SEParaTOR + "/user/endpoint")
Observable<ResponseBody> updateInfo(@Header("Authorization") String token,@Query("device_id") String deviceid,@Body JsonObject body);
这里是调用API的方法。它在Map中获取设备ID及其主体。此方法为Map中可用的每个设备ID调用API。
public void updateallInfo(final HashMap<String,String> deviceidMap,final ApiResponseListener listener) {
List<Observable<ResponseBody>> requests = new ArrayList<>();
ArrayList<String> reqIdList = new ArrayList<>();
for (Map.Entry<String,String> entry : map.entrySet()) {
String deviceid = entry.getKey();
String jsonBodyStr = entry.getValue();
Gson gson = new Gson();
JsonObject jsonBody = gson.fromJson(jsonBodyStr,JsonObject.class);
reqIdList.add(deviceid);
requests.add(apiInterface.updateSchedules("accesstoken",deviceid,jsonBody));
}
Observable.zip(requests,new Function<Object[],List<ResponseBody>>() {
@Override
public List<ResponseBody> apply(Object[] objects) throws Exception {
Log.e("onSubscribe","apply : " + objects.length);
List<ResponseBody> dataResponses = new ArrayList<>();
for (Object o : objects) {
dataResponses.add((ResponseBody) o);
}
return dataResponses;
}
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<List<ResponseBody>>() {
@Override
public void accept(List<ResponseBody> responseBodies) throws Exception {
Log.e("onSubscribe","YOUR DATA IS HERE: " + responseBodies.size());
for (int i = 0; i < responseBodies.size(); i++) {
Log.e(TAG,"Response received for " + i + " is : " + responseBodies.get(i).string());
}
}
},new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) throws Exception {
Log.e("onSubscribe","Throwable: " + throwable);
}
});
}
我想获取每个设备ID的响应(成功/失败)。意味着我需要响应以及需要为其调用API的ID。
使用zip运算符,如果任何API失败,则通过accept(Throwable throwable)
方法接收到失败。如果任何API失败,我认为zip运算符不会调用下一个API。
如何获得所有请求的响应(成功或失败)?
还需要一些指示响应的信息来显示结果的请求/设备ID(某些映射)。
还有其他可以代替zip的运算符吗?
有什么建议/帮助吗?
解决方法
我对Java有点生疏,所以我将用Kotlin编写答案,您自己转换它应该不是问题。
创建一个包含ResponseBody
和deviceId
的助手类:
data class IdentifiedResponseBody(
val deviceId: String,val responseBody: ResponseBody?
)
然后:
// change the signature of your requests list to return IdentifiedResponseBody observables
val requests = mutableListOf<Observable<IdentifiedResponseBody>>()
...
// your stated API have updateInfo instead of updateSchedules,but I will assume they have the same signature
requests.add(
apiInterface.updateSchedules("accessToken",deviceId,jsonBody)
.map { responseBody ->
// map the added observable to return IdentifiedResponseBody
IdentifiedResponseBody(deviceId,responseBody)
}
.onErrorReturn { error ->
// return an item here instead of throwing error,so that the other observables will still execute
IdentifiedResponseBody(deviceId,null)
}
)
最后,使用merge
代替zip
:
Observable.merge(requests)
.subscribeOn(Schedulers.io())
// it's a question if you want to observe these on main thread,depends on context of your application
.subscribe(
{ identifiedResponse ->
// here you get both the deviceId and the responseBody
Log.d("RESPNOSE","deviceId=${identifiedResponse.deviceId},body=${identifiedResponse.responseBody}")
if (responseBody == null || responseBody.hasError()) {
// request for this deviceId failed,handle it
}
},{ error ->
Log.e("onSubscribe","Throwable: " + error)
}
)
请参见merge
:http://reactivex.io/documentation/operators/merge.html
请参见zip
:http://reactivex.io/documentation/operators/zip.html
您应该看到巨大的区别:zip
将您的响应合并到由映射功能定义的单个项目(即您的情况下的响应列表)中,而merge
在此位置单独发出所有响应。他们返回的时间。对于此处的zip
,在所有请求完成时(且仅当)返回组合结果;您可能不希望这种行为,就好像单个请求不会返回响应,您根本不会获得任何响应。
更新
等效的Java应该如下,但是请在尝试之前进行修改,因为我不确定是否正确转换了所有内容:
requests.add(
apiInterface.updateSchedules("accessToken",jsonBody)
.map(new Function<ResponseBody,IdentifiedResponseBody>() {
@Override
public IdentifiedResponseBody apply(ResponseBody responseBody) throws Exception {
return new IdentifiedResponseBody(deviceId,responseBody);
}
})
.onErrorReturn(new Function<Throwable,IdentifiedResponseBody>() {
@Override
public IdentifiedResponseBody apply(Throwable throwable) throws Exception {
return new IdentifiedResponseBody(deviceId,null);
}
})
);
Observable.merge(requests)
.subscribeOn(Schedulers.io())
.subscribe(new Consumer<IdentifiedResponseBody>() {
@Override
public void accept(IdentifiedResponseBody identifiedResponseBody) throws Exception {
// same logic as from kotlin part
}
},new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) throws Exception {
Log.e("onSubscribe","Throwable: " + throwable);
}
});
更新2
在您询问的评论中:
有什么方法可以获取所有请求的最终回调 完成
这是使用Observable
而不是Single
/ Completable
的问题,除非您明确关闭通道或抛出错误,否则它不会完成。在理想的情况下,Observable
应该用于连续发出某些数据的流,例如通向Room DB的开放通道,因为并没有告诉DB将更改多少时间。我承认,在您的情况下,似乎很难应用Observable
以外的其他方法。但是,有一种解决方法:
Observable.merge(requests)
// emits only this much items,then close this channel
.take(requests.size.toLong())
// executed when the channel is closed or disposed
.doFinally {
// todo: finalCallback
}
.subscribeOn(Schedulers.io())
.subscribe(...)
代码再次位于Kotlin中,应该不难转换为Java。请检查Observable.take()
的作用:http://reactivex.io/documentation/operators/take.html