使用链和组在芹菜中设计工作流程

问题描述

我是芹菜的新手,我正在尝试使用Chain,Groups和Chord设计芹菜的工作流程。到目前为止,这是我所做的:

def __chainfileprocessing(config):
    filelist,src = get_files_for_processing(config)
    for fileattr in filelist:
        chain(  download_file.s(fileattr,src),importdata.s(fileattr,post_processing.s(fileattr,src)
             ).apply_async()

当前执行顺序:

  • 所有download_file()任务都已执行
  • 然后所有import_data()任务都会被执行
  • 然后所有post_processing()任务都将被执行

我需要什么:

对于文件列表中的每个项目,任务应以download_file() => import_data() => post_processing()的顺序执行。

解决方法

您的代码正在执行您想要的内容。对于文件列表中的每个项目,chain将按照以下顺序启动任务:download_file,然后依次为import_datepost_processing

您的代码的作用是:

  • 异步启动文件A的一系列任务(依次为download_file,然后依次为importdatapost_processing)。这将启动文件A的download_file。完成后,它将启动文件A的importdata任务。apply_async立即返回;它不会等待任何任务完成。
  • 异步启动文件B的任务链(download_file,然后是importdata,然后是post_processing);这将启动文件B的download_file。完成后,它将启动文件B的importdata任务。文件A的download_file任务可能正在运行,但是此调用不知道那;只是将一个任务添加到队列中。

在循环结束时,发送给celery的是download_file文件A ... n。每个download_file任务完成时,它将提交链中的下一个任务。

不同文件的任务之间没有依赖关系;从您发布的代码来看,似乎没有必要(为什么文件2必须等待文件1完成?)