问题描述
我试图用 Flask 做一个简单的任务队列,没有任何数据库。 在最简单的版本中,我有两个端点。提交作业并检查状态。 提交作业会将请求添加到队列中,检查状态将返回作业 ID 的状态(排队、正在运行、失败、已完成)。 工作流程如下:
- 用户提交作业
- 作业已添加到队列
- 用户将每 5 秒检查一次作业的状态
- 每次状态检查都会触发一个函数,检查正在运行的作业数是否小于最大作业数(来自配置)。如果数字较小,它将跨越另一个线程,该作业位于队列顶部。
这是简化的代码:
app = Flask(__name__)
def finish_job(job_id):
finished.append(job_id)
last = running.pop(job_id)
last.close()
def remove_finished():
for j in list(running.keys()):
if not running[j].is_alive():
finish_job(j)
def start_jobs():
while len(running) < config.threads and len(queue_list) > 0:
print('running Now',len(running))
next_job = queue.pop()
queue_list.remove(next_job[0])
start_job(*next_job)
@app.route("/Simulation",methods=['POST'])
@authenticate
def submit_job():
# create id
job_id = str(uuid.uuid4())
job_data = request.data.decode('utf-8')
queue.append((job_id,job_data))
queue_list.add(job_id)
return 'QUEUED',200
@app.route("/Simulation/<uuid:job_id>",methods=['GET'])
@authenticate
def check_status(job_id: uuid):
job_id = str(job_id)
remove_finished()
start_jobs()
if job_id in running:
r = 'RUNNING'
elif job_id in queue_list:
r = 'QUEUED'
elif job_id in finished:
r = 'COMPLETED'
else:
r = 'Failed'
return status_response(r),200
running = {}
finished = []
queue = []
queue_list = set()
app.run()
现在的问题是,如果多个用户同时提交一个检查状态请求,并且只有一个空闲槽可用于运行任务,那么这两个请求都会产生作业。 有没有办法强制 Flask 一次只运行一个函数实例? 谢谢
解决方法
经过多次搜索,我终于找到了答案。
从 Flask 1.0 开始,内置的 WSGI 服务器默认运行线程。
所以,我只需要添加参数来停止线程
app.run(threaded=False)