具有真正多处理/多线程的 Eventlet 网络服务器

问题描述

简短说明

基于 Flask / Flask-SocketIO / Eventlet 的 Web 服务器为用户提供 HTML 前端。 对于某些操作,后端必须调用由 3rd 方模块提供的 heavy_lifting() 函数,该函数可能会运行很长时间(几分钟),并且偶尔会发出日志消息,这些消息应使用 websockets 发送到前端。

我知道在 Python 中使用多线程/多处理有很多陷阱。我不关心heavy_lifting() 是在异步任务/协程、线程还是单独的进程中完成的。最后我只关心它工作,意思是:不阻塞 eventlet 服务器,将日志消息从外部函数广播到连接的前端,两者都独立于发生的事情和调用内容(或未调用)在heavy_lifting() 函数中。我认为“调用需要一些时间才能完成的外部函数”的用例对于 Flask 应用程序来说并不是那么遥不可及,所以我想知道我错过了什么来正确实现它。

尝试了什么?

  • 直接函数调用 Eventlet 认在单独的任务/协程中处理请求。任务可能会运行更长时间,而服务器仍然响应,只要该任务通过调用例如time.sleep() 或 asyncio.sleep(),使用 await... 但是,外部的heavy_lifting() 不调用 sleep(),因此不能在路由处理程序中直接调用。在示例脚本中,当直接调用函数时,服务器不会响应,直到函数返回。设置 _SLEEP_ON_LOG_EMIT = True 会有所帮助,因为服务器现在至少能够在每次从函数发送日志消息时做出响应。 (日志消息由 logging.Handler 处理,它向连接的客户端发出 () websocket 事件。)但是,由于日志消息之间的时间没有定义并且可能很长,这当然不是首选的解决方案。

  • 线程函数调用 旧实现使用 tk UI,其中heavy_lifting() 使用 threading.Thread() 在单独的线程中执行。只要没有将日志消息发送到 socketio 客户端,执行似乎可以与 eventlet 一起工作。 here 还讨论了此主题。使用monkey_patch() 有效地修补线程实现会产生与when 相同的效果

  • 使用 start_background_task(): Flask-SocketIO 的 start_background_task() 将根据使用的异步环境启动后台任务的实现。对于 eventlet,它将使用基于协程/任务的适当实现。实际上,认情况下它与“直接”函数调用相同:无论如何,路由处理程序也运行在单独的 eventlet 协程中。所以结果是一样的:任务运行良好,但服务器没有响应,直到任务放弃控制,例如通过调用 .sleep() 函数(或 await ...)。同样,这不是在heavy_lifting() 本身中完成的,它只能在处理日志消息时“注入”,但这并不是真正的解决方案。

  • 使用多处理: 为了将heavy_lifting() 与网络服务器完全分离,我们的想法是将其作为一个新的multiprocessing.Process() 启动。通过 Queue() 发送消息,设置和通信变得更加复杂。这种情况似乎有效……但是在尝试调试(真实的、更大的)项目时,我遇到了这种方法的一些奇怪问题。有时,当涉及多处理并且 PyCharm 调试器处于活动状态时,eventlet 甚至无法正常启动。此外,eventlet 似乎 not compatible 具有多处理功能。如果我没记错的话,根本不支持 os.forks() 所以它不再是跨平台的(因为 multiprocessing 在 Unix 上使用 fork() )。 使用我尝试过的所有不同方法,结合和不结合monkey_patch(),我不记得到底出了什么问题,因为在示例代码中,多处理方法在这里似乎效果最好。但是,我们感谢任何有关改进的意见或任何其他相关评论

完整示例代码

from flask import Flask,render_template_string
from flask_socketio import SocketIO
import time
import logging
import threading
from multiprocessing import Queue,Process

# Set this to true to see that the direct call and start_background_task work partially
# (server responsive when sleep() is called)
_SLEEP_ON_LOG_EMIT = False


def heavy_lifting():
    # this function is part of a 3rd party module,will run for some time
    # simulate heavy lifting without time.sleep() or socketio.sleep() calls
    t_total = 30
    t_int = 8
    logging.info(f"heavy_lifting() will run for {t_total} seconds,send a log message every {t_int} seconds")
    t_start = time.time()
    t_stop = time.time() + t_total
    while time.time() < t_stop:
        t_next_int = time.time() + t_int
        while time.time() < t_next_int:
            x = 2 ** 2048
        logging.info(f"This is a log message generated by heavy_lifting() after {round(time.time() - t_start)}s")
    logging.info(f"heavy_lifting() is done.")


class WebUILogHandler(logging.Handler):
    # this log handler will receive the log messages from heavy_lifting() and emit() them to the UI
    def __init__(self,socketio: SocketIO):
        super().__init__()
        self.socketio: SocketIO = socketio

    def emit(self,record: logging.LogRecord) -> None:
        msg = self.format(record)
        self.socketio.emit("log",msg,broadcast=True)
        # by adding a sleep,at least for every log message the eventlet server
        # is able to handle its tasks while heavy_lifting() is running...
        # But: time between log messages is indeterminate. Server not responding in between.
        if _SLEEP_ON_LOG_EMIT:
            self.socketio.sleep(0.001)


def heavy_lifting_mp_wrapper(q: Queue):
    """ used for mutliprocessing
    the wrapper is started as a new process,and sends all logging messages from heavy_lifting() to the queue
    """
    q.put("heavy_lifting_mp_wrapper() starting...")
    logging.getLogger().setLevel(logging.INFO)
    logging.getLogger().addHandler(QueueLogHandler(q))
    heavy_lifting()
    q.put("heavy_lifting_mp_wrapper() done.")


class QueueLogHandler(logging.Handler):
    def __init__(self,q: Queue):
        super().__init__()
        self.q = q

    def emit(self,record: logging.LogRecord) -> None:
        self.q.put(self.format(record))


def main():
    app = Flask(__name__)
    socketio = SocketIO(app,async_mode='eventlet')

    logging.getLogger().setLevel(logging.INFO)
    logging.getLogger().addHandler(WebUILogHandler(socketio))

    @app.route("/")
    def index():
        return render_template_string("""<!DOCTYPE html>
            <html lang="en">
            <head>
                <Meta charset="UTF-8">
                <title>Eventlet Multiprocessing test</title>
                <script src="https://cdn.socket.io/4.0.1/socket.io.min.js"></script>
                <script>
                    const socketio = io({
                        reconnection: true,reconnectionDelay: 1000,reconnectionDelayMax: 1000,randomizationFactor: 0,timeout: 1000,reconnectionAttempts: Infinity,transports: ["websocket"],});
                    function log(msg) {
                        document.getElementById("info").innerHTML += `<li>${msg}</li>`;
                    }
                    socketio.on("log",log);
                    socketio.on("connect",function() {
                        log("Socket connection established");
                    });
                    socketio.on("disconnect",function() {
                        log("Socket connection lost");
                    });
                    document.addEventListener('DOMContentLoaded',function() {
                        // instead of reloading the page,use fetch to initiate the HTTP call,// show answer in log
                        const a = document.getElementsByTagName("a");
                        for(let i = 0; i < a.length; i++) {
                            a[i].addEventListener("click",async function(e) {
                                e.preventDefault();
                                const href = this.getAttribute("href");
                                const response = await fetch(href,{method: "GET",});
                                const text = await response.text();
                                log(text);
                            });
                        }
                    },false);
                </script>
            </head>
            <body>
                <h1>Actions</h1>
                <ul>
                    <li><a href="{{ url_for("do_heavy_lifting_direct") }}">Start heavy_lifting() directly</a></li>
                    <li><a href="{{ url_for("do_heavy_lifting_thread") }}">Start heavy_lifting() as thread</a></li>
                    <li><a href="{{ url_for("do_heavy_lifting_task") }}">Start heavy_lifting() as eventlet task</a></li>
                    <li><a href="{{ url_for("do_heavy_lifting_process") }}">Start heavy_lifting() as process</a></li>
                </ul>
                <h1>Output log</h1>
                <ul id="info"></ul>
            </body>
            </html>""")

    @app.route("/do_heavy_lifting_direct")
    def do_heavy_lifting_direct():
        logging.info("do_heavy_lifting_direct() called")
        # by default,the eventlet server is set up to handle all requests in a separate task
        # so long running functions can be called here,as long as they allow giving control to other eventlet tasks
        heavy_lifting()
        # heavy_lifting unfortunately does not give up control due to never calling a sleep function or similar.
        return "do_heavy_lifting_direct() route done."

    @app.route("/do_heavy_lifting_thread")
    def do_heavy_lifting_thread():
        logging.info("do_heavy_lifting_thread() called")
        t = threading.Thread(target=heavy_lifting)
        t.start()
        return "do_heavy_lifting_thread() route done."

    @app.route("/do_heavy_lifting_task")
    def do_heavy_lifting_task():
        logging.info("do_heavy_lifting_task() called")
        socketio.start_background_task(target=heavy_lifting)
        return "do_heavy_lifting_task() route done."

    @app.route("/do_heavy_lifting_process")
    def do_heavy_lifting_process():
        logging.info("do_heavy_lifting_process() called")
        queue = Queue()
        p = Process(target=heavy_lifting_mp_wrapper,args=(queue,))
        p.start()
        # process is forwarding logging messages using the queue
        # read messages from queue and emit
        while p.is_alive():
            while not queue.empty():
                msg = queue.get()
                logging.info(msg + " (Q)")
            socketio.sleep(0.01)
        return "do_heavy_lifting_process() route done."

    socketio.run(app,debug=True)


if __name__ == '__main__':
    main()

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)