Python使用asyncio创建websocket连接

问题描述

我正在尝试使用asyncio建立websocket连接,但遇到了困难。我想连接到服务端(STREAM_URL),并通过ClientHelper.process_user_message函数打印服务端返回给我的消息。

我的代码如下:

import asyncio
import json

from autobahn.asyncio.websocket import WebSocketClientFactory,WebSocketClientProtocol
from twisted.internet.protocol import ReconnectingClientFactory

import Client

class ClientHelper(Client):
    """Client that opens a websocket and prints every message it receives.

    Owns the asyncio event loop used for the websocket connection and keeps
    the most recently received message in ``self.msg``.
    """

    def __init__(self, api_key, api_secret):
        """Initialise the base client and grab the current event loop."""
        super().__init__(api_key, api_secret)
        self.loop = asyncio.get_event_loop()

    async def start_websockets(self):
        """Create the socket manager and open the websocket connection."""
        self.sm = SocketManager(self, self.loop)
        # start_socket is declared as (path, callback). The path argument is
        # not read by SocketManager.start_socket, so an empty path is passed;
        # passing only the callback would raise TypeError.
        await self.sm.start_socket("", self.process_user_message)

    async def process_user_message(self, msg):
        """Store and print one message received from the websocket."""
        self.msg = msg
        print(msg)

    async def main(self):
        """Open the websocket, then keep the event loop alive forever."""
        await self.start_websockets()
        while True:
            # do something with messages
            # The loop body must await something: a loop that never yields
            # would block the event loop and no websocket frame would ever
            # be delivered to process_user_message.
            await asyncio.sleep(1)

    def start(self):
        """Blocking entry point: run ``main()`` on the event loop."""
        self.loop.run_until_complete(self.main())

class SocketManager:
    """Opens the websocket connection to ``STREAM_URL`` on a given event loop."""

    # NOTE(review): `url` is not defined anywhere in the visible code; it has
    # to be bound to the websocket endpoint (a ws:// or wss:// URL) before
    # this class body is executed.
    STREAM_URL = url

    # NOTE(review): DEFAULT_USER_TIMEOUT is likewise not defined in the
    # visible code and must exist before this class body is executed.
    def __init__(self, client, loop, user_timeout=DEFAULT_USER_TIMEOUT):
        """Remember the owning client and the event loop to connect on.

        Args:
            client: The client object that owns this manager.
            loop: The asyncio event loop used to open connections.
            user_timeout: Accepted for interface compatibility; not used by
                the code in this class.
        """
        self._client = client
        self._loop = loop

    async def start_socket(self, path, callback):
        """Connect to ``STREAM_URL`` and route incoming messages to *callback*.

        Args:
            path: Accepted for interface compatibility; not used here.
            callback: Callable stored on the factory; the protocol invokes it
                with each decoded message.

        Returns:
            The ``(transport, protocol)`` pair produced by
            ``loop.create_connection``.

        Raises:
            OSError: Propagated from ``loop.create_connection`` when the
                connection cannot be established.
        """
        factory = ClientFactory(self.STREAM_URL)
        factory.protocol = ClientProtocol
        factory.callback = callback
        factory.reconnect = True

        # The factory already parsed the URL, so reuse its host/port/scheme
        # instead of relying on separate (undefined) `host`/`port` names and a
        # hard-coded ssl=True. Awaiting the connection here, rather than
        # wrapping it in a task that is immediately dropped, ensures
        # connection errors reach the caller instead of being lost.
        return await self._loop.create_connection(
            factory, factory.host, factory.port, ssl=factory.isSecure
        )

class ClientProtocol(WebSocketClientProtocol):
    """Websocket protocol that decodes JSON text frames for the factory callback."""

    def __init__(self):
        """Initialise the protocol with no websocket handle attached."""
        self.ws = None
        super().__init__()

    def onConnect(self, response):
        """Handle a successful connection by resetting the reconnect back-off."""
        # reset the delay after reconnecting
        self.factory.resetDelay()

    def onMessage(self, payload, isBinary):
        """Decode a text frame as JSON and pass it to ``factory.callback``.

        Binary frames and frames that are not valid UTF-8 JSON are ignored.

        Args:
            payload: Raw frame content as bytes.
            isBinary: True when the frame is a binary frame.
        """
        if isBinary:
            # There is no decoded object for a binary frame; previously this
            # path referenced an unbound `payload_obj` and crashed.
            return
        try:
            payload_obj = json.loads(payload.decode('utf8'))
        except ValueError:
            # Covers both invalid JSON and invalid UTF-8
            # (UnicodeDecodeError is a ValueError subclass).
            return
        # The callback runs outside the try block so that a ValueError raised
        # by the callback itself is not silently swallowed.
        result = self.factory.callback(payload_obj)
        if asyncio.iscoroutine(result):
            # An `async def` callback only returns a coroutine object when
            # called; it must be scheduled or it never runs.
            asyncio.ensure_future(result)

class MyReconnectingClientFactory(ReconnectingClientFactory):
    """Reconnecting factory that only overrides three retry settings.

    NOTE(review): the meanings given below follow Twisted's documented
    ``ReconnectingClientFactory`` attributes -- confirm against the installed
    Twisted version.
    """

    # Presumably the delay, in seconds, before the first reconnection attempt.
    initialDelay = 0.1
    # Presumably the upper bound, in seconds, on the delay between attempts.
    maxDelay = 10
    # Retry limit; ClientFactory compares ``self.retries`` against this value.
    maxRetries = 5

class ClientFactory(WebSocketClientFactory,MyReconnectingClientFactory):
    """Websocket client factory that retries failed or lost connections.

    A ``callback`` attribute is expected to be assigned on the instance by
    whoever creates the factory; it receives ``_reconnect_error_payload`` once
    the retry count exceeds ``maxRetries``.
    """

    protocol = ClientProtocol
    _reconnect_error_payload = {'e': 'error','m': 'Max reconnect retries reached'}

    def __init__(self,url):
        """Create a factory for the websocket endpoint *url*."""
        self.ws = None
        super().__init__(url)

    def clientConnectionFailed(self,connector,reason):
        """Retry after a failed connection attempt; report when retries run out."""
        self._retry_or_report(connector)

    def clientConnectionLost(self,connector,reason):
        """Retry after an established connection is lost; report when retries run out.

        The ``connector`` parameter was missing, although the body used it and
        Twisted invokes this hook as ``(connector, reason)``.
        """
        self._retry_or_report(connector)

    def _retry_or_report(self,connector):
        """Schedule a reconnect and notify the callback once retries are exhausted."""
        self.retry(connector)
        if self.retries > self.maxRetries:
            self.callback(self._reconnect_error_payload)

这些事件循环让我非常困惑,代码始终无法正常工作。请大家帮帮我!

谢谢你们!

解决方法

暂未找到可以解决该程序问题的有效方法,小编正在努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)