TPL Dataflow uses old data instead of the latest data

Problem description

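I am using TPL Dataflow to run one task at a time per symbol. The first two "Operation taking..." messages are correct, but the following ones use stale data. In other words, they use the old data marked in green in the screenshot below instead of the latest data (marked in blue).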

[Screenshot of the console output]

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
using Binance.Net;
using Binance.Net.Enums;
using Binance.Net.Interfaces;
using Binance.Net.Objects.Spot;
using CryptoExchange.Net.Authentication;
using CryptoExchange.Net.Logging;

namespace SubscribetoCandlesEventFixTest
{
    public class BinanceSocketHandler
    {
        private readonly IBinanceClient _client;
        private readonly IBinanceSocketClient _socketClient;

        public BinanceSocketHandler()
        {
            _client = new BinanceClient(new BinanceClientOptions
            {
                ApiCredentials = new ApiCredentials("not required", "not required"),
                AutoTimestamp = true,
                AutoTimestampRecalculationInterval = TimeSpan.FromMinutes(30),
#if DEBUG
                LogVerbosity = LogVerbosity.Debug
#endif
            });

            _socketClient = new BinanceSocketClient(new BinanceSocketClientOptions
            {
                ApiCredentials = new ApiCredentials("not required", "not required"),
                AutoReconnect = true,
                ReconnectInterval = TimeSpan.FromSeconds(15),
#if DEBUG
                LogVerbosity = LogVerbosity.Debug
#endif
            });
        }

        private Dictionary<string,ActionBlock<IBinanceStreamKlineData>> _ab = new();

        public async Task StartAsync(CancellationToken cancellationToken)
        {
            var symbols = new[] { "TRXUSDT","BTCUSDT" };
            var interval = KlineInterval.OneMinute;
            
            foreach (var symbol in symbols)
            {
                _ab[symbol] = new ActionBlock<IBinanceStreamKlineData>(
                    async data =>
                    {
                        Console.WriteLine($"Operation taking 10 seconds to execute... | Symbol: {data.Symbol} | Timestamp: {data.Data.OpenTime} | Price: {data.Data.Close}");

                        await Task.Delay(10000,cancellationToken).ConfigureAwait(false);
                    },new ExecutionDataflowBlockOptions
                    {
                        MaxDegreeOfParallelism = 1
                    });

                await _socketClient.Spot.SubscribeToKlineUpdatesAsync(symbol,interval,async data =>
                    {
                        if (data.Data.Final)
                        {
                            Console.WriteLine(
                                $"[{DateTime.UtcNow}] [{data.Symbol}] New final candle | Timestamp: {data.Data.OpenTime} | Price: {data.Data.Close}");
                        }
                        else
                        {
                            Console.WriteLine(
                                $"[{DateTime.UtcNow}] [{data.Symbol}] Candle update | Timestamp: {data.Data.OpenTime} | Price: {data.Data.Close}");

                            // Todo: Use the most up-to-date value
                            await _ab[symbol].SendAsync(data,cancellationToken).ConfigureAwait(false);
                        }
                    }).ConfigureAwait(false);
            }
        }

        public async Task StopAsync()
        {
            foreach (var symbol in _ab.Keys)
            {
                _ab[symbol].Complete();
                await _ab[symbol].Completion.ConfigureAwait(false);
            }
        }
    }

    class Program
    {
        static async Task Main(string[] args)
        {
            var test = new BinanceSocketHandler();
            await test.StartAsync(new CancellationToken()).ConfigureAwait(false);

            Console.ReadLine();
        }
    }
}

Solution

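TPL Dataflow processes every item in order; that is what it is designed to do. You could try a latest-only approach with a BroadcastBlock, but because that block is linked to another block, you would probably end up with one item being processed, one waiting to be processed, and a third that is the one actually being overwritten.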

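If you want it tighter than that (i.e., one item in process and one waiting, where the waiting one is also the one that gets overwritten), then I recommend Channels. Specifically, a bounded channel with BoundedChannelFullMode.DropOldest.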
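
As a rough illustration of the Channels suggestion, here is a minimal sketch. LatestOnlyProcessor and LatestOnlyDemo are hypothetical names made up for this example (they are not part of Binance.Net, TPL Dataflow, or System.Threading.Channels), and plain integers stand in for the kline updates. The idea is a bounded channel with capacity 1 and BoundedChannelFullMode.DropOldest: while the handler is busy, each new write replaces the single pending item.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Channels;
using System.Threading.Tasks;

// Hypothetical helper: runs the handler for one item at a time and keeps
// only the newest pending item while the handler is busy.
public sealed class LatestOnlyProcessor<T>
{
    private readonly Channel<T> _channel;

    public Task Completion { get; }

    public LatestOnlyProcessor(Func<T, Task> handler)
    {
        // Capacity 1 + DropOldest: a write to a full channel evicts the
        // item that was waiting, so the reader never sees stale items.
        _channel = Channel.CreateBounded<T>(new BoundedChannelOptions(1)
        {
            FullMode = BoundedChannelFullMode.DropOldest,
            SingleReader = true
        });
        Completion = Task.Run(() => ConsumeAsync(handler));
    }

    // With DropOldest, TryWrite succeeds until the writer is completed.
    public bool Post(T item) => _channel.Writer.TryWrite(item);

    public void Complete() => _channel.Writer.Complete();

    private async Task ConsumeAsync(Func<T, Task> handler)
    {
        await foreach (var item in _channel.Reader.ReadAllAsync())
        {
            await handler(item).ConfigureAwait(false);
        }
    }
}

public static class LatestOnlyDemo
{
    public static async Task<List<int>> RunAsync()
    {
        var processed = new List<int>();
        var started = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
        var gate = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);

        var processor = new LatestOnlyProcessor<int>(async item =>
        {
            processed.Add(item);
            started.TrySetResult(true);
            await gate.Task.ConfigureAwait(false); // stands in for the slow operation
        });

        processor.Post(1);
        await started.Task;   // the handler is now busy with item 1
        processor.Post(2);    // waits in the channel
        processor.Post(3);    // replaces 2
        processor.Post(4);    // replaces 3
        gate.SetResult(true); // let the slow operation finish
        processor.Complete();
        await processor.Completion;

        return processed;     // contains 1, then 4
    }
}
```

In the code from the question, one such processor per symbol could take the place of each ActionBlock, with the subscription callback calling Post(data) instead of SendAsync. That wiring is an assumption and is not tested against Binance.Net here.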