问题描述
|
我正在使用django和celery(django-celery)进行项目。我们的团队决定将所有数据访问代码包装在ѭ0内(不像ѭ1那样包装到Manager中),而让代码在(app-name)/task.py中只处理用celery组装和执行任务(因此我们不会\'在此层中具有django ORM依赖性)。
在我的
manager.py
中,我有以下内容:
def get_tag(tag_name):
ctype = ContentType.objects.get_for_model(Photo)
try:
tag = Tag.objects.get(name=tag_name)
except ObjectDoesNotExist:
return Tag.objects.none()
return tag
def get_tagged_photos(tag):
ctype = ContentType.objects.get_for_model(Photo)
return TaggedItem.objects.filter(content_type__pk=ctype.pk,tag__pk=tag.pk)
def get_tagged_photos_count(tag):
return get_tagged_photos(tag).count()
在我的task.py中,我喜欢将它们包装成任务(然后可以使用这些任务来完成更复杂的任务),因此我编写了这个装饰器:
import manager #the module within same app containing data access functions
class mfunc_to_task(object):
def __init__(mfunc_type=\'get\'):
self.mfunc_type = mfunc_type
def __call__(self,f):
def wrapper_f(*args,**kwargs):
callback = kwargs.pop(\'callback\',None)
mfunc = getattr(manager,f.__name__)
result = mfunc(*args,**kwargs)
if callback:
if self.mfunc_type == \'get\':
subtask(callback).delay(result)
elif self.mfunc_type == \'get_or_create\':
subtask(callback).delay(result[0])
else:
subtask(callback).delay()
return result
return wrapper_f
然后(仍在task.py
中):
#@task
@mfunc_to_task()
def get_tag():
pass
#@task
@mfunc_to_task()
def get_tagged_photos():
pass
#@task
@mfunc_to_task()
def get_tagged_photos_count():
pass
没有@task
,一切正常。
但是,在使用那个“ 7”型装饰器(按照芹菜文档的指示放到顶部)之后,事情就开始崩溃了。显然,每次调用ѭ9时,都会传递与ѭ11相同的task.get_tag
函数。所以我每次都得到相同的wrapper_f
,现在我唯一要做的就是获得一个标签。
我是装饰工的新手。任何人都可以帮助我了解这里出了什么问题,或者指出完成任务的其他方法?我真的很讨厌为我的每个数据访问功能编写相同的任务包装代码。
解决方法
不太确定为什么传递参数无效吗?
如果使用此示例:
@task()
def add(x,y):
return x + y
让我们向MyCoolTask添加一些日志记录:
from celery import task
from celery.registry import tasks
import logging
import celery
logger = logging.getLogger(__name__)
class MyCoolTask(celery.Task):
def __call__(self,*args,**kwargs):
\"\"\"In celery task this function call the run method,here you can
set some environment variable before the run of the task\"\"\"
logger.info(\"Starting to run\")
return self.run(*args,**kwargs)
def after_return(self,status,retval,task_id,args,kwargs,einfo):
#exit point of the task whatever is the state
logger.info(\"Ending run\")
pass
并创建一个扩展类(扩展MyCoolTask,但现在带有参数):
class AddTask(MyCoolTask):
def run(self,x,y):
if x and y:
result=add(x,y)
logger.info(\'result = %d\' % result)
return result
else:
logger.error(\'No x or y in arguments\')
tasks.register(AddTask)
并确保将kwargs作为json数据传递:
{\"x\":8,\"y\":9}
我得到结果:
[2013-03-05 17:30:25,853: INFO/MainProcess] Starting to run
[2013-03-05 17:30:25,855: INFO/MainProcess] result = 17
[2013-03-05 17:30:26,739: INFO/MainProcess] Ending run
[2013-03-05 17:30:26,741: INFO/MainProcess] Task iamscheduler.tasks.AddTask[6a62641d-16a6-44b6-a1cf-7d4bdc8ea9e0] succeeded in 0.888684988022s: 17
, 为什么不创建扩展celery.Task
的基类,而不是使用decorator?
这样,您的所有任务都可以扩展自定义的任务类,您可以在其中使用方法__call__
和after_return
实现个人行为
。
您还可以为所有任务定义通用方法和对象。
class MyCoolTask(celery.Task):
def __call__(self,here you can
set some environment variable before the run of the task\"\"\"
return self.run(*args,einfo):
#exit point of the task whatever is the state
pass