问题描述
受.Net TPL的启发,我试图找到一种方法来处理RX管道之外的错误。具体来说,如果发生错误,我希望Observer管道停止运行,并将控制权传递给周围的方法。像这样:
public void testRxJava() {
try {
Observable.range(0,5)
.subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread())
.map(i -> { throw new RuntimeException(); })
.subscribe();
} catch (Exception ex) {
// I was hoping to get here on the main thread,but crashed instead
Log.i("Test","Will never get here");
}
}
这将导致应用程序崩溃,并导致io.reactivex.rxjava3.exceptions.OnErrorNotImplementedException
崩溃,而该catch
子句不会陷入该子句中,而是调用主线程的uncaughtException()
处理程序。
尝试从subscribe()
中的错误处理程序中抛出错误,也再次退回到uncaughtException()
处理程序。
发现有类似的C#问题here。
解决方法
您是否尝试过捕获错误
Observable.range(0,5)
.subscribeOn(Schedulers.newThread())
.doOnError {
//your error caught here
}
.observeOn(AndroidSchedulers.mainThread())
.map({ i -> throw RuntimeException() })
.subscribe()
,
这就是我最终要做的。据我所知,这只是离开ReactiveX管道,然后让周围的代码处理错误。如果有人有更优雅的方式会很高兴:
public void testRxJava() {
try {
// will be null if no error,will hold a Throwable on error
AtomicReference<Throwable> opError = new AtomicReference<>(null);
Observable.range(0,5)
.subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread())
.map(i -> { throw new RuntimeException(); }) // throws
.blockingSubscribe(
result -> Log.i(TAG,"will never happen"),error -> { opError.set(error); } // sets reference to the error without crashing the app
);
// re-throw
if(opError.get() != null) {
throw new Exception(opError.get());
}
} catch (Exception ex) {
Log.e("Test","exception",ex);
}
}