问题描述
全部,
@celery_app.task
def add(x,y):
return x + y
@celery_app.task
def task_no(n):
return f'Finished task {n}.'
@celery_app.task
def add_bunch():
return chord([add.si(1,1),add.si(2,2)])(task_no.si('1'))
@celery_app.task
def do_it_all():
chain(
add_bunch.si(),task_no.si('2')
).apply_async()
如果我运行 do_it_all() ,我会得到以下输出:
[INFO/MainProcess] Received task: lumi_translation.celery_tasks.add_bunch[d40dc179-602d-4414-9fbd-ee8d62fe7604]
[INFO/ForkPoolWorker-1] Task lumi_translation.celery_tasks.add_bunch[d40dc179-602d-4414-9fbd-ee8d62fe7604] succeeded in 0.01651039347052574s: <AsyncResult: d5564664-1e6f-445f-a172-442fef547422>
[INFO/MainProcess] Received task: lumi_translation.celery_tasks.add[fbc7288a-1f76-447a-ac2b-906ddaa6c00c]
[INFO/ForkPoolWorker-1] Task lumi_translation.celery_tasks.add[fbc7288a-1f76-447a-ac2b-906ddaa6c00c] succeeded in 0.0005592871457338333s: 2
[INFO/MainProcess] Received task: lumi_translation.celery_tasks.add[472d6142-355d-466b-8ee4-0d8cc7e1d96e]
[INFO/ForkPoolWorker-1] Task lumi_translation.celery_tasks.add[472d6142-355d-466b-8ee4-0d8cc7e1d96e] succeeded in 0.0012424923479557037s: 4
[INFO/MainProcess] Received task: lumi_translation.celery_tasks.task_no[faa013e7-42c5-4321-b132-e749169810ee]
[INFO/ForkPoolWorker-1] Task lumi_translation.celery_tasks.task_no[faa013e7-42c5-4321-b132-e749169810ee] succeeded in 0.0003700973466038704s: 'Finished task 2.'
[INFO/MainProcess] Received task: lumi_translation.celery_tasks.task_no[d5564664-1e6f-445f-a172-442fef547422]
[INFO/ForkPoolWorker-1] Task lumi_translation.celery_tasks.task_no[d5564664-1e6f-445f-a172-442fef547422] succeeded in 0.0003337441012263298s: 'Finished task 1.'
add_bunch
任务即使在子任务尚未完成时也会成功;因此,任务 2 在任务 1 之前完成。只有当所有子任务都成功完成时,有没有办法使 add_bunch
任务问题成功?在上面的例子中,有没有办法确保任务 1 在任务 2 之前完成?
解决方法
add_bunch() “什么都不做”,它只是创建一个 Chord 对象并返回它。这总是会成功,即。返回一个有效的 Chord 对象,当然,除非它不能分配更多内存......
,我的一位同事向我展示了一种解决方法来执行此操作。他说他花了非常不合理的时间来解决这个问题。我把它放在这里是为了省去其他人的麻烦。
对 add_bunch()
任务的可行重写是:
@celery_app.task(bind=True)
def add_bunch(self):
self.replace(
chord(header=[add.si(1,1),add.si(2,2)],body=task_no.si('1'))
)