问题描述
这段代码是这样工作的:
名为“ids”的列表包含 ID 号。我通过 ID 号下载特定的消息。 'nDownload' 是列表索引。队列的大小值等于 5。
我从列表中选取项目,一次下载一条消息并将其添加到队列中。 当 nDownload 等于 6 时:
- 出现QueueFull 异常。
- 创建 5 个工作器。
- 工作人员从消息中提取元数据用于其他目的。
- await queue.join() 阻塞直到队列中的所有项目都被获取和处理。
- 结束 -> 删除工作人员。
代码有效,直到现在我都没有问题。
nDownload = 0
workers = []
while (nDownload <= len(ids)):
try:
async for msg in get_messages(channel,ids=ids[nDownload]):
nDownload = nDownload + 1
try:
queue.put_Nowait(msg)
except (asyncio.QueueFull,IndexError) as qErr:
nDownload = nDownload - 1
workers = [asyncio.create_task(worker(queue)) for _ in range(5)]
await queue.join()
for cancel in workers:
cancel.cancel()
except IndexError as iErr:
break
问题: 有时消息有不同的大小。例如:
消息 1 = 8 分钟内下载了 100MB
消息 2 = 5 秒内下载了 1MB
一旦它下载了最短的消息(消息 2),我就会在队列中获得一个空闲的“插槽”。 不幸的是,我必须等待消息 1,因为 queue.join()
此时如何将新项目添加到队列中?
为什么我要使用 queue.join() ? 因为我不知道如何将最多 5 条消息添加到队列中,等待下载并继续 我真的需要下载一组消息而不是一次全部 谢谢
编辑: 是的,我的工人是这样定义的(简化)
async def worker(queue):
while True:
queue_msg = await queue.get()
loop = asyncio.get_event_loop()
try:
task = loop.create_task(extract(queue_msg))
await asyncio.wait_for(task,timeout=timeout)
except errors.Fail:
#Here I have to requeue the message when it fails,#so it requeues the ID in order to download the same msg later
await queue.put(queue_msg.id)
except asyncio.TimeoutError:
#requeue the msg etcc...
finally:
queue.task_done()
你的回答很聪明,谢谢 但是,我选择队列“大小 > 1”,因为我需要在“提取”任务失败时重新获取消息。 (sry我没告诉你) 我不知道如果队列大小 = 1 会发生什么并且我尝试添加项目。这个有点难
解决方法
目前还不清楚您的约束是什么,但如果我理解正确的话:
- 您最多想同时下载 5 个东西
- 您不想浪费时间 - 工作人员处理完一件物品后,应该立即获得一件新物品
队列大小应该与您的目的无关,它仅在工作程序暂时比 get_messages
快时用作缓冲区。我什至会从队列大小 1 开始,并试验较大的值是否有助于提高性能。
在 QueueFull
上生成任务似乎很奇怪且没有必要。处理生产者-消费者模式的惯用方法是创建固定数量的消费者,并让他们在到达时处理多个项目。您没有显示 worker
,因此不清楚每个工作人员是只处理一条消息还是多条消息。
我将循环重写为:
queue = asyncio.Queue(1)
workers = [asyncio.create_task(worker(queue)) for _ in range(5)]
for current in ids:
async for msg in get_messages(channel,id=current):
# enqueue msg,waiting (if needed) for a free slot in the queue
await queue.put(msg)
# wait for the remaining enqueued items to be processed
await queue.join()
# cancel the now-idle workers,which wait for a new message
# that will never arrive
for w in workers:
w.cancel()
一个worker的定义如下:
async def worker(queue):
while True:
msg = await queue.get()
... process msg ...
queue.task_done()