如何使用Python和Pusher实现异步websocket?

问题描述

我有以下代码要尝试进行异步处理,但是我需要关闭连接才能完成此操作,但我不知道如何做。 websockets模块已经使用async并可以正常工作,但是由于我将Pysher用作websocket服务,因此我需要使用Pusher

有一个Pyhser库,它处理Pusher进程,但不是异步的。

这就是我所做的:

import pysher # pysher uses websocket_lib
import traceback
import ujson


trading_pairs: List[str] = await self.get_trading_pairs()

pusher = pysher.Pusher(PUSHER_KEY,PUSHER_CLUSTER)


async def handle_update(raw_msg,**kwargs):
    try:
        msg = ujson.loads(raw_msg)
        msg["trading_pair"] = kwargs["trading_pair"]
        msg["timestamp"] = time.time()
        yeld msg
    except Exception as e:
        print('handle_update ',e)
        traceback.format_exc()
    finally:
        pusher.disconnect()

 def connect_handler(data,**kwargs):
     try:
         for trading_pair in trading_pairs:
             channel = pusher.subscribe(f"market-{trading_pair}-global")
             channel.bind("update",handle_update,trading_pair=trading_pair,channel=f"market-{trading_pair}-global")
     except Exception as e:
        traceback.format_exc()

pusher.connection.bind("pusher:connection_established",connect_handler,trading_pairs=trading_pairs)
pusher.connect()

我要复制的代码是这样的:

import websockets
import traceback
import ujson

async with websockets.connect(stream_url) as ws:
   async for raw_msg in self._inner_messages(ws):
       msg = ujson.loads(raw_msg)
       yeld msg

async def _inner_messages(self):
    try:
        while True:
            try:
                msg: str = await asyncio.wait_for(ws.recv(),timeout=5)
                yield msg
            except asyncio.TimeoutError:
                try:
                    pong_waiter = await ws.ping()
                    await asyncio.wait_for(pong_waiter,timeout=5)
                except asyncio.TimeoutError:
                    raise
    except asyncio.TimeoutError:
        traceback.print_exec()
    finally:
        await ws.close()

第一个代码永远循环,第二个代码正常工作。如果我使用websocket,则只会收到订阅数据,而不会收到数据本身(在这种情况下是交易对数据)。

该示例使用websockets,而pysher使用websocket。 Websocket允许绑定事件。

我也尝试使用pysherasync lib,但没有成功,因为它仅接收与订阅相关的数据(“ subscription success msg”)

Pusher Websocket订阅如何异步实现,并绑定了一个功能来处理实际数据的接收(订阅未成功通过msg)?

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)

相关问答

依赖报错 idea导入项目后依赖报错,解决方案:https://blog....
错误1:代码生成器依赖和mybatis依赖冲突 启动项目时报错如下...
错误1:gradle项目控制台输出为乱码 # 解决方案:https://bl...
错误还原:在查询的过程中,传入的workType为0时,该条件不起...
报错如下,gcc版本太低 ^ server.c:5346:31: 错误:‘struct...