通过 RPyC 将 *args 和 **kwargs 正确传递给底层模块的问题

问题描述

背景

我正在尝试实施 RPyC github 存储库中提供的 example 概念验证 APScheduler,以便使用具有多个工作人员的 gunicorn 部署我的应用程序(一个问题 pointed out 在 APScheduler 的常见问题部分)。此外,我正在尝试使用 Flask-APScheduler 这样做,以便能够从任务功能轻松地在 app_contexts 内工作。

问题

调用通过 RPyC 公开的调度程序服务时,我似乎无法正确提供参数。更具体地说,在传递参数 argskwargs文字变量)以及存储在暴露的 *args**kwargs 中的变量时,这似乎成为一个问题RPyC 函数

基本上,我通常在直接调用 scheduler.add_job() 时使用的参数在通过 RPyC 路由时不起作用,而是在尝试将 RPyC 公开方法接收的参数传递给底层时导致如下错误调度程序实例。我该如何解决这个问题?

最小的工作示例

一个终端运行python app.py,在另一个终端运行python scheduler.py

# app.py
from flask import Flask,current_app,jsonify
from flask_apscheduler import APScheduler
import rpyc

scheduler = APScheduler()

def create_app():
    app = Flask(__name__)
    app.scheduler = rpyc.connect("localhost",12345)
    app.scheduler = app.scheduler.root  # just so current_app.scheduler can be used like normal

    @app.route("/add_job/<report_id>",methods=["GET"])
    def add_job(report_id):
        """
        This works as expected when using 
        from app import scheduler
        scheduler.add_job(...)
        """
        current_app.scheduler.add_job(
            func="app.tasks:run_report",args=(report_id,),kwargs={"email_results": True},executor="threadpool",trigger="cron",day="*/1",id="reconcile_accounts"
        )
        return jsonify({"status": "scheduled"})

    return app

if __name__ == "__main__":
    app = create_app()
    app.run(debug=True)
# scheduler.py
from rpyc.utils.server import ThreadedServer
import rpyc

from app import create_app,scheduler

class SchedulerService(rpyc.Service):
    def __init__(self):
        self._app = None
        self._scheduler = None

    def on_connect(self,conn):
        # code that runs when a connection is created
        # (to init the service,if needed)
        self._app = create_app()
        self._scheduler = scheduler

    def exposed_add_job(self,func,*args,**kwargs):
        # Problem occurs below when sending *args and **kwargs to Flask-APScheduler,which sends them to APScheduler
        job_id = kwargs.pop("id",None)
        return self._scheduler.add_job(job_id,**kwargs)

if __name__ == "__main__":
    server = ThreadedServer(SchedulerService,port=12345,protocol_config={"allow_public_attrs": True})
    try:
        server.start()
    except (KeyboardInterrupt,SystemExit):
        pass
    finally:
        scheduler.shutdown()

来自self._scheduler.add_job(job_id,**kwargs)的跟踪本

127.0.0.1 - - [08/Jul/2021 10:29:43] "GET /reports/2/run HTTP/1.1" 500 -
Traceback (most recent call last):
  File "C:\Users\mhill\PycharmProjects\reporting\venv\lib\site-packages\flask\app.py",line 2088,in __call__
    return self.wsgi_app(environ,start_response)
  File "C:\Users\mhill\PycharmProjects\reporting\venv\lib\site-packages\flask\app.py",line 2073,in wsgi_app
    response = self.handle_exception(e)
  File "C:\Users\mhill\PycharmProjects\reporting\venv\lib\site-packages\flask\app.py",line 2070,in wsgi_app
    response = self.full_dispatch_request()
  File "C:\Users\mhill\PycharmProjects\reporting\venv\lib\site-packages\flask\app.py",line 1515,in full_dispatch_request
    rv = self.handle_user_exception(e)
  File "C:\Users\mhill\PycharmProjects\reporting\venv\lib\site-packages\flask\app.py",line 1513,in full_dispatch_request
    rv = self.dispatch_request()
  File "C:\Users\mhill\PycharmProjects\reporting\venv\lib\site-packages\flask\app.py",line 1499,in dispatch_request
    return self.ensure_sync(self.view_functions[rule.endpoint])(**req.view_args)
  File "C:\Users\mhill\PycharmProjects\reporting\app\views.py",line 189,in run_report
    kwargs={"config": json.dumps(report.serialize())}
  File "C:\Users\mhill\PycharmProjects\reporting\venv\lib\site-packages\rpyc\core\netref.py",line 240,in __call__
    return syncreq(_self,consts.HANDLE_CALL,args,kwargs)
  File "C:\Users\mhill\PycharmProjects\reporting\venv\lib\site-packages\rpyc\core\netref.py",line 63,in syncreq
    return conn.sync_request(handler,proxy,*args)
  File "C:\Users\mhill\PycharmProjects\reporting\venv\lib\site-packages\rpyc\core\protocol.py",line 473,in sync_request
    return self.async_request(handler,timeout=timeout).value
  File "C:\Users\mhill\PycharmProjects\reporting\venv\lib\site-packages\rpyc\core\async_.py",line 102,in value
    raise self._obj
_get_exception_class.<locals>.Derived: dictionary update sequence element #0 has length 6; 2 is required

========= Remote Traceback (1) =========
Traceback (most recent call last):
  File "C:\Users\mhill\PycharmProjects\reporting\venv\lib\site-packages\rpyc\core\protocol.py",line 324,in _dispatch_request
    res = self._HANDLERS[handler](self,line 592,in _handle_call
    return obj(*args,**dict(kwargs))
  File "C:/Users/mhill/PycharmProjects/reporting/scheduler.py",line 20,in exposed_add_job
    return self._scheduler.add_job(func,**kwargs)
  File "C:\Users\mhill\PycharmProjects\reporting\venv\lib\site-packages\flask_apscheduler\scheduler.py",line 168,in add_job
    return self._scheduler.add_job(**job_def)
  File "C:\Users\mhill\PycharmProjects\reporting\venv\lib\site-packages\apscheduler\schedulers\base.py",line 429,in add_job
    'kwargs': dict(kwargs) if kwargs is not None else {},ValueError: dictionary update sequence element #0 has length 6; 2 is required

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)

相关问答

Selenium Web驱动程序和Java。元素在(x,y)点处不可单击。其...
Python-如何使用点“。” 访问字典成员?
Java 字符串是不可变的。到底是什么意思?
Java中的“ final”关键字如何工作?(我仍然可以修改对象。...
“loop:”在Java代码中。这是什么,为什么要编译?
java.lang.ClassNotFoundException:sun.jdbc.odbc.JdbcOdbc...