一、前言
Celery是一个基于python开发的分布式任务队列,如果不了解请阅读笔者上一篇博文,而做python WEB开发最为流行的框架莫属Django,但是Django的请求处理过程都是同步的无法实现异步任务,若要实现异步任务处理需要通过其他方式(前端的一般解决方案是ajax操作),而后台Celery就是不错的选择。倘若一个用户在执行某些操作需要等待很久才返回,这大大降低了网站的吞吐量。下面将描述Django的请求处理大致流程(图片来源于网络):
请求过程简单说明:浏览器发起请求-->请求处理-->请求经过中间件-->路由映射-->视图处理业务逻辑-->响应请求(template或response)
二、配置使用
celery很容易集成到Django框架中,当然如果想要实现定时任务的话还需要安装django-celery-beta插件,后面会说明。需要注意的是Celery4.0只支持Django版本>=1.8的,如果是小于1.8版本需要使用Celery3.1。
配置
新建立项目taskproj,目录结构(每个app下多了个tasks文件,用于定义任务):
在项目目录taskproj/taskproj/目录下新建celery.py:
<span style="color: #000000;">
app = Celery(<span style="color: #800000;">'<span style="color: #800000;">taskproj<span style="color: #800000;">'<span style="color: #000000;">)
app.config_fromobject(
<span style="color: #800000;">'<span style="color: #800000;">django.conf:settings<span style="color: #800000;">',namespace=<span style="color: #800000;">'<span style="color: #800000;">CELERY<span style="color: #800000;">') <span style="color: #008000;">#<span style="color: #008000;"> 使用CELERY 作为前缀,在settings中写配置<span style="color: #000000;">
app.autodiscover_tasks() <span style="color: #008000;">#<span style="color: #008000;"> 发现任务文件每个app下的task.py
taskproj/taskproj/__init__.py:
from .celery import app as celery_app
= []
taskproj/taskproj/settings.py
进入项目的taskproj目录启动worker:
定义与触发任务
任务定义在每个tasks文件中,app01/tasks.py:
<span style="color: #0000ff;">def<span style="color: #000000;"> add(x,y):
<span style="color: #0000ff;">return x +<span style="color: #000000;"> y
@shared_task
<span style="color: #0000ff;">return x * y
视图中触发任务
<span style="color: #0000ff;">def
index(request,*args,**<span style="color: #000000;">kwargs):res=tasks.add.delay(1,3<span style="color: #000000;">)
<span style="color: #008000;">#<span style="color: #008000;">任务逻辑
<span style="color: #0000ff;">return JsonResponse({<span style="color: #800000;">'<span style="color: #800000;">status<span style="color: #800000;">':<span style="color: #800000;">'<span style="color: #800000;">successful<span style="color: #800000;">',<span style="color: #800000;">'<span style="color: #800000;">task_id<span style="color: #800000;">':res.task_id})
访问http://127.0.0.1:8000/index
若想获取任务结果,可以通过task_id使用AsyncResult获取结果,还可以直接通过backend获取:
扩展
除了redis、rabbitmq能做结果存储外,还可以使用Django的orm作为结果存储,当然需要安装依赖插件,这样的好处在于我们可以直接通过django的数据查看到任务状态,同时为可以制定更多的操作,下面介绍如何使用orm作为结果存储。
1.安装
2.配置settings.py,注册app
4.修改backend配置,将redis改为django-db
5.修改数据库
此时会看到数据库会多创建:
task_id </span>= models.CharField(_(<span style="color: #800000;">'</span><span style="color: #800000;">task id</span><span style="color: #800000;">'</span>),max_length=255,unique=<span style="color: #000000;">True)
task_name </span>= models.CharField(_(<span style="color: #800000;">'</span><span style="color: #800000;">task name</span><span style="color: #800000;">'</span>),null=True,max_length=255<span style="color: #000000;">)
task_args </span>= models.TextField(_(<span style="color: #800000;">'</span><span style="color: #800000;">task arguments</span><span style="color: #800000;">'</span>),null=<span style="color: #000000;">True)
task_kwargs </span>= models.TextField(_(<span style="color: #800000;">'</span><span style="color: #800000;">task kwargs</span><span style="color: #800000;">'</span>),null=<span style="color: #000000;">True)
status </span>= models.CharField(_(<span style="color: #800000;">'</span><span style="color: #800000;">state</span><span style="color: #800000;">'</span>),max_length=50<span style="color: #000000;">,default</span>=<span style="color: #000000;">states.PENDING,choices</span>=<span style="color: #000000;">TASK_STATE_CHOICES
)
content_type </span>= models.CharField(_(<span style="color: #800000;">'</span><span style="color: #800000;">content type</span><span style="color: #800000;">'</span>),max_length=128<span style="color: #000000;">)
content_encoding </span>= models.CharField(_(<span style="color: #800000;">'</span><span style="color: #800000;">content encoding</span><span style="color: #800000;">'</span>),max_length=64<span style="color: #000000;">)
result </span>= models.TextField(null=True,default=None,editable=<span style="color: #000000;">False)
date_done </span>= models.DateTimeField(_(<span style="color: #800000;">'</span><span style="color: #800000;">done at</span><span style="color: #800000;">'</span>),auto_now=<span style="color: #000000;">True)
traceback </span>= models.TextField(_(<span style="color: #800000;">'</span><span style="color: #800000;">traceback</span><span style="color: #800000;">'</span>),blank=True,null=<span style="color: #000000;">True)
hidden </span>= models.BooleanField(editable=False,default=False,db_index=<span style="color: #000000;">True)
meta </span>= models.TextField(null=True,editable=<span style="color: #000000;">False)
objects </span>=<span style="color: #000000;"> managers.TaskResultManager()
</span><span style="color: #0000ff;">class</span><span style="color: #000000;"> Meta:
</span><span style="color: #800000;">"""</span><span style="color: #800000;">Table information.</span><span style="color: #800000;">"""</span><span style="color: #000000;">
ordering </span>= [<span style="color: #800000;">'</span><span style="color: #800000;">-date_done</span><span style="color: #800000;">'</span><span style="color: #000000;">]
verbose_name </span>= _(<span style="color: #800000;">'</span><span style="color: #800000;">task result</span><span style="color: #800000;">'</span><span style="color: #000000;">)
verbose_name_plural </span>= _(<span style="color: #800000;">'</span><span style="color: #800000;">task results</span><span style="color: #800000;">'</span><span style="color: #000000;">)
</span><span style="color: #0000ff;">def</span><span style="color: #000000;"> as_dict(self):
</span><span style="color: #0000ff;">return</span><span style="color: #000000;"> {
</span><span style="color: #800000;">'</span><span style="color: #800000;">task_id</span><span style="color: #800000;">'</span><span style="color: #000000;">: self.task_id,</span><span style="color: #800000;">'</span><span style="color: #800000;">task_name</span><span style="color: #800000;">'</span><span style="color: #000000;">: self.task_name,</span><span style="color: #800000;">'</span><span style="color: #800000;">task_args</span><span style="color: #800000;">'</span><span style="color: #000000;">: self.task_args,</span><span style="color: #800000;">'</span><span style="color: #800000;">task_kwargs</span><span style="color: #800000;">'</span><span style="color: #000000;">: self.task_kwargs,</span><span style="color: #800000;">'</span><span style="color: #800000;">status</span><span style="color: #800000;">'</span><span style="color: #000000;">: self.status,</span><span style="color: #800000;">'</span><span style="color: #800000;">result</span><span style="color: #800000;">'</span><span style="color: #000000;">: self.result,</span><span style="color: #800000;">'</span><span style="color: #800000;">date_done</span><span style="color: #800000;">'</span><span style="color: #000000;">: self.date_done,</span><span style="color: #800000;">'</span><span style="color: #800000;">traceback</span><span style="color: #800000;">'</span><span style="color: #000000;">: self.traceback,</span><span style="color: #800000;">'</span><span style="color: #800000;">meta</span><span style="color: #800000;">'</span><span style="color: #000000;">: self.meta,}
</span><span style="color: #0000ff;">def</span> <span style="color: #800080;">__str__</span><span style="color: #000000;">(self):
</span><span style="color: #0000ff;">return</span> <span style="color: #800000;">'</span><span style="color: #800000;"><Task: {0.task_id} ({0.status})></span><span style="color: #800000;">'</span>.format(self)</pre>
三、Django中使用定时任务
如果想要在django中使用定时任务功能同样是靠beat完成任务发送功能,当在Django中使用定时任务时,需要安装django-celery-beat插件。以下将介绍使用过程。
安装配置
1.beat插件安装
2.注册APP
3.数据库变更
4.分别启动woker和beta
5.配置admin
urls.py
url(r<span style="color: #800000;">'<span style="color: #800000;">^admin/<span style="color: #800000;">'<span style="color: #000000;">,admin.site.urls),]
6.创建用户
7.登录admin进行管理(地址http://127.0.0.1:8000/admin)并且还可以看到我们上次使用orm作为结果存储的表。
http://127.0.0.1:8000/admin/login/?next=/admin/
使用示例:
查看结果:
二次开发
django-celery-beat插件本质上是对数据库表变化检查,一旦有数据库表改变,调度器重新读取任务进行调度,所以如果想自己定制的任务页面,只需要操作beat插件的四张表就可以了。当然你还可以自己定义调度器,django-celery-beat插件已经内置了model,只需要进行导入便可进行orm操作,以下我用django reset api进行示例:
settings.py
urls.py
views.py
<span style="color: #0000ff;">class
<span style="color: #000000;"> Mypagination(pagination.PageNumberPagination):<span style="color: #800000;">"""<span style="color: #800000;">自定义分页<span style="color: #800000;">"""<span style="color: #000000;">
page_size=2<span style="color: #000000;">
page_query_param = <span style="color: #800000;">'<span style="color: #800000;">p<span style="color: #800000;">'<span style="color: #000000;">
page_size_query_param=<span style="color: #800000;">'<span style="color: #800000;">size<span style="color: #800000;">'<span style="color: #000000;">
max_page_size=4
<span style="color: #0000ff;">class
<span style="color: #000000;"> TaskView(ModelViewSet):queryset =<span style="color: #000000;"> PeriodicTask.objects.all()
serializer_class =<span style="color: #000000;"> Userserializer
permission_classes =<span style="color: #000000;"> []
pagination_class = Mypagination
访问http://127.0.0.1:8000/tasks如下: