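asyncio.CancelledError and the strange "_GatheringFuture exception was never retrieved" behavior

Problem description

I am watching "import asyncio: Learn Python's AsyncIO #3 - Using Coroutines". The instructor gave the following example: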

import asyncio
import datetime

async def keep_printing(name):
    while True:
        print(name,end=" ")
        print(datetime.datetime.now())
        await asyncio.sleep(0.5)

async def main():
    group_task = asyncio.gather(
                     keep_printing("First"),keep_printing("Second"),keep_printing("Third")
                 )
    try:
        await asyncio.wait_for(group_task,3)
    except asyncio.TimeoutError:
        print("Time's up!")


if __name__ == "__main__":
    asyncio.run(main())

The output ends with an exception:

First 2020-08-11 14:53:12.079830
Second 2020-08-11 14:53:12.079830
Third 2020-08-11 14:53:12.080828 
First 2020-08-11 14:53:12.580865
Second 2020-08-11 14:53:12.580865
Third 2020-08-11 14:53:12.581901 
First 2020-08-11 14:53:13.081979
Second 2020-08-11 14:53:13.082408
Third 2020-08-11 14:53:13.082408 
First 2020-08-11 14:53:13.583497
Second 2020-08-11 14:53:13.583935
Third 2020-08-11 14:53:13.584946
First 2020-08-11 14:53:14.079666
Second 2020-08-11 14:53:14.081169
Third 2020-08-11 14:53:14.115689
First 2020-08-11 14:53:14.570694
Second 2020-08-11 14:53:14.571668
Third 2020-08-11 14:53:14.635769
First 2020-08-11 14:53:15.074124
Second 2020-08-11 14:53:15.074900
Time's up!
_GatheringFuture exception was never retrieved
future: <_GatheringFuture finished exception=CancelledError()>
concurrent.futures._base.CancelledError
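The last three lines are asyncio's standard warning for a future that finished with an exception that nobody ever retrieved through `result()`, `exception()` or `await`. It is reported through the event loop's exception handler when such a future is garbage-collected. A minimal sketch of that mechanism, assuming CPython (where an unreferenced future is finalized as soon as its last reference is dropped) and using plain futures plus a hypothetical handler named `record` in place of the default logger:

```python
import asyncio
import gc

messages = []

def record(loop, context):
    # Hypothetical handler: store the message instead of logging it.
    messages.append(context["message"])

async def main():
    loop = asyncio.get_running_loop()
    loop.set_exception_handler(record)

    retrieved = loop.create_future()
    retrieved.set_exception(ValueError("seen"))
    retrieved.exception()  # looking at the exception clears the "warn on GC" flag

    forgotten = loop.create_future()
    forgotten.set_exception(ValueError("never seen"))  # nobody looks at this one

    del retrieved, forgotten  # drop the last references
    gc.collect()              # make finalization explicit

asyncio.run(main())
print(messages)  # ['Future exception was never retrieved']
```

Only the future whose exception was never looked at triggers the message. In the question the future is a `_GatheringFuture`, so its class name appears in the warning instead of `Future`.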

The instructor tried to handle the CancelledError by adding a try/except inside keep_printing:

async def keep_printing(name):
    while True:
        print(name,end=" ")
        print(datetime.datetime.now())
        try:
            await asyncio.sleep(0.5)
        except asyncio.CancelledError:
            print(name,"was cancelled!")
            break

However, the same exception still occurs:

# keep printing datetimes
...
First was cancelled!
Second was cancelled!
Third was cancelled!
Time's up!
_GatheringFuture exception was never retrieved
future: <_GatheringFuture finished exception=CancelledError()>
concurrent.futures._base.CancelledError

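After that, the instructor simply moved on to other topics and never came back to this example to show how to fix it. Fortunately, by experimenting I found that the problem goes away if I add another try/except under except asyncio.TimeoutError: in the main async function: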

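async def main():
    group_task = asyncio.gather(
                     keep_printing("First"),keep_printing("Second"),keep_printing("Third")
                 )
    try:
        await asyncio.wait_for(group_task,3)
    except asyncio.TimeoutError:
        print("Time's up!")
        try:
            # Retrieve the CancelledError stored in the gathering future
            await group_task
        except asyncio.CancelledError:
            print("Main was cancelled!")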

The final output is:

# keep printing datetimes
...
First was cancelled!
Second was cancelled!
Third was cancelled!
Time's up!
Main was cancelled!

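In fact, with this version of main, we don't even need the try...except asyncio.CancelledError inside keep_printing. It still works fine.

Why is that? Why does catching CancelledError in main work, while catching it in keep_printing does not? The way the video instructor handled this exception only confused me more. He didn't need to change any of the code in keep_printing in the first place!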

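Solution

When aw is cancelled because of a timeout, wait_for waits for aw to actually finish being cancelled (changed in version 3.7). If you handle CancelledError inside the coroutine, you get a TimeoutError in the caller instead.
Example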

import asyncio
import datetime

async def keep_printing(name):
    print(datetime.datetime.now())
    try:
        await asyncio.sleep(3600)
    except asyncio.exceptions.CancelledError:
        print("done")

async def main():
    try:
        await asyncio.wait_for(keep_printing("First"),timeout=3)
    except asyncio.exceptions.TimeoutError:
        print("timeouted")


if __name__ == "__main__":
    asyncio.run(main())
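A variant of this example that records the order of events, as a sketch of the 3.7+ behavior described above: the cleanup in the coroutine's `except asyncio.CancelledError` block finishes before `wait_for` raises `TimeoutError` in the caller. The names `worker` and `events` are illustrative, the timeout is shortened to 0.05 s, and the handler re-raises the cancellation, which is the usual practice.

```python
import asyncio

events = []  # illustrative event log

async def worker():
    try:
        await asyncio.sleep(3600)
    except asyncio.CancelledError:
        # Cleanup that itself awaits; wait_for lets it finish first.
        await asyncio.sleep(0.01)
        events.append("worker cleaned up")
        raise  # re-raise so the cancellation is not swallowed

async def main():
    try:
        await asyncio.wait_for(worker(), timeout=0.05)
    except asyncio.TimeoutError:
        events.append("timeout seen by main")

asyncio.run(main())
print(events)  # ['worker cleaned up', 'timeout seen by main']
```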

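gather is meant for collecting the results of Tasks or Futures, but your coroutines run an infinite loop and never return a result. If any Task or Future in the aws sequence is cancelled (which is what happens with wait_for), it is treated as if it raised CancelledError; the gather() call itself is not cancelled in this case. This is to prevent the cancellation of one submitted Task/Future from causing other Tasks/Futures to be cancelled.
To protect the gather, you can wrap it in asyncio.shield: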

import asyncio
import datetime


async def keep_printing(name):
    while True:
        print(name,datetime.datetime.now())
        try:
            await asyncio.sleep(0.5)
        except asyncio.exceptions.CancelledError:
            print(f"canceled {name}")
            return None

async def main():
    group_task = asyncio.shield(asyncio.gather(
                     keep_printing("First"),keep_printing("Second"),keep_printing("Third"))
                    )
    try:
        await asyncio.wait_for(group_task,3)
    except asyncio.exceptions.TimeoutError:
        print("Done")


if __name__ == "__main__":
    asyncio.run(main())
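The gather rule quoted above concerns cancelling a single child directly. A sketch of that case, with a hypothetical coroutine `child` and illustrative values `"a"`, `"b"`, `"c"`: with `return_exceptions=True` the cancelled child shows up as a `CancelledError` instance in the results, and the gathering future itself is not cancelled. `wait_for`, by contrast, cancels the gathering future as a whole, which cancels every child.

```python
import asyncio

async def child(delay, value):
    # Hypothetical child coroutine: sleeps, then returns `value`.
    await asyncio.sleep(delay)
    return value

async def main():
    a = asyncio.create_task(child(0.01, "a"))
    b = asyncio.create_task(child(3600, "b"))
    c = asyncio.create_task(child(0.02, "c"))
    gathered = asyncio.gather(a, b, c, return_exceptions=True)
    await asyncio.sleep(0)  # let the children start running
    b.cancel()              # cancel one child directly, not the gather
    results = await gathered
    return results, gathered.cancelled()

results, was_cancelled = asyncio.run(main())
print(results[0], type(results[1]).__name__, results[2])  # a CancelledError c
print(was_cancelled)  # False
```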

Let's figure out what is going on:

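  1. This code schedules the three coroutines for execution and returns a Future object, group_task (an instance of the internal class _GatheringFuture), that aggregates their results.
group_task = asyncio.gather(
                     keep_printing("First"),keep_printing("Second"),keep_printing("Third")
                 )
  1. This code waits for the future to complete, with a timeout. If the timeout expires, it cancels the future and raises asyncio.TimeoutError.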
    try:
        await asyncio.wait_for(group_task,3)
    except asyncio.TimeoutError:
        print("Time's up!")
  1. The timeout fires. Let's look at asyncio's tasks.py, where wait_for does the following:
timeout_handle = loop.call_later(timeout, _release_waiter, waiter)
...
await waiter
...
await _cancel_and_wait(fut, loop=loop)  # _GatheringFuture.cancel() inside
raise exceptions.TimeoutError()
  1. When _GatheringFuture.cancel() is called, the CancelledError is propagated if any child task was actually cancelled:
class _GatheringFuture(futures.Future):
    ...
    def cancel(self):
        ...
        for child in self._children:
            if child.cancel():
                ret = True
        if ret:
            # If any child tasks were actually cancelled, we should
            # propagate the cancellation request regardless of
            # *return_exceptions* argument.  See issue 32684.
            self._cancel_requested = True
        return ret

And later:

...
if outer._cancel_requested:
    # If gather is being cancelled we must propagate the
    # cancellation regardless of *return_exceptions* argument.
    # See issue 32684.
    outer.set_exception(exceptions.CancelledError())
else:
    outer.set_result(results)
  1. It is therefore more correct to retrieve the result or the exception from the gather future:
async def main():
    group_task = asyncio.gather(
                     keep_printing("First"),keep_printing("Second"),keep_printing("Third")
                 )
    try:
        await asyncio.wait_for(group_task,3)
    except asyncio.TimeoutError:
        print("Time's up!")

    try:
        result = await group_task
    except asyncio.CancelledError:
        print("Gather was cancelled")
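The walkthrough above can be condensed into a runnable sketch, assuming a shortened timeout of 0.05 s and a hypothetical `log` list in place of the question's `print` calls. After `wait_for` raises `TimeoutError`, the gathering future is already done and holds a `CancelledError`; awaiting it once more retrieves that exception, which is why the "never retrieved" warning disappears.

```python
import asyncio

async def keep_printing(name, log):
    # Same infinite loop as in the question, but appending to a list
    # (hypothetical `log`) instead of printing timestamps.
    while True:
        log.append(name)
        await asyncio.sleep(0.01)

async def main():
    log = []
    outcome = []
    group_task = asyncio.gather(
        keep_printing("First", log),
        keep_printing("Second", log),
        keep_printing("Third", log),
    )
    try:
        await asyncio.wait_for(group_task, 0.05)
    except asyncio.TimeoutError:
        outcome.append("timeout")
    # wait_for has already waited for the cancellation to finish,
    # so the gathering future is done at this point.
    outcome.append(group_task.done())
    try:
        await group_task  # retrieve the stored CancelledError
    except asyncio.CancelledError:
        outcome.append("gather cancelled")
    return outcome

result = asyncio.run(main())
print(result)  # ['timeout', True, 'gather cancelled']
```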

I think you need to put await in front of asyncio.gather. So this call from your code:

    group_task = asyncio.gather(
                     keep_printing("First"),keep_printing("Second"),keep_printing("Third")
                 )

needs to be changed to:

    group_task = await asyncio.gather(
                     keep_printing("First"),keep_printing("Second"),keep_printing("Third")
                 )

Not sure why; I'm still learning this stuff.
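For reference, a small sketch of what the extra `await` changes, using a hypothetical coroutine `finite` that returns instead of looping forever: without `await`, `asyncio.gather(...)` returns a future; with `await`, the expression evaluates to the list of results once every coroutine has finished. With the infinite loops in `keep_printing`, that `await` would therefore never complete.

```python
import asyncio

async def finite(name):
    # Hypothetical coroutine that finishes, unlike keep_printing.
    await asyncio.sleep(0.01)
    return name

async def main():
    fut = asyncio.gather(finite("First"), finite("Second"))
    is_future = isinstance(fut, asyncio.Future)  # not awaited yet: a future
    results = await fut                          # awaited: the list of results
    return is_future, results

is_future, results = asyncio.run(main())
print(is_future, results)  # True ['First', 'Second']
```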
