Python不断通过websocket接收消息

问题描述

我正在尝试通过websocket连接并使它保持活动状态,以便从服务器连续检索消息。我写了clientHelper并在Internet上找到了socketManager和ReconnectingWebsocket,但是我对它到底出了什么问题不太了解,因为我没有收到任何关于clientHelper.process_user_message函数的信息。 @H_502_1@

有人可以指出我的错误吗?@H_502_1@

import websockets as ws
import asyncio
from client import Client

class ClientHelper(Client):
   def __init__(self,api_key,api_secret):
      super().__init__(api_key,api_secret)
      self.loop = asyncio.get_event_loop()

   def _request(self,method,uri,signed,force_params=False,**kwargs):
       kwargs = self._get_request_kwargs(method,force_params,**kwargs)
       response = getattr(self.session,method)(uri,**kwargs)
       return self._handle_response(response,method)

   def _request(self,path,signed=False,version=API_VERSION,**kwargs):
       uri = self._create_api_uri(path,version)
       return self._request(method,**kwargs)

   def get_listen_key(self):
        res = self._request('post','userDataStream',signed=True,data={})
        return res['listenKey']

   async def start_websockets(self):        
      self.sm = SocketManager(self,self.loop)
      await self.sm.start_socket(self.process_user_message)

   async def process_user_message(self,msg):
      self.msg = msg
      print(msg)

   async def main(self)
      await self.start_websockets()
      while True:
          await client.getInfo()
          if 'data' in self.msg:
              print(self.msg['data'])

    def start(self):
       self.loop.run_until_complete(self.main())

class SocketManager:
   STREAM_URL = url

   def __init__(self,client,loop,user_timeout=DEFAULT_USER_TIMEOUT):
      self._client = client
      self._loop = loop
      self._conns = None

   async def _start_user_socket(self,coro,prefix='ws/'):
        if path in self._conns:
            return False

        self._conns[path] = ReconnectingWebsocket(self._loop,prefix)

        return path

   async def start_user_socket(self,coro):
        user_listen_key = await self._client.stream_get_listen_key() # manage to get the key from serveur
        conn_key = await self._start_user_socket('user',user_listen_key,coro)
        return conn_key

class ReconnectingWebsocket:
    STREAM_URL = url
    MAX_RECONNECTS = 5
    MAX_RECONNECT_SECONDS = 60
    MIN_RECONNECT_WAIT = 0.1
    TIMEOUT = 10

    def __init__(self,prefix='ws/'):
        self._loop = loop
        self._log = logging.getLogger(__name__)
        self._path = path
        self._coro = coro
        self._prefix = prefix
        self._reconnects = 0
        self._conn = None
        self._socket = None
        self._connect()

    def _connect(self):
        self._conn = asyncio.ensure_future(self._run(),loop=self._loop)

    async def _run(self):
        keep_waiting = True

        async with ws.connect(self.STREAM_URL) as socket:
            self._socket = socket
            self._reconnects = 0
            
            try:
                while keep_waiting:
                    try:
                        #evt = await self._coro(evt_obj)
                        evt = await asyncio.wait_for(self._socket.recv(),timeout=self.TIMEOUT)
                    except asyncio.TimeoutError:
                        #self._log.debug("no message in {} seconds".format(self.TIMEOUT))
                        print("no message in {} seconds".format(self.TIMEOUT))
                        await self.send_ping()
                    except asyncio.CancelledError:
                        #self._log.debug("cancelled error")
                        print("cancelled error")
                        await self.send_ping()
                    else:
                        try:
                            evt_obj = json.loads(evt)
                        except ValueError:
                            #self._log.debug('error parsing evt json:{}'.format(evt))
                            print('error parsing evt json:{}'.format(evt))
                        else:
                            await self._coro(evt_obj)
            except ws.ConnectionClosed as e:
                #self._log.debug('ws connection closed:{}'.format(e))
                print('ws connection closed:{}'.format(e))
                await self._reconnect()
            except Exception as e:
                #self._log.debug('ws exception:{}'.format(e))
                print('ws exception:{}'.format(e))
                await self._reconnect()
            
    def _get_reconnect_wait(self,attempts: int) -> int:
        expo = 2 ** attempts
        return round(random() * min(self.MAX_RECONNECT_SECONDS,expo - 1) + 1)

    async def _reconnect(self):
        await self.cancel()
        self._reconnects += 1
        if self._reconnects < self.MAX_RECONNECTS:
            reconnect_wait = self._get_reconnect_wait(self._reconnects)
            await asyncio.sleep(reconnect_wait)
            self._connect()
        else:
            self._log.error('Max reconnections {} reached:'.format(self.MAX_RECONNECTS))

    async def send_ping(self):
        if self._socket:
            await self._socket.ping()

    async def cancel(self):
        self._conn.cancel()
        self._socket = None

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)