问题描述
使用 bind=True
的任务
from celery import states
@celery.task(name="crunch.task",bind=True)
def crunch(self,data):
try:
pass
# ... run computation with data here
except Exception as exc:
self.update_state(
state=states.FAILURE,Meta={"details": "error details here"}
)
raise exc
这里的重要功能是我使用 bind=True
将任务作为 self
参数传递给函数。这允许访问任务的 update_state
方法,这对于错误处理非常有用。
小组作业
我现在想使用 celery.group
在批处理作业中运行此任务。
@celery.task(name="batch.task",bind=True)
def batch(self,data_list):
try:
# HERE! IT SEEM WRONG TO PASS `SELF` INTO THE CHILD TASKS
job = group([crunch(self,data) for data in data_list]) # <-- here `self` should be created by celery!
job.async_apply()
except Exception as exc:
self.update_state(
state=states.FAILURE,Meta={"details": "error details here"}
)
raise exc
如何使用 celery.group
制作由任务组成的 bind=True
?
解决方法
您需要使用签名,
crunch.si(data)