使用Observable.Using和Observable.Retry处理错误

问题描述

我正在尝试将Observable.UsingObservable.Retry组合

我有两种可能发生的错误

  1. 致命错误,需要处置并重新创建基础Idisposable
  2. 暂时性错误,不需要处理Idisposable

我可以使用Retry处理第一种错误
但是我该如何处理瞬态错误

下面是一个充满希望的示例,希望可以说明问题。
如果可能,该解决方案应允许我以“面向铁路”的相同方式编写代码
解决方案应适用于冷热观测。

var subscription = Observable
                  .Using(() => new Foo(),foo => foo.Observe())
                  .Select(i => ThrowIf(i,2))  // Fatal error
                  .Retry()                     // Foo should be disposed (works)
                  .Select(i => ThrowIf(i,5))  // transient error
                  .Retry()                     // Foo should NOT be disposed here   
                  .Subscribe(Console.WriteLine);

Console.ReadLine();
subscription.dispose();


class Foo : Idisposable
{
    public Foo()          => Console.WriteLine(nameof(Foo) + " created");
    public void dispose() => Console.WriteLine(nameof(Foo) + " disposed");

    public IObservable<Int32> Observe()
    {
        return Observable
              .Interval(TimeSpan.FromSeconds(1))
              .Select(_ => DateTime.Now.Second % 10);
    }
}


Int32 ThrowIf(Int32 i,Int32 n)
{
    if (i == n)
    {
        Console.WriteLine("!" + i);
        throw new Exception();
    }

    return i;
}

样本输出

1
!2
Foo disposed
Foo created
3
4
!5
Foo disposed     <= 
Foo created      <= this should not happen
6
7

编辑,进行澄清:

错误源的顺序很重要:
致命,然后是瞬时

在真实代码中,IdisposableUdpClient
致命错误源是UdpClient.ReceiveAsync()
transient 是解析接收到的数据的函数

解决方法

我认为不可能使用一条铁路来解决这个问题。如果可以添加分支,可以使用以下解决方案:

var observable = Observable
    .Using(() => new Foo(),foo => foo.Observe()
        .Select(i => ThrowIf(i,5))
        .Retry() // Foo should NOT be disposed
    )
    .Select(i => ThrowIf(i,3))
    .Retry(); // Foo should be disposed

Using内部的分支处理瞬态错误,而外部(主要)铁路则处理致命错误。