问题描述
我正在观看import asyncio: Learn Python's AsyncIO #3 - Using Coroutines。讲师给出了以下示例:
import asyncio
import datetime
async def keep_printing(name):
while True:
print(name,end=" ")
print(datetime.datetime.now())
await asyncio.sleep(0.5)
async def main():
group_task = asyncio.gather(
keep_printing("First"),keep_printing("Second"),keep_printing("Third")
)
try:
await asyncio.wait_for(group_task,3)
except asyncio.TimeoutError:
print("Time's up!")
if __name__ == "__main__":
asyncio.run(main())
输出有一个例外:
First 2020-08-11 14:53:12.079830
Second 2020-08-11 14:53:12.079830
Third 2020-08-11 14:53:12.080828
First 2020-08-11 14:53:12.580865
Second 2020-08-11 14:53:12.580865
Third 2020-08-11 14:53:12.581901
First 2020-08-11 14:53:13.081979
Second 2020-08-11 14:53:13.082408
Third 2020-08-11 14:53:13.082408
First 2020-08-11 14:53:13.583497
Second 2020-08-11 14:53:13.583935
Third 2020-08-11 14:53:13.584946
First 2020-08-11 14:53:14.079666
Second 2020-08-11 14:53:14.081169
Third 2020-08-11 14:53:14.115689
First 2020-08-11 14:53:14.570694
Second 2020-08-11 14:53:14.571668
Third 2020-08-11 14:53:14.635769
First 2020-08-11 14:53:15.074124
Second 2020-08-11 14:53:15.074900
Time's up!
_GatheringFuture exception was never retrieved
future: <_GatheringFuture finished exception=CancelledError()>
concurrent.futures._base.CancelledError
讲师试图通过在CancelledError
中添加try/except
来处理keep_printing
:
async def keep_printing(name):
while True:
print(name,end=" ")
print(datetime.datetime.now())
try:
await asyncio.sleep(0.5)
except asyncio.CancelledError:
print(name,"was cancelled!")
break
但是,仍然发生相同的异常:
# keep printing datetimes
...
First was cancelled!
Second was cancelled!
Third was cancelled!
Time's up!
_GatheringFuture exception was never retrieved
future: <_GatheringFuture finished exception=CancelledError()>
concurrent.futures._base.CancelledError
然后,讲师只是继续学习其他主题,而从没有再回到这个示例来演示如何解决它。幸运的是,通过实验,我发现可以通过在try/except
异步函数的except asyncio.TimeoutError:
下添加另一个main
来解决此问题:
async def main():
group_task = asyncio.gather(
keep_printing("First"),3)
except asyncio.TimeoutError:
print("Time's up!")
try:
await group_task
except asyncio.CancelledError:
print("Main was cancelled!")
最终输出是:
# keep printing datetimes
...
First was cancelled!
Second was cancelled!
Third was cancelled!
Time's up!
Main was cancelled!
实际上,对于此版本的main
,我们甚至不需要try...except asyncio.CancelledError
中的keep_printing
。仍然可以正常工作。
那是为什么?为什么在CancelledError
中捕获main
而不在keep_printing
中捕获有效?视频讲师处理此异常的方式只会让我更加困惑。首先,他不需要更改keep_printing
的任何代码!
解决方法
当aw由于超时而被取消时,wait_for等待aw被取消。如果在协程中处理CancelledError,则会收到超时错误。 在3.7版中进行了更改。
示例
import asyncio
import datetime
async def keep_printing(name):
print(datetime.datetime.now())
try:
await asyncio.sleep(3600)
except asyncio.exceptions.CancelledError:
print("done")
async def main():
try:
await asyncio.wait_for(keep_printing("First"),timeout=3)
except asyncio.exceptions.TimeoutError:
print("timeouted")
if __name__ == "__main__":
asyncio.run(main())
用于从Task或Future检索结果的collect方法,您有一个无限循环,并且永不返回任何结果。如果aws序列中的任何Task或Future被取消(wait_for发生了什么情况),则将其视为引发CancelledError的情况–在这种情况下不会取消collect()调用。这是为了防止取消一个已提交的任务/功能导致其他任务/功能被取消。
对于保护性聚集方法,您可以将其覆盖到屏蔽层上。
import asyncio
import datetime
async def keep_printing(name):
while True:
print(name,datetime.datetime.now())
try:
await asyncio.sleep(0.5)
except asyncio.exceptions.CancelledError:
print(f"canceled {name}")
return None
async def main():
group_task = asyncio.shield(asyncio.gather(
keep_printing("First"),keep_printing("Second"),keep_printing("Third"))
)
try:
await asyncio.wait_for(group_task,3)
except asyncio.exceptions.TimeoutError:
print("Done")
if __name__ == "__main__":
asyncio.run(main())
,
让我们找出正在发生的事情:
- 此代码调度要执行的三个协程,并返回汇总结果的
Future
对象group_task
(内部类_GatheringFuture
的实例)。
group_task = asyncio.gather(
keep_printing("First"),keep_printing("Third")
)
- 此代码等待将来超时完成。而且,如果发生超时,它将取消并增加
asyncio.TimeoutError
。
try:
await asyncio.wait_for(group_task,3)
except asyncio.TimeoutError:
print("Time's up!")
- 发生超时。让我们看一下异步库
task.py
。wait_for
执行以下操作:
timeout_handle = loop.call_later(timeout,_release_waiter,waiter)
...
await waiter
...
await _cancel_and_wait(fut,loop=loop) # _GatheringFuture.cancel() inside
raise exceptions.TimeoutError()
- 当我们执行
_GatheringFuture.cancel()
时,如果实际上取消了任何子任务,则CancelledError
会传播
class _GatheringFuture(futures.Future):
...
def cancel(self):
...
for child in self._children:
if child.cancel():
ret = True
if ret:
# If any child tasks were actually cancelled,we should
# propagate the cancellation request regardless of
# *return_exceptions* argument. See issue 32684.
self._cancel_requested = True
return ret
后来
...
if outer._cancel_requested:
# If gather is being cancelled we must propagate the
# cancellation regardless of *return_exceptions* argument.
# See issue 32684.
outer.set_exception(exceptions.CancelledError())
else:
outer.set_result(results)
- 因此,从收集
future
中提取结果或异常更为正确
async def main():
group_task = asyncio.gather(
keep_printing("First"),keep_printing("Third")
)
try:
await asyncio.wait_for(group_task,3)
except asyncio.TimeoutError:
print("Time's up!")
try:
result = await group_task
except asyncio.CancelledError:
print("Gather was cancelled")
,
我认为您需要将 await
放在 asyncio.gather
之前。
所以这个调用来自您的代码:
group_task = asyncio.gather(
keep_printing("First"),keep_printing("Third")
)
需要改成:
group_task = await asyncio.gather(
keep_printing("First"),keep_printing("Third")
)
不知道为什么,我还在学习这些东西。