问题描述
这是我的 test.py:
from datetime import datetime, timedelta
import os
import sys

from apscheduler.jobstores.redis import RedisJobStore
# The class is spelled SQLAlchemyJobStore; the lower-case "sql" spelling
# does not exist in apscheduler and fails with ImportError.
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.schedulers.blocking import BlockingScheduler

# Job stores keyed by alias; 'default' is used when add_job() names no store.
jobstores = {
    # Alternative SQLite-backed store:
    # 'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite'),
    'default': RedisJobStore(host='localhost', port=6379),
}

# Created at import time, so "from test import *" builds a second,
# not-yet-started scheduler in the importing process.
scheduler = BlockingScheduler(jobstores=jobstores)
def alarm(time):
    """Print a message stating when this alarm was scheduled.

    Args:
        time: Value describing the scheduling moment; it is rendered
            with ``%s`` formatting.
    """
    # Wrap the argument in a one-element tuple so that a tuple value is
    # formatted as a whole instead of being unpacked by the % operator.
    print('Alarm! This alarm was scheduled at %s.' % (time,))
if __name__ == '__main__':
    # datetime.now() is lower-case; datetime.Now() raises AttributeError.
    # NOTE(review): alarm_time is not used by the interval job below.
    alarm_time = datetime.now() + timedelta(seconds=10)
    # Run alarm() every 10 seconds. The args list is built once, here, so
    # every run prints the moment the job was added, not the current time.
    scheduler.add_job(
        alarm,
        'interval',
        seconds=10,
        args=[datetime.now()],
        name='alarm_test',
    )
    print('To clear the alarms,delete the example.sqlite file.')
    print('Press Ctrl+{0} to exit'.format('Break' if os.name == 'nt' else 'C'))
    try:
        scheduler.start()
    except (KeyboardInterrupt, SystemExit):
        # Ctrl+C / interpreter exit is the intended way to stop the script.
        pass
我成功执行python test.py运行作业
然后通过 PuTTY 打开另一个终端
python
>>> import redis
>>> from test import *
>>> r = redis.Redis()
>>> r.keys()
>>> r.zrange('apscheduler.run_times',0,1)
它会找到作业 ID 57841c0ee05249efb466882265f2c495
>>> ret = scheduler.get_jobs(jobstore='default')
ret 为空,为什么?
非常感谢
解决方法
您是否在运行 get_jobs() 之前启动了调度程序?如果没有,它只会列出待添加(尚未写入作业存储)的作业,这就是您看不到已存储作业的原因。
尝试以下方法:
from apscheduler.jobstores.redis import RedisJobStore
from apscheduler.schedulers.background import BackgroundScheduler

# Connection settings for the Redis server that holds the stored jobs.
redis_options = {'host': 'localhost', 'port': 6379}

scheduler = BackgroundScheduler()
# 'redis' selects the job store type; the options are passed to that store.
scheduler.add_jobstore('redis', **redis_options)
# Start in paused mode so the stored jobs can be listed from this process
# without being executed by it.
scheduler.start(paused=True)
scheduler.print_jobs()