问题描述
尝试使用 Flowable,然后执行,最后使用 RxJava3。
public String post(Publisher<CompletedFileUpload> files) {
return Flowable.frompublisher(files).doOnNext(file -> {
MultipartBody requestBody = MultipartBody.builder()
.addPart("file",file.getFilename(),MediaType.MULTIPART_FORM_DATA_TYPE,file.getBytes())
.addPart("id","asdasdsds")
.build();
}).doOnComplete((value) -> {
return this.iProduct.post(requestBody);
});
}
上面的代码有错误,但我想要实现的内容在下面的场景中进行了描述
解决方法
解决这个问题的一种方法是:
- 使用
Publisher<CompletedFileUpload> files
运算符收集来自toList()
的所有排放 - 通过使用
map()
运算符循环遍历第 1 步中创建的列表来构建请求。 - 发布请求并返回结果字符串(也使用
map()
运算符。
这个脚手架看起来像这样:
public String post(Publisher<CompletedFileUpload> files) {
final Single<MultipartBody> requestSingle =
Flowable.fromPublisher(files)
.toList()
.map(list -> {
final MultipartBody.Builder builder = MultipartBody.Builder();
for(file : list) {
builder.addPart(...)
}
return builder.build();
})
.map(requestBody -> this.iProduct.post(requestBody));
return requestSingle.blockingGet();
}
这里有两点值得注意:
-
toList()
运算符将Flowable
转换为Single
。 - 您的示例混合了异步代码(所有 Rx 内容)和同步代码(post 方法返回
String
,而不是延迟操作/值)。 Rx 运算符是从一种反应类型转换为另一种的有用方法,但在您的情况下,您需要一种通过调用这些异步操作并等待结果值来桥接同步世界的方法。这就是最后调用blockingGet()
的原因。