如何处理来自binance websocket的多个流数据?

问题描述

我正在使用 unicorn_binance_websocket_api 来传输 100 种加密货币和来自 2 个不同时间范围的价格数据, 我想处理这些数据以存储不同加密货币在其时间范围内的收盘价,然后执行我的策略以查看我需要交易哪个加密货币和时间范围

我将分享有关如何为单个加密货币和单个时间范围编写策略的代码

from unicorn_binance_websocket_api.unicorn_binance_websocket_api_manager import
BinanceWebSocketapimanager
import json,numpy,talib

binance_websocket_api_manager = BinanceWebSocketapimanager(exchange="binance.com-futures")

binance_websocket_api_manager.create_stream('kline_1m','btcusdt')


closes =[]

RSI_PERIOD = 14
RSI_OVERBOUGHT = 70
RSI_OVERSOLD = 30

while True:
received_stream_data_json = binance_websocket_api_manager.pop_stream_data_from_stream_buffer()
if received_stream_data_json:
    json_data = json.loads(received_stream_data_json)
    candle_data = json_data.get('data',{})
    candle = candle_data.get('k',{})

    symboll = candle.get('s',{})
    timeframe = candle.get('i',{})
    close_prices = candle.get('c',{})
    open_prices = candle.get('o',{})
    is_candle_closed = candle.get('x',{})

    if is_candle_closed:
        closes.append(float(close_prices))

    if len(closes) > RSI_PERIOD:
        np_closes = numpy.array(closes)
        rsi = talib.RSI(np_closes,RSI_PERIOD)
        
    if (rsi[-1] > RSI_OVERBOUGHT):
        print("SELL")

    elif (rsi[-1] < RSI_OVERSOLD):
        print('BUY')

解决方法

您只需使用 subscribe_to_stream 函数并附加您想要观看的其他频道和市场。我试图通过 python-binance 库手动编写它,但它看起来很粗糙、笨拙且效率低下。所以我找到了你的问题并决定改用这个独角兽库,我得说,它非常棒。这是我的解决方案,您不需要使用 asyncio btw

   class BinanceWs:

    def __init__(self,channels,markets):
        market = 'btcusdt'
        tf = 'kline_1w'
        self.binance_websocket_api_manager = BinanceWebSocketApiManager(exchange="binance.com-futures")
        stream = self.binance_websocket_api_manager.create_stream(tf,market)
        self.binance_websocket_api_manager.subscribe_to_stream(stream,markets)

    async def run(self):
        while True:
            received_stream_data_json = self.binance_websocket_api_manager.pop_stream_data_from_stream_buffer()
            if received_stream_data_json:
                json_data = json.loads(received_stream_data_json)
                candle_data = json_data.get('data',{})

                candle = candle_data.get('k',{})
                symbol = candle.get('s',{})
                timeframe = candle.get('i',{})
                close_prices = candle.get('c',{})
                open_prices = candle.get('o',{})
                is_candle_closed = candle.get('x',{})
                print(candle_data)
                # do stuff with data ... 

async def main():
    tasks = []

    channels = ['kline_1m','kline_5m','kline_15m','kline_30m','kline_1h','kline_12h','miniTicker']
    markets = {'btcusdt','ethusdt','ltcusdt'}

    print(f'Main starting streams ... ')
    kl_socket = BinanceWs(channels=channels,markets=markets)
    task = await kl_socket.run()
    tasks.append(task)
    print(tasks)
    await asyncio.gather(*tasks)


if __name__ == "__main__":
    asyncio.run(main())

相关问答

Selenium Web驱动程序和Java。元素在(x,y)点处不可单击。其...
Python-如何使用点“。” 访问字典成员?
Java 字符串是不可变的。到底是什么意思?
Java中的“ final”关键字如何工作?(我仍然可以修改对象。...
“loop:”在Java代码中。这是什么,为什么要编译?
java.lang.ClassNotFoundException:sun.jdbc.odbc.JdbcOdbc...