问题描述
我有以下方法,在其中一个步骤中,我将调用throwACheckedExceptionMethod()方法。
该方法有机会引发一个检查异常。现在,我不想在这里处理它。我想要订阅的方法 处理它。
我该怎么做? Cos现在无法编译cos,我不在这里处理错误。 添加doOnError()或在方法级别抛出错误也无济于事。我似乎在这里缺少有关RX Java的东西。
请给我一些帮助吗?谢谢。
private Observable<Void> myMethod(Observable<KafkaConsumerRecord<String,String>> records) throws Exception /*Doesn't make a diff*/ {
return client.rxGetConnection()
.flatMapCompletable(connection -> connection.rxSetAutoCommit(false)
.toCompletable()
.andThen(records.flatMapSingle(record -> throwACheckedExceptionMethod(connection,record)).toCompletable()) // line in question
.andThen(connection.rxCommit().toCompletable())
.andThen(connection.rxSetAutoCommit(true).toCompletable())
)
.toObservable();
}
private Single<UpdateResult> throwACheckedExceptionMethod(sqlConnection connection,KafkaConsumerRecord<String,String> record) throws Exception {
// some operation which may return a Single.
throw new Exception();
}
private Observable<Void> processtransaction(Observable<KafkaConsumerRecord<String,String>> records) {
return client.rxGetConnection()
.flatMapCompletable(connection -> connection.rxSetAutoCommit(false)
.toCompletable()
.andThen(records.flatMapSingle(record -> {
try {
return throwACheckedExceptionMethod(connection,record);
} catch (Exception throwables) {
throwables.printstacktrace();
}
})
.toCompletable())
.andThen(connection.rxCommit().toCompletable())
.andThen(connection.rxSetAutoCommit(true).toCompletable())
)
.toObservable();
}
解决方法
throwACheckedExceptionMethod
方法不应引发异常,因为引发异常会破坏可观察的链。
相反,它应该返回一个观察到的错误,它们发出 错误Single.error
:
private Single<UpdateResult> throwACheckedExceptionMethod(SQLConnection connection,KafkaConsumerRecord<String,String> record) throws Exception {
// some operation which may return a UpdateResult
return Single.error(new Exception());
}
现在,异常传播到订户的onError
方法。