然后和最后使用 Flowable 反应式 x Java

问题描述

尝试使用 Flowable,然后执行,最后使用 RxJava3。

public String post(Publisher<CompletedFileUpload> files) {
    return Flowable.frompublisher(files).doOnNext(file -> {
        MultipartBody requestBody = MultipartBody.builder()
                .addPart("file",file.getFilename(),MediaType.MULTIPART_FORM_DATA_TYPE,file.getBytes())
                .addPart("id","asdasdsds")
                .build();
    }).doOnComplete((value) -> {
        return this.iProduct.post(requestBody);
    });
}

上面的代码错误,但我想要实现的内容在下面的场景中进行了描述

  1. 迭代文件
  2. 向 requestBody 添加 file.getFilename() 和字节
  3. 然后调用返回字符串的 this.iProduct.post(requestBody)
  4. 最后返回字符串值

解决方法

解决这个问题的一种方法是:

  1. 使用 Publisher<CompletedFileUpload> files 运算符收集来自 toList() 的所有排放
  2. 通过使用 map() 运算符循环遍历第 1 步中创建的列表来构建请求。
  3. 发布请求并返回结果字符串(也使用 map() 运算符。

这个脚手架看起来像这样:

public String post(Publisher<CompletedFileUpload> files) {
    final Single<MultipartBody> requestSingle = 
        Flowable.fromPublisher(files)
            .toList()
            .map(list -> {
                final MultipartBody.Builder builder = MultipartBody.Builder();

                for(file : list) {
                    builder.addPart(...)
                }

                return builder.build();
            })
            .map(requestBody -> this.iProduct.post(requestBody));

    return requestSingle.blockingGet();
}

这里有两点值得注意:

  • toList() 运算符将 Flowable 转换为 Single
  • 您的示例混合了异步代码(所有 Rx 内容)和同步代码(post 方法返回 String,而不是延迟操作/值)。 Rx 运算符是从一种反应类型转换为另一种的有用方法,但在您的情况下,您需要一种通过调用这些异步操作并等待结果值来桥接同步世界的方法。这就是最后调用 blockingGet() 的原因。