在币安twisted.internet websocket 和asyncio 线程之间共享数据

问题描述

我正在使用以下库:

from binance.websockets import BinanceSocketManager
from twisted.internet import reactor    

创建一个 websocket 以从 API(Binance API)获取数据并以 1 秒为间隔打印比特币的价格:

conn_key = bsm.start_symbol_ticker_socket('BTCUSDT',asset_price_stream)

每次有新的更新时,都会执行函数asset_price_stream,以新数据为参数。这有效(例如,我可以简单地将数据打印到 asset_price_stream 中的控制台)。

现在,我想用 asyncio 函数共享这些数据。目前,我正在为此使用 janus 队列。

在资产价格流中:

price_update_queue_object.queue.sync_q.put_Nowait({msg['s']: msg['a']})

在异步线程中:

async def price_updater(update_queue):
    while True:
        priceupdate = await update_queue.async_q.get()
        print(priceupdate)
        update_queue.async_q.task_done()

我在 asset_price_stream 中使用 sync_q 接口,在 asyncio 函数中使用 async_q 接口。如果我也在 asset_price_stream 中使用 async_q 接口,则会出现错误

Unhandled Error
Traceback (most recent call last):
  File "/home/elias/.local/lib/python3.7/site-packages/twisted/python/log.py",line 101,in callWithLogger
    return callWithContext({"system": lp},func,*args,**kw)
  File "/home/elias/.local/lib/python3.7/site-packages/twisted/python/log.py",line 85,in callWithContext
    return context.call({ILogContext: newCtx},**kw)
  File "/home/elias/.local/lib/python3.7/site-packages/twisted/python/context.py",line 118,in callWithContext
    return self.currentContext().callWithContext(ctx,line 83,in callWithContext
    return func(*args,**kw)
--- <exception caught here> ---
  File "/home/elias/.local/lib/python3.7/site-packages/twisted/internet/posixbase.py",line 687,in _doReadOrWrite
    why = selectable.doRead()
  File "/home/elias/.local/lib/python3.7/site-packages/twisted/internet/tcp.py",line 246,in doRead
    return self._dataReceived(data)
  File "/home/elias/.local/lib/python3.7/site-packages/twisted/internet/tcp.py",line 251,in _dataReceived
    rval = self.protocol.dataReceived(data)
  File "/home/elias/.local/lib/python3.7/site-packages/twisted/protocols/tls.py",line 324,in dataReceived
    self._flushReceiveBIO()
  File "/home/elias/.local/lib/python3.7/site-packages/twisted/protocols/tls.py",line 290,in _flushReceiveBIO
    ProtocolWrapper.dataReceived(self,bytes)
  File "/home/elias/.local/lib/python3.7/site-packages/twisted/protocols/policies.py",line 107,in dataReceived
    self.wrappedProtocol.dataReceived(data)
  File "/home/elias/.local/lib/python3.7/site-packages/autobahn/twisted/websocket.py",in dataReceived
    self._dataReceived(data)
  File "/home/elias/.local/lib/python3.7/site-packages/autobahn/websocket/protocol.py",line 1207,in _dataReceived
    self.consumeData()
  File "/home/elias/.local/lib/python3.7/site-packages/autobahn/websocket/protocol.py",line 1219,in consumeData
    while self.processData() and self.state != WebSocketProtocol.STATE_CLOSED:
  File "/home/elias/.local/lib/python3.7/site-packages/autobahn/websocket/protocol.py",line 1579,in processData
    fr = self.onFrameEnd()
  File "/home/elias/.local/lib/python3.7/site-packages/autobahn/websocket/protocol.py",line 1704,in onFrameEnd
    self._onMessageEnd()
  File "/home/elias/.local/lib/python3.7/site-packages/autobahn/twisted/websocket.py",line 318,in _onMessageEnd
    self.onMessageEnd()
  File "/home/elias/.local/lib/python3.7/site-packages/autobahn/websocket/protocol.py",line 628,in onMessageEnd
    self._onMessage(payload,self.message_is_binary)
  File "/home/elias/.local/lib/python3.7/site-packages/autobahn/twisted/websocket.py",line 321,in _onMessage
    self.onMessage(payload,isBinary)
  File "/home/elias/.local/lib/python3.7/site-packages/binance/websockets.py",line 31,in onMessage
    self.factory.callback(payload_obj)
  File "dollarbot.py",line 425,in asset_price_stream
    price_update_queue_object.queue.async_q.put_Nowait({msg['s']: msg['a']})
  File "/home/elias/.local/lib/python3.7/site-packages/janus/__init__.py",line 438,in put_Nowait
    self._parent._notify_async_not_empty(threadsafe=False)
  File "/home/elias/.local/lib/python3.7/site-packages/janus/__init__.py",line 158,in _notify_async_not_empty
    self._call_soon(task_maker)
  File "/home/elias/.local/lib/python3.7/site-packages/janus/__init__.py",line 60,in checked_call_soon
    self._loop.call_soon(callback,*args)
  File "/usr/lib/python3.7/asyncio/base_events.py",line 690,in call_soon
    self._check_thread()
  File "/usr/lib/python3.7/asyncio/base_events.py",line 728,in _check_thread
    "Non-thread-safe operation invoked on an event loop other "
builtins.RuntimeError: Non-thread-safe operation invoked on an event loop other than the current one

当使用sync_q 时,它可以正常工作。异步线程可以打印价格更新。但有时,它只是卡住了,我不知道为什么。 API 仍在提供数据(因为我已经仔细检查过),但它停止通过队列到达异步线程。我不知道为什么会发生这种情况,而且它并不总是可重现的(我可以连续 5 次运行相同的代码而无需修改,四次有效,一次无效)。有趣的是:只要我按下 CTRL-C 中止程序,数据就会立即到达队列,而之前没有! (在程序等待关闭所有 asyncio 线程的几秒钟内)所以我觉得同步异步队列通信的某些内容错误的/与我按下 CTRL-C 时中止的其他内容同时发生。

所以我的问题是:我该如何改进该程序?有没有比 janus.Queue() 更好的方法将数据从twisted.internet websocket 发送到异步线程?我尝试了一些不同的事情(例如访问全局对象,但我无法从 asset_price_stream 访问 asyncio.lock() 因为它不是异步函数......所以它不会是线程安全的)。

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)