取消的异步任务导致“任务已销毁,但未决”

问题描述

我正在尝试编写一个作业调度程序,该作业调度程序可以跨N个内核有效地调度M个作业。一项工作完成后,应立即开始一项新的工作。另外,我们应该支持超时,这样任何任务都不会花费超过一定时间的时间。这是我为主循环设计的:

import asyncio
import sys

max_concurrency = 4

async def _sleep_asynchronously(time):
    index,seconds = time
    await asyncio.sleep(seconds)
    return (index,seconds)

def select_invocations(waiting,num_running):
    count = max_concurrency - num_running
    selected = waiting[:count]
    waiting = waiting[count:]
    return selected,waiting

async def _run_everything_asynchronously():
    tasks = []
    timeouts = [ 4,3,1,2,0.5,7,0.25,4.5,5]
    timeouts = list(enumerate(timeouts))

    pending,waiting = select_invocations(tasks,0)
    running = {_sleep_asynchronously(timeout) for timeout in timeouts}

    while len(running) > 0:
        try:
            done,running = await asyncio.wait(running,timeout=0.5,return_when=asyncio.FIRST_COMPLETED)
            if not done:
                for r in running:
                    r.cancel()
                    await r
            else:
                for d in done:
                    index,timeout = await d
                    print("Index {} finished waiting for {} seconds".format(index,timeout))

        except asyncio.CancelledError as e:
            running.clear()

        if len(waiting) > 0:
            pending,len(running))
            running = {_sleep_asynchronously(timeout) for timeout in timeouts}

if 'win32' in sys.platform:
    asyncio.set_event_loop_policy(asyncio.WindowsProactorEventLoopPolicy())

loop = asyncio.get_event_loop()
rc = loop.run_until_complete(_run_everything_asynchronously())
loop.close()

sys.exit(0)

如果我运行它,这是我的输出:

Index 6 finished waiting for 0.25 seconds
Index 4 finished waiting for 0.5 seconds
Index 9 finished waiting for 1 seconds
Index 2 finished waiting for 1 seconds
Task was destroyed but it is pending!
task: <Task pending coro=<_sleep_asynchronously() done,defined at D:\src\asynciotest.py:6> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x000001E74D13BDF8>()]>>
Task was destroyed but it is pending!
task: <Task pending coro=<_sleep_asynchronously() done,defined at D:\src\asynciotest.py:6> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x000001E74D352438>()]>>
Task was destroyed but it is pending!
task: <Task pending coro=<_sleep_asynchronously() done,defined at D:\src\asynciotest.py:6> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x000001E74D37F9D8>()]>>
Task was destroyed but it is pending!
task: <Task pending coro=<_sleep_asynchronously() done,defined at D:\src\asynciotest.py:6> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x000001E74D37FBE8>()]>>
Task was destroyed but it is pending!
task: <Task pending coro=<_sleep_asynchronously() done,defined at D:\src\asynciotest.py:6> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x000001E74D3A15B8>()]>>
Task was destroyed but it is pending!
task: <Task pending coro=<_sleep_asynchronously() done,defined at D:\src\asynciotest.py:6> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x000001E74D3E7498>()]>>
Task was destroyed but it is pending!
task: <Task pending coro=<_sleep_asynchronously() done,defined at D:\src\asynciotest.py:6> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x000001E74D3E74C8>()]>>

我在这里想念什么?如何正确清理已取消的任务?

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)