Python-取消异步任务?

问题描述

我在下面为异步池编写了代码。在__aexit__中,我要在任务完成后取消_worker任务。但是,当我运行代码时,工作任务并没有被取消,并且代码永远运行。任务如下所示:<Task pending coro=<AsyncPool._worker() running at \async_pool.py:17> wait_for=<Future cancelled>>asyncio.wait_for被取消,但工作任务未取消。

class AsyncPool:
    def __init__(self,coroutine,no_of_workers,timeout):
        self._loop           = asyncio.get_event_loop()
        self._queue          = asyncio.Queue()
        self._no_of_workers  = no_of_workers
        self._coroutine      = coroutine
        self._timeout        = timeout
        self._workers        = None

    async def _worker(self): 
        while True:
            try:
                ret = False
                queue_item           = await self._queue.get()
                ret = True
                result               = await asyncio.wait_for(self._coroutine(queue_item),timeout = self._timeout,loop= self._loop)
            except Exception as e:
                print(e)
            finally:
                if ret:
                    self._queue.task_done()


    async def push_to_queue(self,item):
        self._queue.put_Nowait(item)
    
    async def __aenter__(self):
        assert self._workers == None
        self._workers = [asyncio.create_task(self._worker()) for _ in range(self._no_of_workers)]
        return self
    
    async def __aexit__(self,type,value,traceback):
        await self._queue.join()

        for worker in self._workers:
            worker.cancel()

        await asyncio.gather(*self._workers,loop=self._loop,return_exceptions =True)

要使用Asyncpool,

async def something(item):
    print("got",item)
    await asyncio.sleep(item)
 
async def main():
    async with AsyncPool(something,5,2) as pool:
        for i in range(10):
            await pool.push_to_queue(i)
 
asyncio.run(main())

我终端的输出

enter image description here

解决方法

问题在于您的except Exception异常子句还会捕获取消,并忽略它。更令人困惑的是,print(e)仅在出现CancelledError时打印空行,这是输出中的空行来自的地方。 (将其更改为print(type(e))可以显示发生了什么。)

要解决此问题,请将except Exception更改为更具体的内容,例如except asyncio.TimeoutError。在Python 3.8中,不需要进行此更改,在Python 3.8中,asyncio.CancelledError不再源自Exception,而是源自BaseException,因此except Exception不能抓住它。

,

创建asyncio任务然后将其取消后,您仍然有需要“回收”的任务。因此,您想要await worker。但是,一旦您await取消了这样的任务,因为它将永远不会给您返回期望的返回值,则asyncio.CancelledError将被提升,您需要将其捕获在某个地方。

由于这种行为,我认为您不应该gather将它们await} async def __aexit__(self,type,value,traceback): await self._queue.join() for worker in self._workers: worker.cancel() for worker in self._workers: try: await worker except asyncio.CancelledError: print("worker cancelled:",worker) ,因为它们应该立即返回:

{{1}}