用tqdm跟踪芹菜和弦任务的进度? 蟒蛇

问题描述

是否有一种方法可以跟踪和弦的进度,最好是在tqdm栏中?

例如,如果我们使用the documentation exemple,我们将创建以下文件

#proj/tasks.py

@app.task
def add(x,y):
    return x + y

@app.task
def tsum(numbers):
    return sum(numbers)

,然后运行以下脚本:

from celery import chord
from proj.tasks import add,tsum

chord(add.s(i,i)
      for i in range(100))(tsum.s()).get()

我们如何在和弦上追踪进度?

  • 由于chord()对象不是函数,因此我们无法使用update_state。
  • 我们无法使用collect(),因为chord()(callback)会阻塞脚本,直到结果准备就绪为止。

理想情况下,我会设想类似this custom tqdm subclass for Dask之类的东西,但是我一直找不到类似的解决方案。

任何帮助或暗示,不胜感激!

解决方法

所以我找到了解决方法。

首先,chord()(callback)实际上不会阻止脚本,只有.get()部分会阻止。将所有任务发布到代理可能会花费很长时间。幸运的是,有一种简单的方法可以通过信号跟踪此发布过程。我们可以在发布开始之前创建进度条,并修改example handler from the documentation以对其进行更新:

from tqdm import tqdm
from celery.signals import after_task_publish

publish_pbar = tqdm(total=100,desc="Publishing tasks")

@after_task_publish.connect(sender='tasks.add')
def task_sent_handler(sender=None,headers=None,body=None,**kwargs):
    publish_pbar.update(1)

c = chord(add.s(i,i)
      for i in range(100))(tsum.s())

# The script will resume once all tasks are published so close the pbar
publish_pbar.close()

但是,这仅适用于发布任务,因为在发送任务的信号中执行了this signal。 task_success信号是在工作进程中执行的,因此,此技巧只能在工作日志中使用(据我所知)。

因此,为了跟踪所有任务已发布并且脚本恢复后的进度,我从app.control.inspect().stats()转到了工作人员统计信息。这将返回具有各种统计信息的字典,其中包括已完成的任务。这是我的实现:

tasks_pbar = tqdm(total=100,desc="Executing tasks")

previous_total = 0
current_total = 0

while current_total<100:

    current_total = 0
    for key in app.control.inspect().stats():
        current_total += app.control.inspect().stats()[key]['total']['tasks.add']

    if current_total > previous_total:
        tasks_pbar.update(current_total-previous_total)

    previous_total = current_total

results = c.get()
tasks_pbar.close()

最后,我认为可能有必要give names进行任务,以进行信号处理程序过滤和stats()dict过滤,因此请不要忘记将其添加到任务中:

#proj/tasks.py

@app.task(name='tasks.add')
def add(x,y):
    return x + y

如果有人可以找到更好的解决方案,请分享!