无法传播反应式方法中的错误

问题描述

我有以下方法,在其中一个步骤中,我将调用throwACheckedExceptionMethod()方法

方法有机会引发一个检查异常。现在,我不想在这里处理它。我想要订阅方法 处理它。

我该怎么做? Cos现在无法编译cos,我不在这里处理错误添加doOnError()或在方法级别抛出错误也无济于事。我似乎在这里缺少有关RX Java的东西。

请给我一些帮助吗?谢谢。

private Observable<Void> myMethod(Observable<KafkaConsumerRecord<String,String>> records) throws Exception /*Doesn't make a diff*/ {
    return client.rxGetConnection()
            .flatMapCompletable(connection -> connection.rxSetAutoCommit(false)
                    .toCompletable()
                    .andThen(records.flatMapSingle(record -> throwACheckedExceptionMethod(connection,record)).toCompletable()) // line in question
                    .andThen(connection.rxCommit().toCompletable())
                    .andThen(connection.rxSetAutoCommit(true).toCompletable())
            )
            .toObservable();
}

private Single<UpdateResult> throwACheckedExceptionMethod(sqlConnection connection,KafkaConsumerRecord<String,String> record) throws Exception {
    // some operation which may return a Single. 
    throw new Exception();
}

以下是一个可能的选择,但我不想像上面提到的那样处理错误

private Observable<Void> processtransaction(Observable<KafkaConsumerRecord<String,String>> records) {

    return client.rxGetConnection()
            .flatMapCompletable(connection -> connection.rxSetAutoCommit(false)
                    .toCompletable()
                    .andThen(records.flatMapSingle(record -> {
                        try {
                            return throwACheckedExceptionMethod(connection,record);
                        } catch (Exception throwables) {
                            throwables.printstacktrace();
                        }
                    })
                            .toCompletable())
                    .andThen(connection.rxCommit().toCompletable())
                    .andThen(connection.rxSetAutoCommit(true).toCompletable())
            )
            .toObservable();
}

解决方法

throwACheckedExceptionMethod方法不应引发异常,因为引发异常会破坏可观察的链。

相反,它应该返回一个观察到的错误,它们发出 错误Single.error

private Single<UpdateResult> throwACheckedExceptionMethod(SQLConnection connection,KafkaConsumerRecord<String,String> record) throws Exception {
    // some operation which may return a UpdateResult 
    return Single.error(new Exception());
}

现在,异常传播到订户的onError方法。