Celery 如何在 for 循环中创建任务组

问题描述

我需要创建一个 celery 组任务,我想在其中等待直到它完成,但文档对我来说不清楚如何实现:

这是我目前的状态:

def import_media(request):
    keys = []
    for obj in s3_resource.Bucket(env.str('S3_BUCKET')).objects.all():
        if obj.key.endswith(('.m4v','.mp4','.m4a','.mp3')):
            keys.append(obj.key)
    for key in keys:
        url = s3_client.generate_presigned_url(
            ClientMethod='get_object',Params={'Bucket': env.str('S3_BUCKET'),'Key': key},ExpiresIn=86400,)
        if not Files.objects.filter(descriptor=strip_descriptor_url_scheme(url)).exists():
            extract_descriptor.apply_async(kwargs={"descriptor": str(url)})

    return None

现在我需要在组内为我拥有的每个 URL 创建一个新任务,我该怎么做?

我现在设法让我的流程像这样工作:

@require_http_methods(("GET"))
def import_media(request):
    keys = []
    urls = []
    for obj in s3_resource.Bucket(env.str('S3_BUCKET')).objects.all():
        if obj.key.endswith(('.m4v',)
        if not Files.objects.filter(descriptor=strip_descriptor_url_scheme(url)).exists():
            new_file = Files.objects.create(descriptor=strip_descriptor_url_scheme(url))
            new_file.save()
            urls.append(url)
    workflow = (
        group([extract_descriptor.s(url) for url in urls]).delay()
    )
    workflow.get(timeout=None,interval=0.5)
    print("hello - Further processing here")
    return None

有什么优化建议吗?至少现在它工作得很好!

提前致谢

解决方法

https://docs.celeryproject.org/en/latest/userguide/canvas.html#groups

不管有没有失败,组都会运行所有任务,如果前一个任务成功,则链式运行下一个任务。您可以做的是,而不是每次在 for 循环中调用 apply_async。您可以使用 signature 方法,该方法应用参数但不执行任务,直到您准备就绪。

from celery import group
    ...
    all_urls= []
    for key in keys:
        url = s3_client.generate_presigned_url(
            ClientMethod='get_object',Params={'Bucket': env.str('S3_BUCKET'),'Key': key},ExpiresIn=86400,)
        if not Files.objects.filter(descriptor=strip_descriptor_url_scheme(url)).exists():
            all_urls.append(url)
    g = group(extract_descriptor.s(kwargs={"descriptor": str(url)}) for url 
    in all_urls) # create group
    result = g() # you may need to call g.apply_sync(),but this executes all tasks in group
    result.ready()  # have all subtasks completed?
    result.successful() # were all subtasks successful?

相关问答

Selenium Web驱动程序和Java。元素在(x,y)点处不可单击。其...
Python-如何使用点“。” 访问字典成员?
Java 字符串是不可变的。到底是什么意思?
Java中的“ final”关键字如何工作?(我仍然可以修改对象。...
“loop:”在Java代码中。这是什么,为什么要编译?
java.lang.ClassNotFoundException:sun.jdbc.odbc.JdbcOdbc...