Django中使用Celery

一、前言

  Celery是一个基于python开发的分布式任务队列,如果不了解请阅读笔者上一篇博文,而做python WEB开发最为流行的框架莫属Django,但是Django的请求处理过程都是同步的无法实现异步任务,若要实现异步任务处理需要通过其他方式(前端的一般解决方案是ajax操作),而后台Celery就是不错的选择。倘若一个用户在执行某些操作需要等待很久才返回,这大大降低了网站的吞吐量。下面将描述Django的请求处理大致流程(图片来源于网络):

请求过程简单说明:浏览器发起请求-->请求处理-->请求经过中间件-->路由映射-->视图处理业务逻辑-->响应请求(template或response)

二、配置使用

  celery很容易集成到Django框架中,当然如果想要实现定时任务的话还需要安装django-celery-beta插件,后面会说明。需要注意的是Celery4.0只支持Django版本>=1.8的,如果是小于1.8版本需要使用Celery3.1。

配置

  新建立项目taskproj,目录结构(每个app下多了个tasks文件,用于定义任务):

在项目目录taskproj/taskproj/目录下新建celery.py:

celery os.environ.setdefault(<span style="color: #800000;">'<span style="color: #800000;">DJANGO_SETTINGS_MODULE<span style="color: #800000;">',<span style="color: #800000;">'<span style="color: #800000;">taskproj.settings<span style="color: #800000;">') <span style="color: #008000;">#<span style="color: #008000;"> 设置django环境
<span style="color: #000000;">
app
= Celery(<span style="color: #800000;">'
<span style="color: #800000;">taskproj
<span style="color: #800000;">'
<span style="color: #000000;">)

app.config_fromobject(<span style="color: #800000;">'<span style="color: #800000;">django.conf:settings<span style="color: #800000;">',namespace=<span style="color: #800000;">'<span style="color: #800000;">CELERY<span style="color: #800000;">') <span style="color: #008000;">#<span style="color: #008000;"> 使用CELERY 作为前缀,在settings中写配置
<span style="color: #000000;">
app.autodiscover_tasks() <span style="color: #008000;">#<span style="color: #008000;"> 发现任务文件每个app下的task.py

taskproj/taskproj/__init__.py:

from .celery import app as celery_app
 = []

taskproj/taskproj/settings.py

CELERY_BROKER_URL = = =

进入项目的taskproj目录启动worker:

celery worker -A taskproj -l debug

定义与触发任务

  任务定义在每个tasks文件中,app01/tasks.py:

celery @shared_task
<span style="color: #0000ff;">def
<span style="color: #000000;"> add(x,y):
<span style="color: #0000ff;">return
x +<span style="color: #000000;"> y

@shared_task
<span style="color: #0000ff;">def<span style="color: #000000;"> mul(x,y):
<span style="color: #0000ff;">return x * y

视图中触发任务

django.http app01 <span style="color: #008000;">#<span style="color: #008000;"> Create your views here.

<span style="color: #0000ff;">def index(request,*args,**<span style="color: #000000;">kwargs):
res=tasks.add.delay(1,3<span style="color: #000000;">)
<span style="color: #008000;">#<span style="color: #008000;">任务逻辑
<span style="color: #0000ff;">return JsonResponse({<span style="color: #800000;">'<span style="color: #800000;">status<span style="color: #800000;">':<span style="color: #800000;">'<span style="color: #800000;">successful<span style="color: #800000;">',<span style="color: #800000;">'<span style="color: #800000;">task_id<span style="color: #800000;">':res.task_id})

访问http://127.0.0.1:8000/index

 若想获取任务结果,可以通过task_id使用AsyncResult获取结果,还可以直接通过backend获取:

扩展

  除了redis、rabbitmq能做结果存储外,还可以使用Django的orm作为结果存储,当然需要安装依赖插件,这样的好处在于我们可以直接通过django的数据查看到任务状态,同时为可以制定更多的操作,下面介绍如何使用orm作为结果存储。

1.安装

pip install django-celery-results

2.配置settings.py,注册app

INSTALLED_APPS =

4.修改backend配置,将redis改为django-db

=

5.修改数据库

python3 manage.py migrate django_celery_results

此时会看到数据库会多创建:

task_id </span>= models.CharField(_(<span style="color: #800000;"&gt;'</span><span style="color: #800000;"&gt;task id</span><span style="color: #800000;"&gt;'</span>),max_length=255,unique=<span style="color: #000000;"&gt;True) task_name </span>= models.CharField(_(<span style="color: #800000;"&gt;'</span><span style="color: #800000;"&gt;task name</span><span style="color: #800000;"&gt;'</span>),null=True,max_length=255<span style="color: #000000;"&gt;) task_args </span>= models.TextField(_(<span style="color: #800000;"&gt;'</span><span style="color: #800000;"&gt;task arguments</span><span style="color: #800000;"&gt;'</span>),null=<span style="color: #000000;"&gt;True) task_kwargs </span>= models.TextField(_(<span style="color: #800000;"&gt;'</span><span style="color: #800000;"&gt;task kwargs</span><span style="color: #800000;"&gt;'</span>),null=<span style="color: #000000;"&gt;True) status </span>= models.CharField(_(<span style="color: #800000;"&gt;'</span><span style="color: #800000;"&gt;state</span><span style="color: #800000;"&gt;'</span>),max_length=50<span style="color: #000000;"&gt;,default</span>=<span style="color: #000000;"&gt;states.PENDING,choices</span>=<span style="color: #000000;"&gt;TASK_STATE_CHOICES ) content_type </span>= models.CharField(_(<span style="color: #800000;"&gt;'</span><span style="color: #800000;"&gt;content type</span><span style="color: #800000;"&gt;'</span>),max_length=128<span style="color: #000000;"&gt;) content_encoding </span>= models.CharField(_(<span style="color: #800000;"&gt;'</span><span style="color: #800000;"&gt;content encoding</span><span style="color: #800000;"&gt;'</span>),max_length=64<span style="color: #000000;"&gt;) result </span>= models.TextField(null=True,default=None,editable=<span style="color: #000000;"&gt;False) date_done </span>= models.DateTimeField(_(<span style="color: #800000;"&gt;'</span><span style="color: #800000;"&gt;done at</span><span style="color: #800000;"&gt;'</span>),auto_now=<span style="color: #000000;"&gt;True) traceback </span>= models.TextField(_(<span style="color: #800000;"&gt;'</span><span style="color: #800000;"&gt;traceback</span><span style="color: #800000;"&gt;'</span>),blank=True,null=<span style="color: #000000;"&gt;True) hidden </span>= models.BooleanField(editable=False,default=False,db_index=<span style="color: #000000;"&gt;True) meta </span>= models.TextField(null=True,editable=<span style="color: #000000;"&gt;False) objects </span>=<span style="color: #000000;"&gt; managers.TaskResultManager() </span><span style="color: #0000ff;"&gt;class</span><span style="color: #000000;"&gt; Meta: </span><span style="color: #800000;"&gt;"""</span><span style="color: #800000;"&gt;Table information.</span><span style="color: #800000;"&gt;"""</span><span style="color: #000000;"&gt; ordering </span>= [<span style="color: #800000;"&gt;'</span><span style="color: #800000;"&gt;-date_done</span><span style="color: #800000;"&gt;'</span><span style="color: #000000;"&gt;] verbose_name </span>= _(<span style="color: #800000;"&gt;'</span><span style="color: #800000;"&gt;task result</span><span style="color: #800000;"&gt;'</span><span style="color: #000000;"&gt;) verbose_name_plural </span>= _(<span style="color: #800000;"&gt;'</span><span style="color: #800000;"&gt;task results</span><span style="color: #800000;"&gt;'</span><span style="color: #000000;"&gt;) </span><span style="color: #0000ff;"&gt;def</span><span style="color: #000000;"&gt; as_dict(self): </span><span style="color: #0000ff;"&gt;return</span><span style="color: #000000;"&gt; { </span><span style="color: #800000;"&gt;'</span><span style="color: #800000;"&gt;task_id</span><span style="color: #800000;"&gt;'</span><span style="color: #000000;"&gt;: self.task_id,</span><span style="color: #800000;"&gt;'</span><span style="color: #800000;"&gt;task_name</span><span style="color: #800000;"&gt;'</span><span style="color: #000000;"&gt;: self.task_name,</span><span style="color: #800000;"&gt;'</span><span style="color: #800000;"&gt;task_args</span><span style="color: #800000;"&gt;'</span><span style="color: #000000;"&gt;: self.task_args,</span><span style="color: #800000;"&gt;'</span><span style="color: #800000;"&gt;task_kwargs</span><span style="color: #800000;"&gt;'</span><span style="color: #000000;"&gt;: self.task_kwargs,</span><span style="color: #800000;"&gt;'</span><span style="color: #800000;"&gt;status</span><span style="color: #800000;"&gt;'</span><span style="color: #000000;"&gt;: self.status,</span><span style="color: #800000;"&gt;'</span><span style="color: #800000;"&gt;result</span><span style="color: #800000;"&gt;'</span><span style="color: #000000;"&gt;: self.result,</span><span style="color: #800000;"&gt;'</span><span style="color: #800000;"&gt;date_done</span><span style="color: #800000;"&gt;'</span><span style="color: #000000;"&gt;: self.date_done,</span><span style="color: #800000;"&gt;'</span><span style="color: #800000;"&gt;traceback</span><span style="color: #800000;"&gt;'</span><span style="color: #000000;"&gt;: self.traceback,</span><span style="color: #800000;"&gt;'</span><span style="color: #800000;"&gt;meta</span><span style="color: #800000;"&gt;'</span><span style="color: #000000;"&gt;: self.meta,} </span><span style="color: #0000ff;"&gt;def</span> <span style="color: #800080;"&gt;__str__</span><span style="color: #000000;"&gt;(self): </span><span style="color: #0000ff;"&gt;return</span> <span style="color: #800000;"&gt;'</span><span style="color: #800000;"&gt;<Task: {0.task_id} ({0.status})></span><span style="color: #800000;"&gt;'</span>.format(self)</pre>

三、Django中使用定时任务

  如果想要在django中使用定时任务功能同样是靠beat完成任务发送功能,当在Django中使用定时任务时,需要安装django-celery-beat插件。以下将介绍使用过程。

安装配置

1.beat插件安装

pip3 install django-celery-beat

2.注册APP

INSTALLED_APPS =

3.数据库变更

python3 manage.py migrate django_celery_beat

4.分别启动woker和beta

celery -A proj beat -l info --scheduler django_celery_beat.schedulers:DatabaseScheduler -A taskproj -l info

5.配置admin

urls.py

django.conf.urls django.contrib urlpatterns =<span style="color: #000000;"> [
url(r
<span style="color: #800000;">'
<span style="color: #800000;">^admin/
<span style="color: #800000;">'
<span style="color: #000000;">,admin.site.urls),]

6.创建用户

python3 manage.py createsuperuser

7.登录admin进行管理(地址http://127.0.0.1:8000/admin)并且还可以看到我们上次使用orm作为结果存储的表。

http://127.0.0.1:8000/admin/login/?next=/admin/

 使用示例:

 查看结果:

二次开发

  django-celery-beat插件本质上是对数据库表变化检查,一旦有数据库表改变,调度器重新读取任务进行调度,所以如果想自己定制的任务页面,只需要操作beat插件的四张表就可以了。当然你还可以自己定义调度器,django-celery-beat插件已经内置了model,只需要进行导入便可进行orm操作,以下我用django reset api进行示例:

settings.py

INSTALLED_APPS =

urls.py

urlpatterns =,views.TaskView.as_view({:

views.py

django_celery_beat.models PeriodicTask rest_framework rest_framework rest_framework.viewsets ==

<span style="color: #0000ff;">class<span style="color: #000000;"> Mypagination(pagination.PageNumberPagination):
<span style="color: #800000;">"""<span style="color: #800000;">自定义分页<span style="color: #800000;">"""<span style="color: #000000;">
page_size=2<span style="color: #000000;">
page_query_param = <span style="color: #800000;">'<span style="color: #800000;">p<span style="color: #800000;">'<span style="color: #000000;">
page_size_query_param=<span style="color: #800000;">'<span style="color: #800000;">size<span style="color: #800000;">'<span style="color: #000000;">
max_page_size=4

<span style="color: #0000ff;">class<span style="color: #000000;"> TaskView(ModelViewSet):
queryset =<span style="color: #000000;"> PeriodicTask.objects.all()
serializer_class =<span style="color: #000000;"> Userserializer
permission_classes =<span style="color: #000000;"> []
pagination_class = Mypagination

访问http://127.0.0.1:8000/tasks如下:

相关文章

注:所有源代码均实测运行过。所有源代码均已上传CSDN,请有...
继承APIView和ViewSetMixin;作用也与APIView基本类似,提供...
一、Django介绍Python下有许多款不同的 Web 框架。Django是重...
本文从nginx快速掌握到使用,gunicorn快速掌握到使用,实现小...
uniapp微信小程序订阅消息发送服务通知
Django终端打印SQL语句 1 Setting配置: 2 默认python 使用的...