问题描述
简短说明
基于 Flask / Flask-SocketIO / Eventlet 的 Web 服务器为用户提供 HTML 前端。
对于某些操作,后端必须调用由 3rd 方模块提供的 heavy_lifting()
函数,该函数可能会运行很长时间(几分钟),并且偶尔会发出日志消息,这些消息应使用 websockets 发送到前端。
我知道在 Python 中使用多线程/多处理有很多陷阱。我不关心heavy_lifting() 是在异步任务/协程、线程还是单独的进程中完成的。最后我只关心它工作,意思是:不阻塞 eventlet 服务器,将日志消息从外部函数广播到连接的前端,两者都独立于发生的事情和调用的内容(或未调用)在heavy_lifting() 函数中。我认为“调用需要一些时间才能完成的外部函数”的用例对于 Flask 应用程序来说并不是那么遥不可及,所以我想知道我错过了什么来正确实现它。
尝试了什么?
-
直接函数调用: Eventlet 默认在单独的任务/协程中处理请求。任务可能会运行更长时间,而服务器仍然响应,只要该任务通过调用例如time.sleep() 或 asyncio.sleep(),使用 await... 但是,外部的heavy_lifting() 不调用 sleep(),因此不能在路由处理程序中直接调用。在示例脚本中,当直接调用函数时,服务器不会响应,直到函数返回。设置
_SLEEP_ON_LOG_EMIT = True
会有所帮助,因为服务器现在至少能够在每次从函数发送日志消息时做出响应。 (日志消息由 logging.Handler 处理,它向连接的客户端发出 () websocket 事件。)但是,由于日志消息之间的时间没有定义并且可能很长,这当然不是首选的解决方案。 -
线程函数调用: 旧实现使用 tk UI,其中heavy_lifting() 使用
threading.Thread()
在单独的线程中执行。只要没有将日志消息发送到 socketio 客户端,执行似乎可以与 eventlet 一起工作。 here 还讨论了此主题。使用monkey_patch() 有效地修补线程实现会产生与when 相同的效果: -
使用 start_background_task(): Flask-SocketIO 的
start_background_task()
将根据使用的异步环境启动后台任务的实现。对于 eventlet,它将使用基于协程/任务的适当实现。实际上,默认情况下它与“直接”函数调用相同:无论如何,路由处理程序也运行在单独的 eventlet 协程中。所以结果是一样的:任务运行良好,但服务器没有响应,直到任务放弃控制,例如通过调用 .sleep() 函数(或 await ...)。同样,这不是在heavy_lifting() 本身中完成的,它只能在处理日志消息时“注入”,但这并不是真正的解决方案。 -
使用多处理: 为了将heavy_lifting() 与网络服务器完全分离,我们的想法是将其作为一个新的
multiprocessing.Process()
启动。通过 Queue() 发送消息,设置和通信变得更加复杂。这种情况似乎有效……但是在尝试调试(真实的、更大的)项目时,我遇到了这种方法的一些奇怪问题。有时,当涉及多处理并且 PyCharm 调试器处于活动状态时,eventlet 甚至无法正常启动。此外,eventlet 似乎 not compatible 具有多处理功能。如果我没记错的话,根本不支持 os.forks() 所以它不再是跨平台的(因为 multiprocessing 在 Unix 上使用 fork() )。 使用我尝试过的所有不同方法,结合和不结合monkey_patch(),我不记得到底出了什么问题,因为在示例代码中,多处理方法在这里似乎效果最好。但是,我们感谢任何有关改进的意见或任何其他相关评论。
完整示例代码
from flask import Flask,render_template_string
from flask_socketio import SocketIO
import time
import logging
import threading
from multiprocessing import Queue,Process
# Set this to true to see that the direct call and start_background_task work partially
# (server responsive when sleep() is called)
_SLEEP_ON_LOG_EMIT = False
def heavy_lifting():
# this function is part of a 3rd party module,will run for some time
# simulate heavy lifting without time.sleep() or socketio.sleep() calls
t_total = 30
t_int = 8
logging.info(f"heavy_lifting() will run for {t_total} seconds,send a log message every {t_int} seconds")
t_start = time.time()
t_stop = time.time() + t_total
while time.time() < t_stop:
t_next_int = time.time() + t_int
while time.time() < t_next_int:
x = 2 ** 2048
logging.info(f"This is a log message generated by heavy_lifting() after {round(time.time() - t_start)}s")
logging.info(f"heavy_lifting() is done.")
class WebUILogHandler(logging.Handler):
# this log handler will receive the log messages from heavy_lifting() and emit() them to the UI
def __init__(self,socketio: SocketIO):
super().__init__()
self.socketio: SocketIO = socketio
def emit(self,record: logging.LogRecord) -> None:
msg = self.format(record)
self.socketio.emit("log",msg,broadcast=True)
# by adding a sleep,at least for every log message the eventlet server
# is able to handle its tasks while heavy_lifting() is running...
# But: time between log messages is indeterminate. Server not responding in between.
if _SLEEP_ON_LOG_EMIT:
self.socketio.sleep(0.001)
def heavy_lifting_mp_wrapper(q: Queue):
""" used for mutliprocessing
the wrapper is started as a new process,and sends all logging messages from heavy_lifting() to the queue
"""
q.put("heavy_lifting_mp_wrapper() starting...")
logging.getLogger().setLevel(logging.INFO)
logging.getLogger().addHandler(QueueLogHandler(q))
heavy_lifting()
q.put("heavy_lifting_mp_wrapper() done.")
class QueueLogHandler(logging.Handler):
def __init__(self,q: Queue):
super().__init__()
self.q = q
def emit(self,record: logging.LogRecord) -> None:
self.q.put(self.format(record))
def main():
app = Flask(__name__)
socketio = SocketIO(app,async_mode='eventlet')
logging.getLogger().setLevel(logging.INFO)
logging.getLogger().addHandler(WebUILogHandler(socketio))
@app.route("/")
def index():
return render_template_string("""<!DOCTYPE html>
<html lang="en">
<head>
<Meta charset="UTF-8">
<title>Eventlet Multiprocessing test</title>
<script src="https://cdn.socket.io/4.0.1/socket.io.min.js"></script>
<script>
const socketio = io({
reconnection: true,reconnectionDelay: 1000,reconnectionDelayMax: 1000,randomizationFactor: 0,timeout: 1000,reconnectionAttempts: Infinity,transports: ["websocket"],});
function log(msg) {
document.getElementById("info").innerHTML += `<li>${msg}</li>`;
}
socketio.on("log",log);
socketio.on("connect",function() {
log("Socket connection established");
});
socketio.on("disconnect",function() {
log("Socket connection lost");
});
document.addEventListener('DOMContentLoaded',function() {
// instead of reloading the page,use fetch to initiate the HTTP call,// show answer in log
const a = document.getElementsByTagName("a");
for(let i = 0; i < a.length; i++) {
a[i].addEventListener("click",async function(e) {
e.preventDefault();
const href = this.getAttribute("href");
const response = await fetch(href,{method: "GET",});
const text = await response.text();
log(text);
});
}
},false);
</script>
</head>
<body>
<h1>Actions</h1>
<ul>
<li><a href="{{ url_for("do_heavy_lifting_direct") }}">Start heavy_lifting() directly</a></li>
<li><a href="{{ url_for("do_heavy_lifting_thread") }}">Start heavy_lifting() as thread</a></li>
<li><a href="{{ url_for("do_heavy_lifting_task") }}">Start heavy_lifting() as eventlet task</a></li>
<li><a href="{{ url_for("do_heavy_lifting_process") }}">Start heavy_lifting() as process</a></li>
</ul>
<h1>Output log</h1>
<ul id="info"></ul>
</body>
</html>""")
@app.route("/do_heavy_lifting_direct")
def do_heavy_lifting_direct():
logging.info("do_heavy_lifting_direct() called")
# by default,the eventlet server is set up to handle all requests in a separate task
# so long running functions can be called here,as long as they allow giving control to other eventlet tasks
heavy_lifting()
# heavy_lifting unfortunately does not give up control due to never calling a sleep function or similar.
return "do_heavy_lifting_direct() route done."
@app.route("/do_heavy_lifting_thread")
def do_heavy_lifting_thread():
logging.info("do_heavy_lifting_thread() called")
t = threading.Thread(target=heavy_lifting)
t.start()
return "do_heavy_lifting_thread() route done."
@app.route("/do_heavy_lifting_task")
def do_heavy_lifting_task():
logging.info("do_heavy_lifting_task() called")
socketio.start_background_task(target=heavy_lifting)
return "do_heavy_lifting_task() route done."
@app.route("/do_heavy_lifting_process")
def do_heavy_lifting_process():
logging.info("do_heavy_lifting_process() called")
queue = Queue()
p = Process(target=heavy_lifting_mp_wrapper,args=(queue,))
p.start()
# process is forwarding logging messages using the queue
# read messages from queue and emit
while p.is_alive():
while not queue.empty():
msg = queue.get()
logging.info(msg + " (Q)")
socketio.sleep(0.01)
return "do_heavy_lifting_process() route done."
socketio.run(app,debug=True)
if __name__ == '__main__':
main()
解决方法
暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!
如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。
小编邮箱:dio#foxmail.com (将#修改为@)