问题描述
以下是通用 websocket 流媒体的(工作)代码。
它创建一个守护线程,从中执行 asyncio.run(...)
。
asyncio 代码产生 2 个任务,它们永远不会完成。
如何正确销毁这个对象?
其中一项任务是执行保活“ping”,因此我可以使用标志轻松退出该循环。但是另一个阻塞了来自 websocket 的消息。
import json
import aiohttp
import asyncio
import gzip
import asyncio
from threading import Thread
class WebSocket:
KEEPALIVE_INTERVAL_S = 10
def __init__(self,url,on_connect,on_msg):
self.url = url
self.on_connect = on_connect
self.on_msg = on_msg
self.streams = {}
self.worker_thread = Thread(name='WebSocket',target=self.thread_func,daemon=True).start()
def thread_func(self):
asyncio.run(self.aio_run())
async def aio_run(self):
async with aiohttp.ClientSession() as session:
self.ws = await session.ws_connect(self.url)
await self.on_connect(self)
async def ping():
while True:
print('KEEPALIVE')
await self.ws.ping()
await asyncio.sleep(WebSocket.KEEPALIVE_INTERVAL_S)
async def main_loop():
async for msg in self.ws:
def extract_data(msg):
if msg.type == aiohttp.WSMsgType.BINARY:
as_bytes = gzip.decompress(msg.data)
as_string = as_bytes.decode('utf8')
as_json = json.loads(as_string)
return as_json
elif msg.type == aiohttp.WSMsgType.TEXT:
return json.loads(msg.data)
elif msg.type == aiohttp.WSMsgType.ERROR:
print('⛔️ aiohttp.WSMsgType.ERROR')
return msg.data
data = extract_data(msg)
self.on_msg(data)
# May want this approach if we want to handle graceful shutdown
# W.task_ping = asyncio.create_task(ping())
# W.task_main_loop = asyncio.create_task(main_loop())
await asyncio.gather(
ping(),main_loop()
)
async def send_json(self,J):
await self.ws.send_json(J)
解决方法
我建议使用 asyncio.run_coroutine_threadsafe
而不是 asyncio.run
。它返回一个您可以取消的 concurrent.futures.Future
对象:
def thread_func(self):
self.future = asyncio.run_coroutine_threadsafe(
self.aio_run(),asyncio.get_event_loop()
)
# somewhere else
self.future.cancel()
另一种方法是将 ping
和 main_loop
设为任务,并在必要时取消它们:
# inside `aio_run`
self.task_ping = asyncio.create_task(ping())
self.main_loop_task = asyncio.create_task(main_loop())
await asyncio.gather(
self.task_ping,self.main_loop_task
return_exceptions=True
)
# somewhere else
self.task_ping.cancel()
self.main_loop_task.cancel()
这不会改变 aio_run
也应该与 asyncio.run_coroutine_threadsafe
一起调用的事实。 asyncio.run
应该用作 asyncio 程序的主要入口点,并且应该只调用一次。
我想建议解决方案的另一种变体。在完成协程(任务)时,我更喜欢尽量减少 cancel()
的使用(但不排除),因为有时它会使调试业务逻辑变得困难(请记住,asyncio.CancelledError
不继承自Exception
).
在您的情况下,代码可能如下所示(仅更改):
class WebSocket:
KEEPALIVE_INTERVAL_S = 10
def __init__(self,url,on_connect,on_msg):
# ...
self.worker_thread = Thread(name='WebSocket',target=self.thread_func)
self.worker_thread.start()
async def aio_run(self):
self._loop = asyncio.get_event_loop()
# ...
self._ping_task = asyncio.create_task(ping())
self._main_task = asyncio.create_task(main_loop())
await asyncio.gather(
self._ping_task,self._main_task,return_exceptions=True
)
# ...
async def stop_ping(self):
self._ping_task.cancel()
try:
await self._ping_task
except asyncio.CancelledError:
pass
async def _stop(self):
# wait ping end before socket closing
await self.stop_ping()
# lead to correct exit from `async for msg in self.ws`
await self.ws.close()
def stop(self):
# wait stopping ping and closing socket
asyncio.run_coroutine_threadsafe(
self._stop(),self._loop
).result()
self.worker_thread.join() # wait thread finish