问题描述
我正在尝试通过websocket连接并使它保持活动状态,以便从服务器连续检索消息。我写了clientHelper并在Internet上找到了socketManager和ReconnectingWebsocket,但是我对它到底出了什么问题不太了解,因为我没有收到任何关于clientHelper.process_user_message函数的信息。 @H_502_1@
import websockets as ws
import asyncio
from client import Client
class ClientHelper(Client):
def __init__(self,api_key,api_secret):
super().__init__(api_key,api_secret)
self.loop = asyncio.get_event_loop()
def _request(self,method,uri,signed,force_params=False,**kwargs):
kwargs = self._get_request_kwargs(method,force_params,**kwargs)
response = getattr(self.session,method)(uri,**kwargs)
return self._handle_response(response,method)
def _request(self,path,signed=False,version=API_VERSION,**kwargs):
uri = self._create_api_uri(path,version)
return self._request(method,**kwargs)
def get_listen_key(self):
res = self._request('post','userDataStream',signed=True,data={})
return res['listenKey']
async def start_websockets(self):
self.sm = SocketManager(self,self.loop)
await self.sm.start_socket(self.process_user_message)
async def process_user_message(self,msg):
self.msg = msg
print(msg)
async def main(self)
await self.start_websockets()
while True:
await client.getInfo()
if 'data' in self.msg:
print(self.msg['data'])
def start(self):
self.loop.run_until_complete(self.main())
class SocketManager:
STREAM_URL = url
def __init__(self,client,loop,user_timeout=DEFAULT_USER_TIMEOUT):
self._client = client
self._loop = loop
self._conns = None
async def _start_user_socket(self,coro,prefix='ws/'):
if path in self._conns:
return False
self._conns[path] = ReconnectingWebsocket(self._loop,prefix)
return path
async def start_user_socket(self,coro):
user_listen_key = await self._client.stream_get_listen_key() # manage to get the key from serveur
conn_key = await self._start_user_socket('user',user_listen_key,coro)
return conn_key
class ReconnectingWebsocket:
STREAM_URL = url
MAX_RECONNECTS = 5
MAX_RECONNECT_SECONDS = 60
MIN_RECONNECT_WAIT = 0.1
TIMEOUT = 10
def __init__(self,prefix='ws/'):
self._loop = loop
self._log = logging.getLogger(__name__)
self._path = path
self._coro = coro
self._prefix = prefix
self._reconnects = 0
self._conn = None
self._socket = None
self._connect()
def _connect(self):
self._conn = asyncio.ensure_future(self._run(),loop=self._loop)
async def _run(self):
keep_waiting = True
async with ws.connect(self.STREAM_URL) as socket:
self._socket = socket
self._reconnects = 0
try:
while keep_waiting:
try:
#evt = await self._coro(evt_obj)
evt = await asyncio.wait_for(self._socket.recv(),timeout=self.TIMEOUT)
except asyncio.TimeoutError:
#self._log.debug("no message in {} seconds".format(self.TIMEOUT))
print("no message in {} seconds".format(self.TIMEOUT))
await self.send_ping()
except asyncio.CancelledError:
#self._log.debug("cancelled error")
print("cancelled error")
await self.send_ping()
else:
try:
evt_obj = json.loads(evt)
except ValueError:
#self._log.debug('error parsing evt json:{}'.format(evt))
print('error parsing evt json:{}'.format(evt))
else:
await self._coro(evt_obj)
except ws.ConnectionClosed as e:
#self._log.debug('ws connection closed:{}'.format(e))
print('ws connection closed:{}'.format(e))
await self._reconnect()
except Exception as e:
#self._log.debug('ws exception:{}'.format(e))
print('ws exception:{}'.format(e))
await self._reconnect()
def _get_reconnect_wait(self,attempts: int) -> int:
expo = 2 ** attempts
return round(random() * min(self.MAX_RECONNECT_SECONDS,expo - 1) + 1)
async def _reconnect(self):
await self.cancel()
self._reconnects += 1
if self._reconnects < self.MAX_RECONNECTS:
reconnect_wait = self._get_reconnect_wait(self._reconnects)
await asyncio.sleep(reconnect_wait)
self._connect()
else:
self._log.error('Max reconnections {} reached:'.format(self.MAX_RECONNECTS))
async def send_ping(self):
if self._socket:
await self._socket.ping()
async def cancel(self):
self._conn.cancel()
self._socket = None
解决方法
暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!
如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。
小编邮箱:dio#foxmail.com (将#修改为@)