问题描述
我正在尝试将Observable.Using
与Observable.Retry
组合
我可以使用Retry
处理第一种错误。
但是我该如何处理瞬态错误?
下面是一个充满希望的示例,希望可以说明问题。
如果可能,该解决方案应允许我以“面向铁路”的相同方式编写代码。
该解决方案应适用于冷热观测。
var subscription = Observable
.Using(() => new Foo(),foo => foo.Observe())
.Select(i => ThrowIf(i,2)) // Fatal error
.Retry() // Foo should be disposed (works)
.Select(i => ThrowIf(i,5)) // transient error
.Retry() // Foo should NOT be disposed here
.Subscribe(Console.WriteLine);
Console.ReadLine();
subscription.dispose();
class Foo : Idisposable
{
public Foo() => Console.WriteLine(nameof(Foo) + " created");
public void dispose() => Console.WriteLine(nameof(Foo) + " disposed");
public IObservable<Int32> Observe()
{
return Observable
.Interval(TimeSpan.FromSeconds(1))
.Select(_ => DateTime.Now.Second % 10);
}
}
Int32 ThrowIf(Int32 i,Int32 n)
{
if (i == n)
{
Console.WriteLine("!" + i);
throw new Exception();
}
return i;
}
样本输出:
1
!2
Foo disposed
Foo created
3
4
!5
Foo disposed <=
Foo created <= this should not happen
6
7
编辑,进行澄清:
错误源的顺序很重要:
致命,然后是瞬时。
在真实代码中,Idisposable
是UdpClient
,
致命错误源是UdpClient.ReceiveAsync()
和
而 transient 是解析接收到的数据的函数。
解决方法
我认为不可能使用一条铁路来解决这个问题。如果可以添加分支,可以使用以下解决方案:
var observable = Observable
.Using(() => new Foo(),foo => foo.Observe()
.Select(i => ThrowIf(i,5))
.Retry() // Foo should NOT be disposed
)
.Select(i => ThrowIf(i,3))
.Retry(); // Foo should be disposed
Using
内部的分支处理瞬态错误,而外部(主要)铁路则处理致命错误。