celery 计划任务使用

流程: 用户提交任务 --- > Celery   --- > Broker 中间商(可以是数据库,redis)  ---> 最后让celery 中的 worker 执行任务

1 单独使用:
目录结构:



celery_worker.py 文件
#-*- coding:utf-8 -*-
from celery import Celery
import time
app = Celery('tasks',                           #tasks是app 名字
             broker='redis://127.0.0.1:6379/0',  #密码方式redis://xxxx@127.0.0.1:6379/0
             backend='redis://127.0.0.1:6379/0' #密码方式redis://xxxx@127.0.0.1:6379/0#密码方式redis://xxxx@127.0.0.1:6379/0
             )


@app.task
def add(x,y): #运行的任务
    print("success.......",x,y)
    time.sleep(30)
    return  x + y



celery_client.py 文件
#-*- coding:utf-8 -*-
from celery_work import add
t1 = add.delay(1,2) #提交任务执行

#print(t1.get()) #获取任务执行的结果
#print(t1.get(timeout=1)) #设置当前获取超时时间
#print(t1.ready()) #查看任务是否执行完成,Ture执行完成
1 进入单独使用目录里面启动worker
celery -A celery_work worker -l info -P eventlet #celery_work 为当前py文件名


2 项目使用

目录结构:


  


新建一个 tasks_worker 目录 里面创建
celery.py这是Broker    myTasks_1.py任务1 myTasks_2.py任务2 三个文件
celery.py (文件名必须这样命令)

celery.py文件
#-*- coding:utf-8 -*-
from __future__ import absolute_import, unicode_literals
from celery import  Celery    #__future__ 表示将相对路径转换为绝对路径  from celery  导入是从python 包里面的绝对路劲导入Celery

app = Celery('pro',
             broker='redis://127.0.0.1:6379/0',
             backend='redis://127.0.0.1:6379/0',
             include=(['tasks_worker.myTasks_1','tasks_worker.myTasks_2'])  #存任务的文件,tasks_worker当前目录名字是myTasks_1,可以存多个,tasks必须要有这个py文件  tasks_2是一个py文件
             )


app.conf.update(
    result_expires = 3600,  #任务结果保存时间 一个小时
)



if __name__ == "__main__":
    app.start()

myTasks_1.py文件
#-*- coding:utf-8 -*-
# 将相对路径转换为绝对路径
from __future__ import absolute_import, unicode_literals
from .celery import app #获取当前目录中的celery的app

@app.task
def add(x,y):
print("success.......",x,y)
return x + y


myTasks_2.py文件
#-*- coding:utf-8 -*-
# 将相对路径转换为绝对路径
from __future__ import absolute_import, unicode_literals
from .celery import app #获取当前目录中的celery的app

@app.task
def add(x,y):
print("success.......",x,y)
return (x * y)


项目使用目录下面新建一个 执行任务.py
#-*- coding:utf-8 -*-

from tasks_worker.myTasks_1 import add as add_1
from tasks_worker.myTasks_2 import add as add_2

t1 = add_1.delay(1,2)
print(t1.get())


t2 = add_2.delay(1,2)
print(t2.get())
进入项目使用目录
1 里面必须命名为celery.py
2 myTasks_1.py 为任务1
3 myTasks_2.py 为任务2
启动 celery -A tasks_worker worker -l debug -P eventlet #tasks_worker是项目使用下面的tasks_worker文件夹

 

 3定时任务(重点)
目录结构:

celery.py文件
#-*- coding:utf-8 -*-
from __future__ import absolute_import, unicode_literals
from celery import Celery    #__future__ 表示将相对路径转换为绝对路径  from celery  导入是从python 包里面的绝对路劲导入Celery


app = Celery('pro',
             broker='redis://127.0.0.1:6379/0',
             backend='redis://127.0.0.1:6379/0',
             include=(['timing.celery_Tasks'])  #存任务的文件,tasks_worker当前目录名字是myTasks_1,可以存多个,tasks必须要有这个py文件  tasks_2是一个py文件
             )


app.conf.update(
    result_expires = 3600,  #任务结果保存时间 一个小时
)


if __name__ == "__main__":
    app.start()
celery_Tasks.py
#-*- coding:utf-8 -*-
from __future__ import absolute_import, unicode_literals   #__future__ 表示将相对路径转换为绝对路径  from celery  导入是从python 包里面的绝对路劲导入Celery
from .celery import app
from celery.schedules import crontab

#第一种写法
# @app.on_after_configure.connect # def tasks_p(sender,**kwargs): # #sender.add_periodic_task(10.0,test.s("hello"),name="test"), #每秒执行下test函数 要带.s 格式就是整要求的 # # #sender.add_periodic_task(20.0, test.s("world"), name="test" , expires=10), # # sender.add_periodic_task( # crontab(hour=16, minute=13,), # test.s(123), # ) #

#修改app配置文件第二种写法 app.conf.beat_schedule = { 'add-every-10s':{ 'task': "timing.celery_Tasks.test", #路劲要写全 timing目录下面的 celert_Tasks 里面的 test方法 'schedule': 10.0, #每10秒执行上面的任务 'args':(10,)  #传递这个参数到test里面 }, 'add-every-20s': { 'task': "timing.celery_Tasks.test", 'schedule': 20.0, 'args': (20,) }, 'add-every-cron': { 'task': "timing.celery_Tasks.test", 'schedule': crontab(hour=16, minute=26, day_of_week=1), 'args': ("开始.....................................................",) } } app.conf.timezone = 'Asia/Shanghai' #设置时区不然会按utc时间执行 @app.task def test(arg,): print("run....",arg)
进入定时目录
启动worker : celery -A timing worker -l debug -P eventlet
启动beat : celery -A timing.celery_Tasks beat -l debug




相关文章

Python中的函数(二) 在上一篇文章中提到了Python中函数的定...
Python中的字符串 可能大多数人在学习C语言的时候,最先接触...
Python 面向对象编程(一) 虽然Python是解释性语言,但是它...
Python面向对象编程(二) 在前面一篇文章中谈到了类的基本定...
Python中的函数(一) 接触过C语言的朋友对函数这个词肯定非...
在windows下如何快速搭建web.py开发框架 用Python进行web开发...