可观察管道中的异常处理

问题描述

我创建了一个 observable,其中包含通过运行异步方法一个项目转换为另一个项目。

IObservable<Summary> obs = scanner.Scans
                    .SelectMany(b => GetAssignment(b))
                    .SelectMany(b => VerifyAssignment(b))
                    .SelectMany(b => ConfirmAssignmentData(b))
                    .SelectMany(b => UploadAsset(b))
                    .Select(assignment => new Summary())
                    .Catch(LogException());

我想让这个防故障,所以如果在处理过程中抛出异常,我应该记录异常,但忽略异常并继续下一次扫描(由{{1}推送的下一个项目})

当前代码捕获任何异常,但一旦抛出异常,序列就会结束。

我怎样才能让它“吞下”异常(记录它),但继续下一个项目?

解决方法

这个问题假定了一个根本性的误解:根据 Observable 契约,行为良好的 observable 在 OnError 通知后终止。对于您的情况,没有“只记录并继续”选项,因为没有什么可继续的。通过 OnError 抛出异常的 observable 已经完成,kaput,finito,一去不复返了。

评论中提到了 Retry,这可能适用:如果您有一个像这样的可观察管道:

someHotSource
    .SelectMany(e => f(e)) //operator1
    .SelectMany(e => g(e)) //operator2
    .Subscribe(e => {});

然后异常可能发生在操作员之一,杀死管道,但源可能仍然存在。 Retry 然后将尝试重新创建具有相同功能的新管道。

您可以尝试使用 MaterializeDematerialize 来“欺骗”Observable 合约,但您会逆流而上。作弊的技巧是确保管道的任何部分都不会看到“原始”OnError,因为该运算符将终止。而是 MaterializeOnError 变成 Notification,它不会爆炸。看起来像这样:

鉴于这样的行为良好的管道:

var someHotSource = new Subject<int>();
var f = new Func<int,IObservable<int>>(i => Observable.Return(i));
var g = new Func<int,IObservable<int>>(i =>
{
    if(i % 13 == 0)
        return Observable.Throw<int>(new Exception());
    return Observable.Return(i);
});

var LogException = new Action<Exception>(e => Console.WriteLine("Exception"));
var p1 = someHotSource
    .SelectMany(e => f(e)) //operator1
    .SelectMany(e => g(e)) //operator2: suspect
    .Subscribe(e => Console.WriteLine(e));

...你可以这样作弊:

var p2 = someHotSource
    .SelectMany(e => f(e)) //operator1
    .SuspectSelectMany(e => g(e),LogException) //operator2: suspect
    .Subscribe(e => Console.WriteLine(e));

public static class X
{
    public static IObservable<Notification<T>> IgnoreOnCompleted<T>(this IObservable<Notification<T>> source)
    {
        return source
            .SelectMany(n => n.Kind == NotificationKind.OnCompleted
                ? Observable.Empty<Notification<T>>()
                : Observable.Return(n)
            );
    }
    
    public static IObservable<U> SuspectSelectMany<T,U>(this IObservable<T> source,Func<T,IObservable<U>> selector,Action<Exception> handler)
    {
        var x = source
            .Materialize()
            .SelectMany(e => selector(e.Value).Materialize().IgnoreOnCompleted()) //execute suspect selector,turn immediately into notifications
            .SelectMany(e =>
            {
                if (e.Kind == NotificationKind.OnError)
                {
                    handler(e.Exception);
                    return Observable.Empty<Notification<U>>();
                }
                else
                    return Observable.Return(e);
            }) //error logging/suppression
            .Dematerialize();
        return x;
    }
}

然后给出以下运行程序代码:

someHotSource.OnNext(1);
someHotSource.OnNext(12);
someHotSource.OnNext(13);
someHotSource.OnNext(15);

p1 会爆炸。 p2 将产生以下输出:

1
12
Exception
15
,

Rx 是一种函数式范例,因此使用函数式方法来解决这个问题非常有用。

答案是引入另一个可以处理错误的 monad,例如 Nullable<T> 可以处理具有空值的整数,但在这种情况下,一个类可以表示值或异常。

public class Exceptional
{
    public static Exceptional<T> From<T>(T value) => new Exceptional<T>(value);
    public static Exceptional<T> From<T>(Exception ex) => new Exceptional<T>(ex);
    public static Exceptional<T> From<T>(Func<T> factory) => new Exceptional<T>(factory);
}

public class Exceptional<T>
{
    public bool HasException { get; private set; }
    public Exception Exception { get; private set; }
    public T Value { get; private set; }

    public Exceptional(T value)
    {
        this.HasException = false;
        this.Value = value;
    }

    public Exceptional(Exception exception)
    {
        this.HasException = true;
        this.Exception = exception;
    }

    public Exceptional(Func<T> factory)
    {
        try
        {
            this.Value = factory();
            this.HasException = false;
        }
        catch (Exception ex)
        {
            this.Exception = ex;
            this.HasException = true;
        }
    }

    public override string ToString() =>
        this.HasException
            ? this.Exception.GetType().Name
            : (this.Value != null ? this.Value.ToString() : "null");
}


public static class ExceptionalExtensions
{
    public static Exceptional<T> ToExceptional<T>(this T value) => Exceptional.From(value);

    public static Exceptional<T> ToExceptional<T>(this Func<T> factory) => Exceptional.From(factory);

    public static Exceptional<U> Select<T,U>(this Exceptional<T> value,U> m) =>
        value.SelectMany(t => Exceptional.From(() => m(t)));

    public static Exceptional<U> SelectMany<T,Exceptional<U>> k) =>
        value.HasException ? Exceptional.From<U>(value.Exception) : k(value.Value);

    public static Exceptional<V> SelectMany<T,U,V>(this Exceptional<T> value,Exceptional<U>> k,V> m) =>
        value.SelectMany(t => k(t).SelectMany(u => Exceptional.From(() => m(t,u))));
}

那么,让我们从创建一个抛出异常的 Rx 查询开始。

IObservable<int> query =
    Observable
        .Range(0,10)
        .Select(x => 5 - x)
        .Select(x => 100 / x)
        .Select(x => x + 5);

如果我运行 observable,我会得到这个:

Normal Query

让我们用 Exceptional 转换它,看看它如何让我们在发生错误时继续处理。

IObservable<Exceptional<int>> query =
    Observable
        .Range(0,10)
        .Select(x => x.ToExceptional())
        .Select(x => x.Select(y => 5 - y))
        .Select(x => x.Select(y => 100 / y))
        .Select(x => x.Select(y => y + 5));

现在当我运行它时,我得到了这个:

Query with Exceptional

现在我可以测试每个结果,看看 HasException 是否为 true 并记录每个异常,同时 observable 继续。

最后,通过引入一种进一步的扩展方法,可以很容易地清理查询,使其看起来与原始查询几乎相同。

    public static IObservable<Exceptional<U>> Select<T,U>(this IObservable<Exceptional<T>> source,U> m) =>
        source.Select(x => x.SelectMany(y => Exceptional.From(() => m(y))));

这将 observables 和 exceptions 结合到一个 Select 运算符中。

现在查询可以是这样的:

IObservable<Exceptional<int>> query =
    Observable
        .Range(0,10)
        .Select(x => x.ToExceptional())
        .Select(x => 5 - x)
        .Select(x => 100 / x)
        .Select(x => x + 5);

我之前得到了相同的结果。


最后,我可以通过添加另外两个扩展方法来使用查询语法:

public static IObservable<Exceptional<U>> SelectMany<T,Exceptional<U>> k) =>
    source.Select(t => k(t));

public static IObservable<Exceptional<V>> SelectMany<T,V>(this IObservable<T> source,V> m) =>
    source.SelectMany(t => k(t).SelectMany(u => Exceptional.From(() => m(t,u))));

这允许:

IObservable<Exceptional<int>> query =
    from n in Observable.Range(0,10)
    from x in n.ToExceptional()
    let a = 5 - x
    let b = 100 / a
    select b + 5;

同样,我得到了和以前一样的结果。

,

您可以在下面使用特定于应用程序的运算符 LogAndIgnoreError

/// <summary>Ensures that the source sequence will always complete successfully.
/// In case of failure the error is logged.</summary>
public static IObservable<T> LogAndIgnoreError<T>(this IObservable<T> source)
{
    return source.Catch((Exception error) =>
    {
        // Application-specific logging
        Console.WriteLine($"Log - {error.GetType().Name}: {error.Message}");
        return Observable.Empty<T>();
    });
}

然后,您可以将此运算符附加到您想要忽略其错误的任何序列。

用法示例:

IObservable<Summary> obs = scanner.Scans
    .SelectMany(b => GetAssignment(b).LogAndIgnoreError())
    .SelectMany(b => VerifyAssignment(b).LogAndIgnoreError())
    .SelectMany(b => ConfirmAssignmentData(b).LogAndIgnoreError())
    .SelectMany(b => UploadAsset(b).LogAndIgnoreError())
    .Select(assignment => new Summary())
    .LogAndIgnoreError();

相关问答

Selenium Web驱动程序和Java。元素在(x,y)点处不可单击。其...
Python-如何使用点“。” 访问字典成员?
Java 字符串是不可变的。到底是什么意思?
Java中的“ final”关键字如何工作?(我仍然可以修改对象。...
“loop:”在Java代码中。这是什么,为什么要编译?
java.lang.ClassNotFoundException:sun.jdbc.odbc.JdbcOdbc...