问题描述
我是芹菜的新手,我正在尝试使用Chain,Groups和Chord设计芹菜的工作流程。到目前为止,这是我所做的:
def __chainfileprocessing(config):
filelist,src = get_files_for_processing(config)
for fileattr in filelist:
chain( download_file.s(fileattr,src),importdata.s(fileattr,post_processing.s(fileattr,src)
).apply_async()
当前执行顺序:
- 所有download_file()任务都已执行
- 然后所有import_data()任务都会被执行
- 然后所有post_processing()任务都将被执行
我需要什么:
对于文件列表中的每个项目,任务应以download_file() => import_data() => post_processing()
的顺序执行。
解决方法
您的代码正在执行您想要的内容。对于文件列表中的每个项目,chain
将按照以下顺序启动任务:download_file
,然后依次为import_date
和post_processing
。
您的代码的作用是:
- 异步启动文件A的一系列任务(依次为
download_file
,然后依次为importdata
和post_processing
)。这将启动文件A的download_file
。完成后,它将启动文件A的importdata
任务。apply_async
立即返回;它不会等待任何任务完成。 - 异步启动文件B的任务链(
download_file
,然后是importdata
,然后是post_processing
);这将启动文件B的download_file
。完成后,它将启动文件B的importdata
任务。文件A的download_file
任务可能正在运行,但是此调用不知道那;只是将一个任务添加到队列中。 - 等
在循环结束时,发送给celery的是download_file
文件A ... n。每个download_file
任务完成时,它将提交链中的下一个任务。
不同文件的任务之间没有依赖关系;从您发布的代码来看,似乎没有必要(为什么文件2必须等待文件1完成?)