问题描述
是否有一种方法可以跟踪和弦的进度,最好是在tqdm栏中?
例如,如果我们使用the documentation exemple,我们将创建以下文件:
#proj/tasks.py
@app.task
def add(x,y):
return x + y
@app.task
def tsum(numbers):
return sum(numbers)
,然后运行以下脚本:
from celery import chord
from proj.tasks import add,tsum
chord(add.s(i,i)
for i in range(100))(tsum.s()).get()
我们如何在和弦上追踪进度?
- 由于chord()对象不是函数,因此我们无法使用update_state。
- 我们无法使用collect(),因为chord()(callback)会阻塞脚本,直到结果准备就绪为止。
理想情况下,我会设想类似this custom tqdm subclass for Dask之类的东西,但是我一直找不到类似的解决方案。
任何帮助或暗示,不胜感激!
解决方法
所以我找到了解决方法。
首先,chord()(callback)实际上不会阻止脚本,只有.get()部分会阻止。将所有任务发布到代理可能会花费很长时间。幸运的是,有一种简单的方法可以通过信号跟踪此发布过程。我们可以在发布开始之前创建进度条,并修改example handler from the documentation以对其进行更新:
from tqdm import tqdm
from celery.signals import after_task_publish
publish_pbar = tqdm(total=100,desc="Publishing tasks")
@after_task_publish.connect(sender='tasks.add')
def task_sent_handler(sender=None,headers=None,body=None,**kwargs):
publish_pbar.update(1)
c = chord(add.s(i,i)
for i in range(100))(tsum.s())
# The script will resume once all tasks are published so close the pbar
publish_pbar.close()
但是,这仅适用于发布任务,因为在发送任务的信号中执行了this signal。 task_success信号是在工作进程中执行的,因此,此技巧只能在工作日志中使用(据我所知)。
因此,为了跟踪所有任务已发布并且脚本恢复后的进度,我从app.control.inspect().stats()转到了工作人员统计信息。这将返回具有各种统计信息的字典,其中包括已完成的任务。这是我的实现:
tasks_pbar = tqdm(total=100,desc="Executing tasks")
previous_total = 0
current_total = 0
while current_total<100:
current_total = 0
for key in app.control.inspect().stats():
current_total += app.control.inspect().stats()[key]['total']['tasks.add']
if current_total > previous_total:
tasks_pbar.update(current_total-previous_total)
previous_total = current_total
results = c.get()
tasks_pbar.close()
最后,我认为可能有必要give names进行任务,以进行信号处理程序过滤和stats()dict过滤,因此请不要忘记将其添加到任务中:
#proj/tasks.py
@app.task(name='tasks.add')
def add(x,y):
return x + y
如果有人可以找到更好的解决方案,请分享!