Problem description
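I am writing a job scheduler in which I schedule jobs across N coroutines. My current code looks like this:

    import asyncio
    from typing import List

    # MyJob and _run_job_coroutine are defined elsewhere (omitted from the question).

    def update_run_set(waiting, running, max_concurrency):
        number_to_add = min(len(waiting), max_concurrency - len(running))
        for i in range(0, number_to_add):
            next_one = waiting.pop()
            # asyncio.wait() requires tasks or futures on Python 3.11+,
            # so wrap the coroutine when it starts running.
            running.add(asyncio.ensure_future(next_one))

    async def _run_test_invocations_asynchronously(jobs: List["MyJob"], max_concurrency: int, timeout_seconds: int):
        running = set()  # These tasks are actively being run
        waiting = set()  # These tasks have not yet started

        waiting = {_run_job_coroutine(job) for job in jobs}
        update_run_set(waiting, running, max_concurrency)
        while len(running) > 0:
            done, running = await asyncio.wait(running, timeout=timeout_seconds,
                                               return_when=asyncio.FIRST_COMPLETED)
            if not done:
                timeout_count = len(running)
                for r in running:
                    r.cancel()  # Start cancelling the timed out jobs
                done, running = await asyncio.wait(running)  # Wait for cancellation to finish
                assert(len(done) == timeout_count)
                assert(len(running) == 0)
            else:
                for d in done:
                    job_return_code = await d
            if len(waiting) > 0:
                update_run_set(waiting, running, max_concurrency)
                assert(len(running) > 0)

The problem is this: say my timeout is 5 seconds and I am scheduling 3 jobs across 4 cores. Job A takes 2 seconds, job B takes 6 seconds, and job C takes 7 seconds. We have something like this:

    t=0     t=1     t=2     t=3     t=4     t=5     t=6     t=7
    -------|-------|-------|-------|-------|-------|-------|-------|
    AAAAAAAAAAAAAAA
    BBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBB
    CCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCC

However, at t = 2 the asyncio.wait() call returns because A has completed. The code then loops back to the top and calls it again. At this point B has already been running for 2 seconds, but because the countdown starts over and B has only 4 seconds left until it finishes, B appears to succeed. So after 4 more seconds we return again with B successful, start the loop over, and now C completes as well.

How do I make both B and C fail? I somehow need the elapsed time to be preserved across calls to asyncio.wait().

One idea I had was to do my own bookkeeping of how much time each job is still allowed to run, and pass the minimum of those values to asyncio.wait(). Then, when something times out, I would cancel only the jobs whose remaining time was equal to the value I passed for timeout_seconds.

That requires a lot of manual bookkeeping, though, and I can't help worrying about floating-point problems that would make me decide it is not yet time to cancel a job even though it really is. So I can't help thinking there must be an easier way. Any ideas would be appreciated.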
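For reference, the manual bookkeeping idea described above might look roughly like the following. This is only a sketch under stated assumptions: `run_with_deadlines` and `demo_job` are hypothetical names, timed-out jobs are reported as `None`, and an exception raised by a job simply propagates. Each task gets an absolute deadline on the event loop's clock, and a task is cancelled when its deadline is at or before the current time, which avoids comparing floating-point values for equality.

```python
import asyncio

async def run_with_deadlines(coros, max_concurrency, timeout_seconds):
    """Run coroutines with bounded concurrency and one deadline per job.

    Hypothetical helper: returns results in input order, None for timeouts.
    """
    loop = asyncio.get_running_loop()
    waiting = list(enumerate(coros))
    deadlines = {}   # running task -> absolute deadline on the loop clock
    index_of = {}    # task -> position in the input
    results = [None] * len(waiting)

    def start_more():
        # Start jobs until the concurrency limit is reached.
        while waiting and len(deadlines) < max_concurrency:
            index, coro = waiting.pop(0)
            task = asyncio.ensure_future(coro)
            deadlines[task] = loop.time() + timeout_seconds
            index_of[task] = index

    start_more()
    while deadlines:
        # Wait no longer than the earliest deadline among running jobs.
        remaining = max(0.0, min(deadlines.values()) - loop.time())
        done, _ = await asyncio.wait(
            set(deadlines), timeout=remaining,
            return_when=asyncio.FIRST_COMPLETED)
        for task in done:
            del deadlines[task]
            results[index_of[task]] = task.result()
        # Cancel every job whose deadline has passed (<=, not ==).
        now = loop.time()
        expired = [t for t, d in deadlines.items() if d <= now]
        for task in expired:
            task.cancel()
        if expired:
            await asyncio.wait(expired)  # let the cancellations finish
            for task in expired:
                del deadlines[task]
                if not task.cancelled():  # finished just before the cancel
                    results[index_of[task]] = task.result()
        start_more()
    return results

async def demo_job(name, seconds):
    # Hypothetical stand-in for a real job.
    await asyncio.sleep(seconds)
    return name

if __name__ == "__main__":
    # The A/B/C scenario scaled down by 10x: timeout 0.5 s, jobs 0.2/0.6/0.7 s.
    jobs = [demo_job("A", 0.2), demo_job("B", 0.6), demo_job("C", 0.7)]
    print(asyncio.run(run_with_deadlines(jobs, 4, 0.5)))
```

Because the deadlines are absolute, a wakeup caused by A finishing does not reset the countdown for B and C.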
Solution
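You can wrap each job in a coroutine that enforces its own timeout, for example using asyncio.wait_for. Limiting the number of parallel invocations can be done in the same coroutine with an asyncio.Semaphore. With the two combined, you need only a single call to wait(), or even just gather(). For example (untested):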
    import asyncio

    # Run the job, limiting concurrency and time. This code could likely
    # be part of _run_job_coroutine, omitted from the question.
    async def _run_job_with_limits(job, sem, timeout):
        async with sem:
            try:
                # Return the job's result so that gather() can collect it.
                return await asyncio.wait_for(_run_job_coroutine(job), timeout)
            except asyncio.TimeoutError:
                # timed out and canceled, decide what you want to return
                pass

    async def _run_test_invocations_async(jobs, max_concurrency, timeout):
        sem = asyncio.Semaphore(max_concurrency)
        return await asyncio.gather(
            *(_run_job_with_limits(job, sem, timeout) for job in jobs)
        )
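
To check this approach against the A/B/C scenario from the question, here is a small self-contained sketch. The names `fake_job`, `run_with_limits`, `run_all` and the `TIMED_OUT` marker are hypothetical stand-ins, since `_run_job_coroutine` is not shown in the question, and the durations are scaled down by a factor of ten (timeout 0.5 s, jobs of 0.2 s, 0.6 s and 0.7 s).

```python
import asyncio

TIMED_OUT = "timed out"  # hypothetical marker returned for cancelled jobs

async def fake_job(name, seconds):
    # Stand-in for _run_job_coroutine: sleeps, then returns its name.
    await asyncio.sleep(seconds)
    return name

async def run_with_limits(name, seconds, sem, timeout):
    async with sem:  # limits how many jobs run at once
        try:
            # wait_for cancels the job if it exceeds its own timeout.
            return await asyncio.wait_for(fake_job(name, seconds), timeout)
        except asyncio.TimeoutError:
            return TIMED_OUT

async def run_all(jobs, max_concurrency, timeout):
    sem = asyncio.Semaphore(max_concurrency)
    return await asyncio.gather(
        *(run_with_limits(name, seconds, sem, timeout)
          for name, seconds in jobs)
    )

if __name__ == "__main__":
    jobs = [("A", 0.2), ("B", 0.6), ("C", 0.7)]
    print(asyncio.run(run_all(jobs, max_concurrency=4, timeout=0.5)))
    # prints ['A', 'timed out', 'timed out']
```

Each job's timeout starts counting only once the job has acquired the semaphore, so time spent waiting for a free slot does not count against it.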