问题描述
我有一个 API 端点,它从用户那里获取一些数据并使用 1 个或多个 celery 工作器处理它。
例如,有 3 个不同的任务:
@celery.task
def a(data):
res = do_a_work(data)
if res.status:
b.apply_async(res)
else:
return {'message': 'a task Failed'}
@celery.task
def b(data):
res = do_b_work(data)
if res.status:
c.apply_async(res)
else:
return {'message': 'b task Failed'}
@celery.task
def c(data):
res = do_c_work(data)
return {'result': res}
API 端点根据数据向 a
或 b
提交任务。
可能的流程:
-
a
->b
->c
-> 结果 -
b
->c
-> 结果 -
a
-> 结果(如果a
状态为 False) -
a
->b
-> 结果(如果b
状态为 False) -
b
-> 结果(如果b
状态为 False)
我计划使用 celery 链,在那里我可以定义应在该数据上执行哪些任务。但问题是,如果任务 a
或任务 b
状态为 False,我没有找到停止处理的方法。
另一种方法是在任务示例中从任务内部提交额外的任务,但问题是用户需要获得一个 task_id,稍后他才能获得结果。
解决方法
chord
是正确的方法,您只需重写 do_a_work
和 do_b_work
以引发异常而不返回 True
/{{1 }} 价值。一旦您重写这些函数以引发异常,您就可以使用和弦和设置错误来编写流程。
打电话给False
,你可能会做一些类似 beloe 的事情
a