问题描述
我在我的应用中设置了一个相当基本的(到目前为止)队列:
非常简化的代码:
from rq import Queue
from rq.decorators import job
@job('backup')
def backup(db,table,conn_str):
backup_sql = "SELECT * INTO {}.dbo.{}_backup from {}.dbo.{}".format(db,db,collection)
@job('update')
def update(db,conn_str,keys,data):
truncate_sql = "TruncATE TABLE {}.dbo.{}".format(db,collection)
sql_cursor.execute(truncate_sql)
for sql_row in data:
sql = "INSERT INTO {}.dbo.{} ({}) values ({})".format(db,",".join(keys),".join(["?"] * len(sql_row)))
sql_cursor.execute(sql,sql_row)
sql_cursor.commit()
def update_data():
...
update_queue = Queue('update',connection=redis_conn)
backup_job = update_queue.enqueue('backup',result_ttl=current_app.config['RESULT_TTL'],)
update_job = update_queue.enqueue('update',)
我想做的是找到一种观看update
的方法。如果失败,我想运行一个作业来恢复在 backup
作业中创建的备份。如果成功,我想运行不同的作业来简单地删除备份。
解决这个问题的正确方法是什么?我对 rq
很陌生,正在查看文档,但没有找到轮询 update
以了解成功/失败的方法,也没有找到处理任一结果的惯用方法。
解决方法
一个选项是创建另一个名为“检查器”的第三个作业,例如,它将根据“更新”作业的状态决定要做什么。为此,您必须指定依赖关系。
depends_on
指定必须在之前完成的另一个作业(或作业 ID)
此作业将排队。
def checker(*args,**kwargs):
pass
checker_job = update_queue.enqueue('checker',*args,depends_on=update_job.id,result_ttl=current_app.config['RESULT_TTL'])
然后检查“checker”内部依赖的状态,并根据该状态加载备份或删除它。
def checker(*args,**kwargs):
job = rq.get_current_job()
update_job = job.dependency
if update_job.status == 'failed':
# do the stuff here
else: # or elif
# do the stuff here