问题描述
我有以下代码要尝试进行异步处理,但是我需要关闭连接才能完成此操作,但我不知道如何做。 websockets模块已经使用async并可以正常工作,但是由于我将Pysher用作websocket服务,因此我需要使用Pusher。
有一个Pyhser库,它处理Pusher进程,但不是异步的。
这就是我所做的:
import pysher # pysher uses websocket_lib
import traceback
import ujson
trading_pairs: List[str] = await self.get_trading_pairs()
pusher = pysher.Pusher(PUSHER_KEY,PUSHER_CLUSTER)
async def handle_update(raw_msg,**kwargs):
try:
msg = ujson.loads(raw_msg)
msg["trading_pair"] = kwargs["trading_pair"]
msg["timestamp"] = time.time()
yeld msg
except Exception as e:
print('handle_update ',e)
traceback.format_exc()
finally:
pusher.disconnect()
def connect_handler(data,**kwargs):
try:
for trading_pair in trading_pairs:
channel = pusher.subscribe(f"market-{trading_pair}-global")
channel.bind("update",handle_update,trading_pair=trading_pair,channel=f"market-{trading_pair}-global")
except Exception as e:
traceback.format_exc()
pusher.connection.bind("pusher:connection_established",connect_handler,trading_pairs=trading_pairs)
pusher.connect()
我要复制的代码是这样的:
import websockets
import traceback
import ujson
async with websockets.connect(stream_url) as ws:
async for raw_msg in self._inner_messages(ws):
msg = ujson.loads(raw_msg)
yeld msg
async def _inner_messages(self):
try:
while True:
try:
msg: str = await asyncio.wait_for(ws.recv(),timeout=5)
yield msg
except asyncio.TimeoutError:
try:
pong_waiter = await ws.ping()
await asyncio.wait_for(pong_waiter,timeout=5)
except asyncio.TimeoutError:
raise
except asyncio.TimeoutError:
traceback.print_exec()
finally:
await ws.close()
第一个代码永远循环,第二个代码正常工作。如果我使用websocket,则只会收到订阅数据,而不会收到数据本身(在这种情况下是交易对数据)。
该示例使用websockets,而pysher使用websocket。 Websocket允许绑定事件。
我也尝试使用pysherasync lib,但没有成功,因为它仅接收与订阅相关的数据(“ subscription success msg”)
Pusher Websocket订阅如何异步实现,并绑定了一个功能来处理实际数据的接收(订阅未成功通过msg)?
解决方法
暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!
如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。
小编邮箱:dio#foxmail.com (将#修改为@)