是否可以在异步 python 中暂停和重新启动任务?

问题描述

问题应该很简单,但我找不到任何相关信息。

我有一个异步 python 程序,其中包含一个运行时间相当长的任务,我希望能够在任意点暂停和重新启动(任意点当然意味着在任何有 await 关键字的地方)。 我希望有类似 task.suspend()task.resume() 的东西,但似乎没有。 在任务或事件循环级别上是否有任何 API 或我需要以某种方式自己做这件事?我不想在每次等待之前放置一个 event.wait()...

谢谢

解决方法

您所要求的是可能的,但并非微不足道。首先,请注意,您永远不能在 every await 上挂起,而只能在导致协程挂起的那些上挂起,例如 asyncio.sleep() 或 {{1}没有准备好返回的数据。等待一个协程立即开始执行它,如果协程可以立即返回,它不会进入事件循环。 stream.read() 仅在等待者(或 等待者等)请求时暂停到事件循环。这些问题中的更多详细信息:[1][2][3][4]

考虑到这一点,您可以使用 this answer 中的技术来拦截协程的每次恢复,并使用附加代码检查任务是否暂停,如果暂停,则在继续之前等待恢复事件。

await

测试:

import asyncio

class Suspendable:
    def __init__(self,target):
        self._target = target
        self._can_run = asyncio.Event()
        self._can_run.set()
        self._task = asyncio.ensure_future(self)

    def __await__(self):
        target_iter = self._target.__await__()
        iter_send,iter_throw = target_iter.send,target_iter.throw
        send,message = iter_send,None
        # This "while" emulates yield from.
        while True:
            # wait for can_run before resuming execution of self._target
            try:
                while not self._can_run.is_set():
                    yield from self._can_run.wait().__await__()
            except BaseException as err:
                send,message = iter_throw,err

            # continue with our regular program
            try:
                signal = send(message)
            except StopIteration as err:
                return err.value
            else:
                send = iter_send
            try:
                message = yield signal
            except BaseException as err:
                send,err

    def suspend(self):
        self._can_run.clear()

    def is_suspended(self):
        return not self._can_run.is_set()

    def resume(self):
        self._can_run.set()

    def get_task(self):
        return self._task