Observable.Interval 同时执行一个方法两次 编辑编辑 2

问题描述

ExecuteSellAsync 方法同时被调用两次,您可以在下面的日志中看到。当我在 Observable.Interval(TimeSpan.FromSeconds(15)) 上放置 15 秒时它工作正常。我怎样才能防止这种情况?也许是锁定之类的东西,或者我不知道。

2021-02-12 19:04:09 [11] DEBUG LiveTradeManager Order ID: 263010769 | Pair: DOGEUSDT | Order side: Sell | Status: New | Price: 0.06783960 | Last filled price: 0.00000000 | Stop price: 0.00000000 | Quantity: 0.00000000 | Quote quantity: 0.00000000 | Commission: 0 
2021-02-12 19:04:09 [11] DEBUG LiveTradeManager Order ID: 263010769 | Pair: DOGEUSDT | Order side: Sell | Status: Filled | Price: 0.06783960 | Last filled price: 0.06784260 | Stop price: 0.00000000 | Quantity: 5420.00000000 | Quote quantity: 367.70689200 | Commission: 0.00201210 BNB
2021-02-12 19:04:09 [11] DEBUG LiveTradeManager Sell order was filled | Close date: 2021/02/12 17:04:09 | Close rate (price): 0.06784260
2021-02-12 19:04:13 [9] INFO  Wallets Wallets synced.
2021-02-12 19:04:14 [10] DEBUG LiveTradeManager Timer triggered. Price: 0.06783910 | Timestamp: 2/12/2021 5:03:00 PM | Close: 0.06790680
2021-02-12 19:04:17 [9] DEBUG BinanceSpotClient Limit sell order has Failed | Error code: -2010 | Error message: Account has insufficient balance for requested action. | Pair: DOGEUSDT | Quantity: 0.00000000 | Price: 0.06782540

_throttlerObservable = Observable.Interval(TimeSpan.FromSeconds(5))
   .SelectMany(_ => Observable.FromAsync(async () =>
   {
       var lastCandle = _candles.Last();

       _logger.Debug($"Timer triggered. Price: {_ticker.LastPrice} | Open time: {lastCandle.Timestamp} | Close: {lastCandle.Close}");

       if (_orderSide == OrderSide.Sell)
       {
           var Trade = _Trades.FirstOrDefault(e => e.Pair.Equals(_TradeOptions.Pair) && e.IsOpen);

           if (Trade.NotNull())
           {
               var shouldSell = _TradingStrategy.ShouldSell(Trade,_ticker.LastPrice,_TradeAdvice);

               if (shouldSell.SellFlag)
               {
                   await ExecuteSellAsync(Trade,lastCandle.Timestamp,shouldSell.SellType).ConfigureAwait(false);
               }
           }
       }
   }))
   .Subscribe();

编辑

我明白问题出在哪里了。 _TradingStrategy.ShouldSell 执行需要几秒钟,并且下一次执行同时开始执行下一次检查。我可以在该逻辑中使用 lock 吗?

这就是解决问题的方法,但我需要锁定整个支票:

bool test = false;
public (bool SellFlag,SellType SellType) ShouldSell(Trade Trade,decimal rate,TradeAdvice TradeAdvice,decimal? low = null,decimal? high = null)
{
    if (!test)
    {
        test = true;

        // my logic is here. It takes a few seconds.

        test = false;
    }

    return (false,SellType.None);
}

编辑 2

可测试的代码Observable.Interval 每秒执行一次,ShouldSellAsync 的逻辑需要 5 秒来执行。一旦 _completed 变为 true,则不再打印该消息。它执行该消息 5 次,而我预计它只执行一次。

using System;
using System.Linq;
using System.Reactive.Linq;
using System.Threading.Tasks;

namespace RxNETdispose
{
    class Program
    {
        private static bool _completed = false;

        public static async Task ShouldSellAsync()
        {
            if (!_completed)
            {
                await Task.Delay(5000);

                Console.WriteLine($"{DateTime.UtcNow} - ShouldSell called");

                _completed = true;
            }
        }

        static void Main(string[] args)
        {
            Observable.Interval(TimeSpan.FromSeconds(1))
                .SelectMany(_ => Observable.FromAsync(async () =>
                {
                    await ShouldSellAsync().ConfigureAwait(false);
                }))
            .Subscribe();

            Console.ReadLine();
        }
    }
}

解决方法

SelectMany 确实引入了并发性。我们希望控制这种并发性,因此这里的答案是使用您自己的运算符,以确保对 ExecuteSellAsync 的调用之间存在固定间隔。

幸运的是,有一种漂亮但不明显的方法可以使用 Rx 调度程序来做到这一点。

我们正在寻找的方法是这样的:

public static IDisposable ScheduleAsync(this IScheduler scheduler,TimeSpan dueTime,Func<IScheduler,CancellationToken,Task<IDisposable>> action)

要使用此调用,需要将 Func<IScheduler,Task<IDisposable>> action 定义为递归,以便在对 ExecuteSellAsync 的调用完成后调用自身重新调度。

因此,为了每 2.0 秒执行一次,例如,我们这样做:

Func<IScheduler,Task<IDisposable>> handler = null;
handler = async (s,ct) =>
{
    await ExecuteSellAsync();
    return s.ScheduleAsync(TimeSpan.FromSeconds(2.0),handler);
};

我们可以通过调用它来启动它:

IDisposable subscription = Scheduler.Default.ScheduleAsync(TimeSpan.Zero,handler);

当然,就像所有好的 Rx 操作一样,我们可以调用 subscription.Dispose() 来停止它运行。

这是一个完整的例子:

async Task Main()
{
    Func<IScheduler,Task<IDisposable>> handler = null;
    handler = async (s,ct) =>
    {
        await ExecuteSellAsync();
        return s.ScheduleAsync(TimeSpan.FromSeconds(2.0),handler);
    };
    
    IDisposable subscription = Scheduler.Default.ScheduleAsync(TimeSpan.Zero,handler);

    await Task.Delay(TimeSpan.FromSeconds(9.0));
    
    subscription.Dispose();
 }

private DateTime then = DateTime.Now;
private int __counter = 0;

async Task ExecuteSellAsync()
{
    var counter = Interlocked.Increment(ref __counter);
    Console.WriteLine($"ExecuteSellAsync() Start {counter} - {DateTime.Now.Subtract(then).TotalSeconds}");
    await Task.Delay(TimeSpan.FromSeconds(2.0));
    Console.WriteLine($"ExecuteSellAsync() End {counter} - {DateTime.Now.Subtract(then).TotalSeconds}");
}

当我运行这个时,我得到这个输出:

ExecuteSellAsync() Start 1 - 0.0019952
ExecuteSellAsync() End 1 - 2.0095866
ExecuteSellAsync() Start 2 - 4.0185182
ExecuteSellAsync() End 2 - 6.0199157
ExecuteSellAsync() Start 3 - 8.0303588
ExecuteSellAsync() End 3 - 10.0417079

请注意,ExecuteSellAsync() 不会协同取消,因此它会一直运行到完成。不难把它改成async Task ExecuteSellAsync(CancellationToken ct),让它合作取消。

现在,这可以扩展为一个很好的 observable。

试试这个:

IObservable<Unit> query =
    Observable.Create<Unit>(o =>
    {
        Func<IScheduler,Task<IDisposable>> handler = null;
        handler = async (s,ct) =>
        {
            if (ct.IsCancellationRequested)
            {
                o.OnCompleted();
            }
            else
            {
                await ExecuteSellAsync();
                o.OnNext(Unit.Default);
            }
            return s.ScheduleAsync(TimeSpan.FromSeconds(2.0),handler);
        };
    
        return Scheduler.Default.ScheduleAsync(TimeSpan.Zero,handler);
    });

IDisposable subscription = query.Take(3).Subscribe(x => Console.WriteLine("U"),() => Console.WriteLine("C"));

await Task.Delay(TimeSpan.FromSeconds(11.0));

subscription.Dispose();

输出如下:

ExecuteSellAsync() Start 1 - 0.0009972
ExecuteSellAsync() End 1 - 2.0115375
U
ExecuteSellAsync() Start 2 - 4.0128375
ExecuteSellAsync() End 2 - 6.0282818
U
ExecuteSellAsync() Start 3 - 8.0370135
ExecuteSellAsync() End 3 - 10.0521106
U
C

注意它完成了。如果您在 subscription.Dispose(); 自然完成之前调用它,那么它会正常运行并且不会发出 OnComplete 通知。

让我们用一组很好的扩展方法来包装它:

public static class ObservableEx
{
    public static IObservable<Unit> IntervalAsync(TimeSpan period,Func<Task> actionAsync,IScheduler scheduler) =>
        TimerAsync(period,period,actionAsync,scheduler);

    public static IObservable<T> IntervalAsync<T>(TimeSpan period,Func<Task<T>> functionAsync,functionAsync,scheduler);

    public static IObservable<Unit> TimerAsync(TimeSpan dueTime,TimeSpan period,IScheduler scheduler) =>
        Observable.Create<Unit>(o =>
        {
            Func<IScheduler,Task<IDisposable>> handler = null;
            handler = async (s,ct) =>
            {
                if (ct.IsCancellationRequested)
                {
                    o.OnCompleted();
                }
                else
                {
                    await actionAsync();
                    o.OnNext(Unit.Default);
                }
                return s.ScheduleAsync(period,handler);
            };
            return scheduler.ScheduleAsync(dueTime,handler);
        });

    public static IObservable<T> TimerAsync<T>(TimeSpan dueTime,IScheduler scheduler) =>
        Observable.Create<T>(o =>
        {
            Func<IScheduler,ct) =>
            {
                if (ct.IsCancellationRequested)
                {
                    o.OnCompleted();
                }
                else
                {
                    o.OnNext(await functionAsync());
                }
                return s.ScheduleAsync(period,handler);
        });
}

现在,显然有一堆我没有写的重载 - 使用默认调度程序的重载和允许协作取消的重载 - 但我希望你能明白。

现在有了这些扩展方法,我可以做到这一点:

IDisposable subscription =
    ObservableEx
        .IntervalAsync(TimeSpan.FromSeconds(2.0),() => ExecuteSellAsync(),Scheduler.Default)
        .Take(3)
        .Subscribe(x => Console.WriteLine("U"),() => Console.WriteLine("C"));

我得到和以前一样的输出。

我还没有完全测试扩展方法。他们可能需要更多的爱和关注。

相关问答

Selenium Web驱动程序和Java。元素在(x,y)点处不可单击。其...
Python-如何使用点“。” 访问字典成员?
Java 字符串是不可变的。到底是什么意思?
Java中的“ final”关键字如何工作?(我仍然可以修改对象。...
“loop:”在Java代码中。这是什么,为什么要编译?
java.lang.ClassNotFoundException:sun.jdbc.odbc.JdbcOdbc...