如何在 celery 任务上动态设置任务路由?

问题描述

我有一个 API 端点,它从用户那里获取一些数据并使用 1 个或多个 celery 工作器处理它。

例如,有 3 个不同的任务:

@celery.task
def a(data):
    res = do_a_work(data)
    if res.status:
         b.apply_async(res)
    else:
         return {'message': 'a task Failed'}


@celery.task
def b(data):
    res = do_b_work(data)
    if res.status:
        c.apply_async(res)
    else:
         return {'message': 'b task Failed'}


@celery.task
def c(data):
    res = do_c_work(data)
    return {'result': res}

API 端点根据数据向 ab 提交任务。

可能的流程:

  • a -> b -> c -> 结果
  • b -> c -> 结果
  • a -> 结果(如果 a 状态为 False)
  • a -> b -> 结果(如果 b 状态为 False)
  • b -> 结果(如果 b 状态为 False)

我计划使用 celery 链,在那里我可以定义应在该数据上执行哪些任务。但问题是,如果任务 a 或任务 b 状态为 False,我没有找到停止处理的方法

另一种方法是在任务示例中从任务内部提交额外的任务,但问题是用户需要获得一个 task_id,稍后他才能获得结果。

我可以使用第三种方法解决这个问题吗?

解决方法

chord 是正确的方法,您只需重写 do_a_workdo_b_work 以引发异常而不返回 True/{{1 }} 价值。一旦您重写这些函数以引发异常,您就可以使用和弦和设置错误来编写流程。

打电话给False,你可能会做一些类似 beloe 的事情

a