使用 System.Reactive 的扩展对代码进行多线程处理

问题描述

以下代码从 Binance 下载从开始日期到结束日期之间的历史 OHLCV 数据。由于 Binance 每次请求最多只允许下载 1000 根蜡烛(K 线),因此我在 DownloadAsync 中采用了分批循环下载的方式。也欢迎对代码提出任何改进建议。

实际问题是如何让 DownloadAsync 并行(多线程)执行以加快下载速度——试想一下以 5 分钟为间隔下载 2018 年到 2021 年的全部蜡烛图需要多少次请求。我更倾向于使用 System.Reactive,但也欢迎其他解决方案,因为我自己很难把这段代码改写成多线程版本。

下面的代码可以测试。

using Binance.Net;
using Binance.Net.Enums;
using Binance.Net.Interfaces;
using Newtonsoft.Json;
using System;
using System.Collections.Generic;
using System.IO;
using System.Threading.Tasks;
using System.Linq;
using System.Text.RegularExpressions;
using System.Reactive.Linq;
using System.Threading;

namespace DownloadCandleDataTest
{
    /// <summary>
    /// Downloads historical OHLCV candles through an <see cref="Exchange"/> by paging
    /// over the requested date range, one request per page.
    /// </summary>
    public class DataProvider
    {
        // Number of candles requested per call. The exchange is described as allowing
        // at most 1000 candles per request, so ask for a full page every time.
        private const int MaxCandlesPerRequest = 1000;

        private readonly Exchange _exchange;

        /// <summary>Creates a provider that fetches candles from <paramref name="exchange"/>.</summary>
        /// <param name="exchange">Exchange wrapper used for every request.</param>
        /// <exception cref="ArgumentNullException"><paramref name="exchange"/> is null.</exception>
        public DataProvider(Exchange exchange)
        {
            _exchange = exchange ?? throw new ArgumentNullException(nameof(exchange));
        }

        /// <summary>
        /// Downloads every candle between <paramref name="startDate"/> and <paramref name="endDate"/>.
        /// </summary>
        /// <param name="pair">Trading pair symbol, e.g. "TRXUSDT".</param>
        /// <param name="interval">Candle interval forwarded to the exchange.</param>
        /// <param name="startDate">Start of the range; used as the start time of the first request.</param>
        /// <param name="endDate">End of the range; used as the end time of every request.</param>
        /// <param name="startupCandleCount">Currently unused; kept so existing callers keep compiling.</param>
        /// <returns>The downloaded candles in the order they were received (never null).</returns>
        /// <exception cref="InvalidOperationException">The exchange reported a failed request.</exception>
        public async Task<List<OHLCV>> DownloadAsync(string pair,KlineInterval interval,DateTime startDate,DateTime endDate,int startupCandleCount)
        {
            var downloaded = new List<OHLCV>();
            var cursor = startDate;
            DateTime? lastTimestamp = null;

            while (cursor < endDate)
            {
                var page = await _exchange.GetCandlesAsync(pair, interval, cursor, endDate, MaxCandlesPerRequest).ConfigureAwait(false);

                if (page == null)
                {
                    // The exchange wrapper returns null for a failed call; fail loudly
                    // instead of dereferencing null or returning silently truncated data.
                    throw new InvalidOperationException($"Failed to download candles for {pair} starting at {cursor}.");
                }

                // Each follow-up request starts at the timestamp of the last candle already
                // stored, so that candle comes back again; keep only strictly newer ones.
                List<OHLCV> fresh = page;
                if (lastTimestamp.HasValue)
                {
                    DateTime boundary = lastTimestamp.Value;
                    fresh = page.Where(c => c.Timestamp > boundary).ToList();
                }

                if (fresh.Count == 0)
                {
                    // Nothing new arrived: the range is exhausted. Without this exit the
                    // cursor would never advance and the loop would spin forever.
                    break;
                }

                // Assumes the exchange returns candles in ascending time order — TODO confirm.
                var first = fresh[0];
                var last = fresh[fresh.Count - 1];
                Console.WriteLine($"First: {first.Timestamp} | Last: {last.Timestamp}");

                downloaded.AddRange(fresh);

                lastTimestamp = last.Timestamp;
                cursor = last.Timestamp;
            }

            ReportDuplicates(downloaded);

            return downloaded;
        }

        // Diagnostic aid: prints every timestamp that occurs more than once in the result.
        private static void ReportDuplicates(List<OHLCV> candles)
        {
            var duplicateGroups = candles.GroupBy(c => c.Timestamp).Where(g => g.Skip(1).Any());

            foreach (var duplicateGroup in duplicateGroups)
            {
                Console.WriteLine(duplicateGroup.Key);
                foreach (var candle in duplicateGroup)
                {
                    Console.WriteLine("\t" + candle.Timestamp);
                }
            }
        }
    }

    /// <summary>
    /// Console entry point: downloads a sample range of candles and waits for a key press.
    /// </summary>
    class Program
    {
        /// <summary>
        /// Demonstrates <c>Observable.Start</c>: runs a simulated three-second job and
        /// blocks the caller until it has finished. Not called by <see cref="Main"/>.
        /// </summary>
        public static void StartBackgroundWork()
        {
            Console.WriteLine("Shows use of Start to start on a background thread:");
            var o = Observable.Start(() =>
            {
                //This starts on a background thread.
                Console.WriteLine("From background thread. Does not block main thread.");
                Console.WriteLine("Calculating...");
                Thread.Sleep(3000); // stands in for real work
                Console.WriteLine("Background work completed.");
            }).Finally(() => Console.WriteLine("Main thread completed."));
            Console.WriteLine("\r\n\t In Main Thread...\r\n");
            o.Wait();   // Wait for completion of background operation.
        }

        /// <summary>
        /// Downloads five-minute TRXUSDT candles for the sample range, then waits for Enter.
        /// </summary>
        static async Task Main(string[] args)
        {
            using var exchange = new Exchange();
            var dataProvider = new DataProvider(exchange);

            // NOTE(review): the end-date argument was garbled in the original source
            // ("new DateTime(2019,1,1),2),100)"); 2019-01-02 is a reconstruction — confirm.
            await dataProvider.DownloadAsync("TRXUSDT",KlineInterval.FiveMinutes,new DateTime(2019,1,1),new DateTime(2019,1,2),100).ConfigureAwait(false);

            Console.ReadLine();
        }
    }

    /// <summary>
    /// A single candlestick: open, high, low and close prices plus volume for one interval.
    /// </summary>
    public class OHLCV
    {
        /// <summary>Candle timestamp; <c>ToCandle</c> fills it from the kline's open time.</summary>
        public DateTime Timestamp { get; set; }
        /// <summary>Opening price.</summary>
        public decimal Open { get; set; }
        /// <summary>Highest price.</summary>
        public decimal High { get; set; }
        /// <summary>Lowest price.</summary>
        public decimal Low { get; set; }
        /// <summary>Closing price.</summary>
        public decimal Close { get; set; }
        /// <summary>Traded volume; <c>ToCandle</c> fills it from the kline's base volume.</summary>
        public decimal Volume { get; set; }
    }

    /// <summary>
    /// Conversion helpers between Binance.Net kline objects and the local <see cref="OHLCV"/> model.
    /// </summary>
    public static class Extensions
    {
        /// <summary>
        /// Copies the open time, prices and base volume of a kline into a new <see cref="OHLCV"/>.
        /// </summary>
        /// <param name="candle">Kline to convert.</param>
        /// <returns>A new <see cref="OHLCV"/> carrying the kline's values.</returns>
        public static OHLCV ToCandle(this IBinanceKline candle) =>
            new OHLCV
            {
                Timestamp = candle.OpenTime,
                Open = candle.Open,
                High = candle.High,
                Low = candle.Low,
                Close = candle.Close,
                Volume = candle.BaseVolume,
            };
    }

    /// <summary>
    /// Thin wrapper around a Binance client that exposes kline (candlestick) data
    /// as lists of <see cref="OHLCV"/>. Owns the client and disposes it.
    /// </summary>
    public class Exchange : IDisposable
    {
        private readonly IBinanceClient _client;

        /// <summary>Creates the wrapper together with its own <c>BinanceClient</c>.</summary>
        public Exchange()
        {
            _client = new BinanceClient();
        }

        /// <summary>
        /// Requests klines for <paramref name="pair"/> and converts them to <see cref="OHLCV"/>.
        /// </summary>
        /// <param name="pair">Trading pair symbol, e.g. "TRXUSDT".</param>
        /// <param name="interval">Candle interval; it must be forwarded to the client, otherwise the request cannot select a candle size.</param>
        /// <param name="startTime">Optional start time forwarded to the client.</param>
        /// <param name="endTime">Optional end time forwarded to the client.</param>
        /// <param name="limit">Optional maximum number of candles forwarded to the client.</param>
        /// <returns>
        /// The converted candles, or <c>null</c> when the call did not succeed or carried no data.
        /// </returns>
        public async Task<List<OHLCV>> GetCandlesAsync(string pair,KlineInterval interval,DateTime? startTime = null,DateTime? endTime = null,int? limit = null)
        {
            var result = await _client.Spot.Market.GetKlinesAsync(pair,interval,startTime,endTime,limit).ConfigureAwait(false);

            if (!result.Success)
            {
                // Failure is signalled with null; callers must check before using the list.
                return null;
            }

            return result.Data?.Select(e => e.ToCandle()).ToList();
        }

        /// <summary>Disposes the underlying client.</summary>
        public void Dispose()
        {
            _client?.Dispose();
        }
    }
}

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)