如何使用bind = True从任务中创建一个组?

问题描述

使用 bind=True 的任务

我有一个 celery 任务,它运行持续几秒钟的计算。

from celery import states

@celery.task(name="crunch.task",bind=True)
def crunch(self,data):
    try:
        pass
        # ... run computation with data here
    except Exception as exc:
        self.update_state(
            state=states.FAILURE,Meta={"details": "error details here"}
        )
        raise exc

这里的重要功能是我使用 bind=True 将任务作为 self 参数传递给函数。这允许访问任务的 update_state 方法,这对于错误处理非常有用。

小组作业

我现在想使用 celery.group 在批处理作业中运行此任务。

@celery.task(name="batch.task",bind=True)
def batch(self,data_list):
    try:
        # HERE! IT SEEM WRONG TO PASS `SELF` INTO THE CHILD TASKS
        job = group([crunch(self,data) for data in data_list])  # <-- here `self` should be created by celery!
        job.async_apply()
    except Exception as exc:
        self.update_state(
            state=states.FAILURE,Meta={"details": "error details here"}
        )
        raise exc

如何使用 celery.group 制作由任务组成的 bind=True

解决方法

您需要使用签名,

crunch.si(data)