问题描述
我创建了一个 observable,其中包含通过运行异步方法将一个项目转换为另一个项目。
IObservable<Summary> obs = scanner.Scans
.SelectMany(b => GetAssignment(b))
.SelectMany(b => VerifyAssignment(b))
.SelectMany(b => ConfirmAssignmentData(b))
.SelectMany(b => UploadAsset(b))
.Select(assignment => new Summary())
.Catch(LogException());
我想让这个防故障,所以如果在处理过程中抛出异常,我应该记录异常,但忽略异常并继续下一次扫描(由{{1}推送的下一个项目})
当前代码捕获任何异常,但一旦抛出异常,序列就会结束。
我怎样才能让它“吞下”异常(记录它),但继续下一个项目?
解决方法
这个问题假定了一个根本性的误解:根据 Observable 契约,行为良好的 observable 在 OnError
通知后终止。对于您的情况,没有“只记录并继续”选项,因为没有什么可继续的。通过 OnError
抛出异常的 observable 已经完成,kaput,finito,一去不复返了。
评论中提到了 Retry
,这可能适用:如果您有一个像这样的可观察管道:
someHotSource
.SelectMany(e => f(e)) //operator1
.SelectMany(e => g(e)) //operator2
.Subscribe(e => {});
然后异常可能发生在操作员之一,杀死管道,但源可能仍然存在。 Retry
然后将尝试重新创建具有相同功能的新管道。
您可以尝试使用 Materialize
和 Dematerialize
来“欺骗”Observable 合约,但您会逆流而上。作弊的技巧是确保管道的任何部分都不会看到“原始”OnError
,因为该运算符将终止。而是 Materialize
将 OnError
变成 Notification
,它不会爆炸。看起来像这样:
鉴于这样的行为良好的管道:
var someHotSource = new Subject<int>();
var f = new Func<int,IObservable<int>>(i => Observable.Return(i));
var g = new Func<int,IObservable<int>>(i =>
{
if(i % 13 == 0)
return Observable.Throw<int>(new Exception());
return Observable.Return(i);
});
var LogException = new Action<Exception>(e => Console.WriteLine("Exception"));
var p1 = someHotSource
.SelectMany(e => f(e)) //operator1
.SelectMany(e => g(e)) //operator2: suspect
.Subscribe(e => Console.WriteLine(e));
...你可以这样作弊:
var p2 = someHotSource
.SelectMany(e => f(e)) //operator1
.SuspectSelectMany(e => g(e),LogException) //operator2: suspect
.Subscribe(e => Console.WriteLine(e));
public static class X
{
public static IObservable<Notification<T>> IgnoreOnCompleted<T>(this IObservable<Notification<T>> source)
{
return source
.SelectMany(n => n.Kind == NotificationKind.OnCompleted
? Observable.Empty<Notification<T>>()
: Observable.Return(n)
);
}
public static IObservable<U> SuspectSelectMany<T,U>(this IObservable<T> source,Func<T,IObservable<U>> selector,Action<Exception> handler)
{
var x = source
.Materialize()
.SelectMany(e => selector(e.Value).Materialize().IgnoreOnCompleted()) //execute suspect selector,turn immediately into notifications
.SelectMany(e =>
{
if (e.Kind == NotificationKind.OnError)
{
handler(e.Exception);
return Observable.Empty<Notification<U>>();
}
else
return Observable.Return(e);
}) //error logging/suppression
.Dematerialize();
return x;
}
}
然后给出以下运行程序代码:
someHotSource.OnNext(1);
someHotSource.OnNext(12);
someHotSource.OnNext(13);
someHotSource.OnNext(15);
p1
会爆炸。 p2
将产生以下输出:
1
12
Exception
15
,
Rx 是一种函数式范例,因此使用函数式方法来解决这个问题非常有用。
答案是引入另一个可以处理错误的 monad,例如 Nullable<T>
可以处理具有空值的整数,但在这种情况下,一个类可以表示值或异常。
public class Exceptional
{
public static Exceptional<T> From<T>(T value) => new Exceptional<T>(value);
public static Exceptional<T> From<T>(Exception ex) => new Exceptional<T>(ex);
public static Exceptional<T> From<T>(Func<T> factory) => new Exceptional<T>(factory);
}
public class Exceptional<T>
{
public bool HasException { get; private set; }
public Exception Exception { get; private set; }
public T Value { get; private set; }
public Exceptional(T value)
{
this.HasException = false;
this.Value = value;
}
public Exceptional(Exception exception)
{
this.HasException = true;
this.Exception = exception;
}
public Exceptional(Func<T> factory)
{
try
{
this.Value = factory();
this.HasException = false;
}
catch (Exception ex)
{
this.Exception = ex;
this.HasException = true;
}
}
public override string ToString() =>
this.HasException
? this.Exception.GetType().Name
: (this.Value != null ? this.Value.ToString() : "null");
}
public static class ExceptionalExtensions
{
public static Exceptional<T> ToExceptional<T>(this T value) => Exceptional.From(value);
public static Exceptional<T> ToExceptional<T>(this Func<T> factory) => Exceptional.From(factory);
public static Exceptional<U> Select<T,U>(this Exceptional<T> value,U> m) =>
value.SelectMany(t => Exceptional.From(() => m(t)));
public static Exceptional<U> SelectMany<T,Exceptional<U>> k) =>
value.HasException ? Exceptional.From<U>(value.Exception) : k(value.Value);
public static Exceptional<V> SelectMany<T,U,V>(this Exceptional<T> value,Exceptional<U>> k,V> m) =>
value.SelectMany(t => k(t).SelectMany(u => Exceptional.From(() => m(t,u))));
}
那么,让我们从创建一个抛出异常的 Rx 查询开始。
IObservable<int> query =
Observable
.Range(0,10)
.Select(x => 5 - x)
.Select(x => 100 / x)
.Select(x => x + 5);
如果我运行 observable,我会得到这个:
让我们用 Exceptional
转换它,看看它如何让我们在发生错误时继续处理。
IObservable<Exceptional<int>> query =
Observable
.Range(0,10)
.Select(x => x.ToExceptional())
.Select(x => x.Select(y => 5 - y))
.Select(x => x.Select(y => 100 / y))
.Select(x => x.Select(y => y + 5));
现在当我运行它时,我得到了这个:
现在我可以测试每个结果,看看 HasException
是否为 true
并记录每个异常,同时 observable 继续。
最后,通过引入一种进一步的扩展方法,可以很容易地清理查询,使其看起来与原始查询几乎相同。
public static IObservable<Exceptional<U>> Select<T,U>(this IObservable<Exceptional<T>> source,U> m) =>
source.Select(x => x.SelectMany(y => Exceptional.From(() => m(y))));
这将 observables 和 exceptions 结合到一个 Select
运算符中。
现在查询可以是这样的:
IObservable<Exceptional<int>> query =
Observable
.Range(0,10)
.Select(x => x.ToExceptional())
.Select(x => 5 - x)
.Select(x => 100 / x)
.Select(x => x + 5);
我之前得到了相同的结果。
最后,我可以通过添加另外两个扩展方法来使用查询语法:
public static IObservable<Exceptional<U>> SelectMany<T,Exceptional<U>> k) =>
source.Select(t => k(t));
public static IObservable<Exceptional<V>> SelectMany<T,V>(this IObservable<T> source,V> m) =>
source.SelectMany(t => k(t).SelectMany(u => Exceptional.From(() => m(t,u))));
这允许:
IObservable<Exceptional<int>> query =
from n in Observable.Range(0,10)
from x in n.ToExceptional()
let a = 5 - x
let b = 100 / a
select b + 5;
同样,我得到了和以前一样的结果。
,您可以在下面使用特定于应用程序的运算符 LogAndIgnoreError
:
/// <summary>Ensures that the source sequence will always complete successfully.
/// In case of failure the error is logged.</summary>
public static IObservable<T> LogAndIgnoreError<T>(this IObservable<T> source)
{
return source.Catch((Exception error) =>
{
// Application-specific logging
Console.WriteLine($"Log - {error.GetType().Name}: {error.Message}");
return Observable.Empty<T>();
});
}
然后,您可以将此运算符附加到您想要忽略其错误的任何序列。
用法示例:
IObservable<Summary> obs = scanner.Scans
.SelectMany(b => GetAssignment(b).LogAndIgnoreError())
.SelectMany(b => VerifyAssignment(b).LogAndIgnoreError())
.SelectMany(b => ConfirmAssignmentData(b).LogAndIgnoreError())
.SelectMany(b => UploadAsset(b).LogAndIgnoreError())
.Select(assignment => new Summary())
.LogAndIgnoreError();