Micronaut RxHttpClient 响应处理

问题描述

我编写了以下代码上传文件

@Slf4j
@Singleton
public class UploadService {

    @Inject
    @Client("${upload.url}")
    private RxHttpClient httpClient;

    public Flowable<HttpResponse<UploadFileResponse>> uploadFile(Path localFile) {

        MultipartBody requestBody = MultipartBody.builder()
            .addPart("file",localFile.getFileName().toString(),localFile.toFile())
            .build();

        return httpClient.exchange(
            HttpRequest.POST("/upload",requestBody).contentType(MULTIPART_FORM_DATA_TYPE),UploadFileResponse.class
        );
    }
}

我正在尝试在另一张地图中运行它:

.....
return files
    .filter(file -> file.getLocalFile() != null)
    .parallel(transferParallelism)
    .runOn(Schedulers.io())
    .map(file ->
        file.withUploaded(
           uploadService.uploadFile(file.getLocalFile())
               .doOnError(throwable -> log.error("File " + file.getLocalFile().getFileName().toString() + " is not uploaded"))
               .map(it -> it.getBody(UploadFileResponse.class)
               .map(UploadFileResponse::isuploaded)
               .orElse(false))
               .blockingFirst()
        )
    )
    .sequential().toList()
    .map(....);
.......

如果我的存储不可用,我有以下堆栈跟踪:

io.reactivex.exceptions.UndeliverableException: The exception Could not be delivered to the consumer because it has already canceled/disposed the flow or the exception has Nowhere to go to begin with. Further reading: https://github.com/ReactiveX/RxJava/wiki/What's-different-in-2.0#error-handling | java.lang.RuntimeException: java.lang.InterruptedException
    at io.reactivex.plugins.RxJavaPlugins.onError(RxJavaPlugins.java:367)
    at io.reactivex.internal.operators.parallel.ParallelJoin$JoinSubscription.onError(ParallelJoin.java:191)
    at io.reactivex.internal.operators.parallel.ParallelJoin$JoinInnerSubscriber.onError(ParallelJoin.java:527)
    at io.micronaut.reactive.rxjava2.RxInstrumentedSubscriber.onError(RxInstrumentedSubscriber.java:66)
    at io.reactivex.internal.operators.parallel.ParallelMap$ParallelMapSubscriber.onError(ParallelMap.java:131)
    at io.reactivex.internal.operators.parallel.ParallelMap$ParallelMapSubscriber.onNext(ParallelMap.java:117)
    at io.micronaut.reactive.rxjava2.RxInstrumentedSubscriber.onNext(RxInstrumentedSubscriber.java:59)
    at io.reactivex.internal.operators.parallel.ParallelRunOn$RunOnSubscriber.run(ParallelRunOn.java:273)
    at io.reactivex.internal.schedulers.ScheduledRunnable.run(ScheduledRunnable.java:66)
    at io.reactivex.internal.schedulers.ScheduledRunnable.call(ScheduledRunnable.java:57)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:630)
    at java.base/java.lang.Thread.run(Thread.java:832)
Caused by: java.lang.RuntimeException: java.lang.InterruptedException
    at io.reactivex.internal.util.ExceptionHelper.wrapOrThrow(ExceptionHelper.java:46)
    at io.reactivex.internal.subscribers.BlockingBaseSubscriber.blockingGet(BlockingBaseSubscriber.java:72)
    at io.reactivex.Flowable.blockingFirst(Flowable.java:5699)
    at com.wefi.commcache.FwCommCacheUpdater.lambda$updateCommCachePartitions$3(FwCommCacheUpdater.java:77)
    at io.reactivex.internal.operators.parallel.ParallelMap$ParallelMapSubscriber.onNext(ParallelMap.java:113)
... 9 more
Caused by: java.lang.InterruptedException
    at java.base/java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1047)
    at java.base/java.util.concurrent.CountDownLatch.await(CountDownLatch.java:232)
    at io.reactivex.internal.subscribers.BlockingBaseSubscriber.blockingGet(BlockingBaseSubscriber.java:65)
... 12 more

看起来 .doOnError() 不起作用。 我是 RxJava 的新手。有人能解释一下如何处理吗? 我是否正确调用我的代码以获得布尔结果?

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)