问题描述
我有一个Websocket订阅了客户的新闻提要。收到消息后,我想采取以下措施:
from client import Client,SocketManager
import asyncio
from concurrent.futures import ThreadPoolExecutor
from time import time
class client(Client):
def __init__():
self.loop = asyncio.get_event_loop()
async def start_websockets(self):
self.sm = await SocketManager.create(self.loop,self,self.handle_evt)
await self.sm.subscribe()
while not self.received:
await asyncio.sleep(10,loop=self.loop)
async def handle_evt(self,msg):
self.msg = msg
subject1 = msg['data']['subjetct']
subject2 = msg['data']['subjetct2']
with ThreadPoolExecutor(max_workers=2) as executor:
future_buy = executor.submit(self.process,subject1)
future_sell = executor.submit(self.process,subject2)
future_buy.result()
future_sell.result()
def process(self,subject):
if 'type' in subject:
# do something
else:
# do something
async def main(self):
await self.start_websockets()
while True:
if time() > start + 60*5:
start = time()
print(self.msg)
def start(self):
self.loop.run_until_complete(self.main())
但是,它似乎卡在了ThreadPoolExecutor中。我可能在某个地方缺少等待,但是我对asyncio不太熟悉。 有人可以帮我这个忙吗?
解决方法
通过loop.run_in_executor
使用asyncio
的内置执行程序和包装器:
async def handle_evt(self,msg):
self.msg = msg
subject1 = msg['data']['subjetct']
subject2 = msg['data']['subjetct2']
loop = asyncio.get_running_loop()
future_buy = loop.run_in_executor(None,self.process,subject1)
future_sell = loop.run_in_executor(None,subject2)
# allow other coroutines to run while waiting
await future_buy
await future_sell
concurrent.futures.Future
本质上是同步的–等待Future.result()
阻塞当前线程。因此,在事件循环中执行它会阻塞事件循环,包括所有协程。
asyncio
包装器提供了asyncio.Future
,它允许异步等待结果。