问题描述
我是 RQ 的新手,我正在尝试在我的 Flask 应用程序中实现它。我的其中一个路由的主要目标是更新数据库中的值。要设置我的工作人员,我使用以下内容:
from rq import Worker,Queue,Connection
import redis
import os
@app.before_first_request
def start_worker():
def runworker():
redis_url = os.environ.get("REdis_URL") or 'redis://'
conn = redis.from_url(redis_url)
with Connection(conn):
worker = Worker(list(map(Queue,listen)))
worker.work()
tp = ThreadPoolExecutor()
tp.submit(runworker)
def get_redis_connection():
redis_connection = getattr(g,'_redis_connection',None)
if redis_connection is None:
redis_url = os.environ.get('REdis_URL') or 'redis://'
redis_connection = redis.from_url(redis_url)
return redis_connection
@app.before_request
def push_rq_connection():
push_connection(get_redis_connection())
@app.teardown_request
def pop_rq_connection(exception=None):
pop_connection()
然后更新路由将更新作业排队
@app.route('/update')
def update_db():
q = Queue(connection=conn)
job = q.q.enqueue('app.tasks.update_task',parameters)
job_id = job.get_id()
return {"job_id": job_id},201,{"Content-Type": 'application/json'}
最后,worker 运行更新函数
def update_task(parameters):
# script to update DB
我知道工作程序设置(几乎)有效,因为如果我将 update_task 函数切换为简单的类似:
def update_task(seconds):
for i in range(seconds):
print(i)
time.sleep(1)
return "Hello world"
它有效。但是,对于真正的函数,我一直遇到环境变量没有被定义的问题,因为当我运行实际更新时,我收到异常说我的变量是 None 或类似的东西。
有谁知道如何在 RQ 中处理环境变量?我应该像配置文件一样在某处再次声明它们吗?
解决方法
好的,所以最好使用 multiprocessing
from multiprocessing import Process
def runworker():
redis_url = os.environ.get("REDIS_URL") or "redis://"
conn = redis.from_url(redis_url)
listen = ['default']
with Connection(conn):
worker = Worker(list(map(Queue,listen)))
worker.work()
pp_worker = Process(target=runworker)
pp_worker.start()
@app.teardow_appcontext
def teardown_rq(self):
pp_worker.terminate()
aaand 这就是诀窍。就好像有人想知道一样。