Problem Description
I need to create a Celery group task and wait until it completes, but the documentation isn't clear to me on how to do this.
Here is what I have so far:
def import_media(request):
    keys = []
    for obj in s3_resource.Bucket(env.str('S3_BUCKET')).objects.all():
        if obj.key.endswith(('.m4v', '.mp4', '.m4a', '.mp3')):
            keys.append(obj.key)
    for key in keys:
        url = s3_client.generate_presigned_url(
            ClientMethod='get_object',
            Params={'Bucket': env.str('S3_BUCKET'), 'Key': key},
            ExpiresIn=86400,
        )
        if not Files.objects.filter(descriptor=strip_descriptor_url_scheme(url)).exists():
            extract_descriptor.apply_async(kwargs={"descriptor": str(url)})
    return None
Now I need to create a new task inside a group for each URL I have. How do I do that?
I have now managed to get my flow working like this:
@require_http_methods(["GET"])
def import_media(request):
    keys = []
    urls = []
    for obj in s3_resource.Bucket(env.str('S3_BUCKET')).objects.all():
        if obj.key.endswith(('.m4v', '.mp4', '.m4a', '.mp3')):
            keys.append(obj.key)
    for key in keys:
        url = s3_client.generate_presigned_url(
            ClientMethod='get_object',
            Params={'Bucket': env.str('S3_BUCKET'), 'Key': key},
            ExpiresIn=86400,
        )
        if not Files.objects.filter(descriptor=strip_descriptor_url_scheme(url)).exists():
            # create() already saves the row; no separate save() call needed
            new_file = Files.objects.create(descriptor=strip_descriptor_url_scheme(url))
            urls.append(url)
    workflow = group([extract_descriptor.s(url) for url in urls]).delay()
    workflow.get(timeout=None, interval=0.5)
    print("hello - Further processing here")
    return None
Any optimization suggestions? At least it is working well for now!
Thanks in advance.
Solution
https://docs.celeryproject.org/en/latest/userguide/canvas.html#groups
A group runs all of its tasks regardless of whether any of them fail, whereas a chain runs each task only if the previous one succeeded. Instead of calling apply_async on every iteration of the for loop, you can use the signature method (.s()), which binds the arguments to the task but does not execute it until you are ready.
from celery import group
...
all_urls = []
for key in keys:
    url = s3_client.generate_presigned_url(
        ClientMethod='get_object',
        Params={'Bucket': env.str('S3_BUCKET'), 'Key': key},
        ExpiresIn=86400,
    )
    if not Files.objects.filter(descriptor=strip_descriptor_url_scheme(url)).exists():
        all_urls.append(url)

# Build a group of signatures; nothing executes yet
g = group(extract_descriptor.s(descriptor=str(url)) for url in all_urls)
result = g()  # shortcut for g.apply_async(); this executes all tasks in the group
result.ready()       # have all subtasks completed?
result.successful()  # were all subtasks successful?