可观察的计时器处理 第一种情况第二种情况编辑第一种情况第二种情况结论

问题描述

我正在使用 Reactive .NET 扩展,我想知道它的处理方式。我知道在某些情况下最好这样处理它:.takeuntil(Observable.Timer(TimeSpan.FromMinutes(x)))。我

第一种情况

在这种情况下,我有一个计时器,它在 x 秒后触发,然后它完成并应该被处理。

public void ScheduleOrderCancellationIfNotFilled(string pair,long orderId,int waitSecondsBeforeCancel)
{
    Observable.Timer(TimeSpan.FromSeconds(waitSecondsBeforeCancel))
        .Do(e =>
        {
            var result = _client.Spot.Order.Getorder(pair,orderId);

            if (result.Success)
            {
                if (result.Data?.Status != OrderStatus.Filled)
                {
                    _client.Spot.Order.CancelOrder(pair,orderId);
                }
            }
        })
        .Subscribe();
}

第二种情况

在这种情况下,计时器在第一秒运行,然后每 29 分钟重复一次。这应该一直存在,直到它的定义类被处理。我相信这个应该与 Idisposable 实现一起处理。怎么样?

var keepAliveListenKey = Observable.Timer(TimeSpan.FromSeconds(1),TimeSpan.FromMinutes(29))
    .Do(async e =>
    {
        await KeepAliveListenKeyAsync().ConfigureAwait(false);
    })
    .Subscribe();

编辑

我还希望它使用 Subject<T>,这样可以更轻松地处理和重置订阅

例如。 Reset and Dispose observable subscriber,Reactive Extensions(@Enigmativity)

public class UploadDicomSet : ImportBaseSet
{
    Idisposable subscription;
    Subject<IObservable<long>> subject = new Subject<IObservable<long>>();

    public UploadDicomSet()
    {
        subscription = subject.Switch().Subscribe(s => CheckUploadSetList(s));
        subject.OnNext(Observable.Interval(TimeSpan.FromMinutes(2)));
    }

    void CheckUploadSetList(long interval)
    {
        subject.OnNext(Observable.Never<long>());
        // Do other things
    }

    public void AddDicomFile(SharedLib.DicomFile dicomFile)
    {
        subject.OnNext(Observable.Interval(TimeSpan.FromMinutes(2)));
        // Reset the subscription to go off in 2 minutes from Now
        // Do other things
    }
}

解决方法

在第一种情况下,它会被自动处理。实际上,这是实现自动订阅管理的常用方法,而且绝对是处理 rx 的好方法。

在第二种情况下,您过度设计了。 Observable.Timer(TimeSpan.FromSeconds(1),TimeSpan.FromSeconds(1)) 本身足以随时间生成一系列递增的 long。由于此流本质上是无穷无尽的,您是对的 - 需要显式订阅管理。所以它就足够了:

var sub = Observable.Timer(TimeSpan.FromSeconds(1),TimeSpan.FromSeconds(1)).Subscribe()

...然后sub.Dispose()它。

附言请注意,在您的代码中,您 .Do async/await。很可能那不是你想要的。您希望 SelectMany 确保正确等待 async 操作并处理异常。


在评论部分回答您的问题:

如果使用 Subject 来处理呢?

嗯,没什么特别的。 IObserver<>IObservable<> 都是由此类实现的,因此它类似于经典的 .NET 事件(要对某个事件调用的回调列表)。就您的问题和用例而言,它在任何意义上都没有区别。

你能举一个关于 .Do 与异常处理的例子吗?

当然。这个想法是你想将你的 async/await 封装成一些 Task<T>IObservable<T> 这样既保留取消信号又保留错误信号。为此,必须使用 .SelectMany 方法(就像 LINQ 中的 SelectMany,同样的想法)。所以只需将您的 .Do 更改为 .SelectMany

Observable
    .Timer(TimeSpan.FromSeconds(1),TimeSpan.FromSeconds(1))
    .SelectMany(_ => Observable.FromAsync(() => /* that's the point where your Task<> becomes Observable */ myTask))

我又糊涂了。我需要 IObservable (Select) 还是 IObservable (SelectMany)

很可能,您不需要开关。为什么?因为它的创建主要是为了避免 IO 竞争条件,这样每当发出新事件时,当前事件(可能由于自然并行或异步工作流而正在进行)保证被取消(即取消订阅)。否则竞争条件会(并且将会)损害您的状态。

相反,SelectMany 将确保所有这些都按顺序发生,它们确实以某种总顺序发生。什么都不会被取消。您将完成(等待,如果您愿意)当前回调,然后触发下一个。当然,这种行为可以通过适当的 IScheduler 来改变,但那是另外一回事了。

,

Reactive Observable Subscription Disposal (@Enigmativity)

Subscribe 扩展方法返回的 Disposable 仅返回以允许您在 observable 自然结束之前手动取消订阅 observable。

如果 observable 完成 - 带有 OnCompleted 或 OnError - 那么订阅已经为你处理了。

需要注意的一件重要事情:垃圾收集器永远不会对 observable 订阅调用 .Dispose(),因此,如果您的订阅在您的订阅超出范围之前没有(或可能没有)自然结束,您必须处理它们。

第一种情况

在第一种情况下,看起来我不需要手动 .Dispose() 订阅,因为它自然结束。

Dispose 在最后被触发。

var xs = Observable.Create<long>(o =>
{
    var d = Observable.Timer(TimeSpan.FromSeconds(5))
        .Do(e =>
        {
            Console.WriteLine("5 seconds elapsed.");
        })
        .Subscribe(o);

    return Disposable.Create(() =>
    {
        Console.WriteLine("Disposed!");
        d.Dispose();
    });
});

var subscription = xs.Subscribe(x => Console.WriteLine(x));

第二种情况

但在第二种情况下,如果它没有“自然地”结束,我应该处理它。

除非手动处理,否则不会触发处理。

var xs = Observable.Create<long>(o =>
{
    var d = Observable.Timer(TimeSpan.FromSeconds(1),TimeSpan.FromSeconds(1))
        .Do(e =>
        {
            Console.WriteLine("Test.");
        })
        .Subscribe(o);

    return Disposable.Create(() =>
    {
        Console.WriteLine("Disposed!");
        d.Dispose();
    });
});

var subscription = xs.Subscribe(x => Console.WriteLine(x));

结论

他举了这么好的例子,如果你问自己同样的问题,值得一看。

相关问答

Selenium Web驱动程序和Java。元素在(x,y)点处不可单击。其...
Python-如何使用点“。” 访问字典成员?
Java 字符串是不可变的。到底是什么意思?
Java中的“ final”关键字如何工作?(我仍然可以修改对象。...
“loop:”在Java代码中。这是什么,为什么要编译?
java.lang.ClassNotFoundException:sun.jdbc.odbc.JdbcOdbc...