在芹菜中有没有办法,如果任务执行失败,我可以自动将它放入另一个队列.
例如,任务在队列x中运行,异常将其排入另一个名为error_x的队列
编辑:
目前我使用celery == 3.0.13以及django 1.4,Rabbitmq作为经纪人.
解决方法
好吧,如果要将任务路由到另一个队列,则无法使用重试机制.来自文档:
retry() can be used to re-execute the task,for example in the event
of recoverable errors.When you call retry it will send a new message,using the same
task-id,and it will take care to make sure the message is delivered
to the same queue as the originating task.
如果出现任何异常,您必须重新启动并手动将其路由到您想要的队列.这似乎是一个很好的工作error callbacks.
主要问题是我们需要在错误回调中获取任务名称才能启动它.此外,我们可能不希望每次启动任务时都添加回调.因此,装饰器将是自动添加正确回调的好方法.
from functools import partial,wraps import celery @celery.shared_task def error_callback(task_id,task_name,retry_queue,retry_routing_key): # We must retrieve the task object itself. # `tasks` is a dict of 'task_name': celery_task_object task = celery.current_app.tasks[task_name] # Re launch the task in specified queue. task.apply_async(queue=retry_queue,routing_key=retry_routing_key) def retrying_task(retry_queue,retry_routing_key): """Decorates function to automatically add error callbacks.""" def retrying_decorator(func): @celery.shared_task @wraps(func) # just to keep the original task name def wrapper(*args,**kwargs): return func(*args,**kwargs) # Monkey patch the apply_async method to add the callback. wrapper.apply_async = partial( wrapper.apply_async,link_error=error_callback.s(wrapper.name,retry_routing_key) ) return wrapper return retrying_decorator # Usage: @retrying_task(retry_queue='another_queue',retry_routing_key='another_routing_key') def failing_task(): print 'Hi,I will fail!' raise Exception("I'm failing!") failing_task.apply_async()
您可以调整装饰器以传递您需要的任何参数.