带有asyncio的Python多线程

问题描述

我有一个Websocket订阅了客户的新闻提要。收到消息后,我想采取以下措施:

from client import Client,SocketManager
import asyncio
from concurrent.futures import ThreadPoolExecutor
from time import time

class client(Client):
def __init__():
    self.loop = asyncio.get_event_loop()

async def start_websockets(self):        
    self.sm = await SocketManager.create(self.loop,self,self.handle_evt)
    await self.sm.subscribe()
    while not self.received:
        await asyncio.sleep(10,loop=self.loop)

async def handle_evt(self,msg):
      self.msg = msg
      subject1 = msg['data']['subjetct']
      subject2 = msg['data']['subjetct2']

      with ThreadPoolExecutor(max_workers=2) as executor:
            future_buy = executor.submit(self.process,subject1)
            future_sell = executor.submit(self.process,subject2)
            
            future_buy.result()
            future_sell.result()

 def process(self,subject):
      if 'type' in subject:
         # do something
      else:
         # do something
 
 async def main(self):
     await self.start_websockets()
     while True:
        if time() > start + 60*5:
            start = time()
            print(self.msg)
 def start(self):
    self.loop.run_until_complete(self.main())

但是,它似乎卡在了ThreadPoolExecutor中。我可能在某个地方缺少等待,但是我对asyncio不太熟悉。 有人可以帮我这个忙吗?

解决方法

通过loop.run_in_executor使用asyncio的内置执行程序和包装器:

async def handle_evt(self,msg):
      self.msg = msg
      subject1 = msg['data']['subjetct']
      subject2 = msg['data']['subjetct2']

      loop = asyncio.get_running_loop()
      future_buy = loop.run_in_executor(None,self.process,subject1)
      future_sell = loop.run_in_executor(None,subject2)
      # allow other coroutines to run while waiting
      await future_buy
      await future_sell

concurrent.futures.Future本质上是同步的–等待Future.result() 阻塞当前线程。因此,在事件循环中执行它会阻塞事件循环,包括所有协程。

asyncio包装器提供了asyncio.Future,它允许异步等待结果。