将现有的 celery worker 用于 Airflow 的 Celeryexecutor worker

问题描述

我正在尝试将动态工作流引入我的环境,其中涉及不同模型推理的多个步骤,其中一个模型的输出被馈送到另一个模型中。目前,我们很少有 Celery 工作人员分布在主机上来管理推理链。随着复杂性的增加,我们正在尝试动态构建工作流程。为此,我使用 Celeryexecutor 进行了动态 DAG 设置。现在,有没有办法可以保留当前的 ​​Celery 设置并将气流驱动的任务路由到相同的工作人员?我知道这些工作人员的设置应该可以访问 DAG 文件夹和与气流服务器相同的环境。我想知道 celery worker 需要如何在这些服务器中启动,以便气流可以路由以前由 python 应用程序手动工作流完成的相同任务。如果我使用命令“airflow celery worker”启动工作人员,我将无法访问我的应用程序任务。如果我以目前的方式启动 celery,即“celery -A proj”,则气流与它无关。寻找使之奏效的想法。

解决方法

谢谢@DejanLekic。我让它工作了(尽管 DAG 任务调度延迟太多,我放弃了这种方法)。如果有人想看看这是如何完成的,我做了以下几件事来让它发挥作用。

  1. 更改airflow.cfg 以更改执行器、队列和结果后端设置(显而易见)
  2. 如果我们必须使用在气流保护伞外生成的 Celery worker,请将 celery_app_name 设置更改为 celery.execute 而不是 airflow.executors.celery_execute,并将 Executor 更改为“LocalExecutor”。我还没有测试过这个,但通过在项目的 celery 应用程序中注册气流的任务,甚至可以避免切换到 celery 执行器。
  3. 每个任务现在将调用 send_task(),然后将返回的 AsynResult 对象存储在 Xcom(隐式或显式)或 Redis(隐式推送到队列)中,然后子任务将收集 Asyncresult(它将是从 Xcom 或 Redis 获取值的隐式调用),然后调用 .get() 以获取上一步的结果。

注意:没有必要在 DAG 的两个任务之间拆分 send_task() 和 .get()。通过在父母和孩子之间拆分它们,我试图利用任务之间的滞后。但就我而言,任务的 celery 执行完成得比气流在调度相关任务时的固有延迟要快。