无法修复错误“RuntimeError: You need to use the gevent-websocket server”(您需要使用 gevent-websocket 服务器)和“OSError: write error”(写错误)

问题描述

我正在用 Flask 编写一个网站,使用 uWSGI + Nginx + Flask-SocketIO 的组合,并使用 gevent 作为异步模块。运行过程中出现以下错误:

RuntimeError: You need to use the gevent-websocket server.(您需要使用 gevent-websocket 服务器。) uwsgi_response_writev_headers_and_body_do(): Broken pipe [core/writer.c line 306] during …(此处的请求信息在原文中已丢失) 2 月 23 日 12:57:55 toaa uwsgi[558436]: OSError: write error(写错误)

我尝试了不同的配置,也试过从 socketio 的初始化中去掉 async_mode='gevent',但问题依旧。

wsgi.py 文件

from webapp import app, socketio

# Direct execution only: start the Socket.IO development server with
# debugging and request logging on, and the auto-reloader off.
if __name__ == '__main__':
    socketio.run(
        app,
        debug=True,
        use_reloader=False,
        log_output=True,
    )

project.ini:

[uwsgi]
# WSGI entry point: the `app` object exported by wsgi.py.
module = wsgi:app

master = true
# gevent loop engine settings (presumably 1024 async cores with monkey
# patching enabled by uWSGI itself -- confirm against the uWSGI gevent docs).
gevent = 1024
gevent-monkey-patch = true
buffer-size=32768 # optionally

# Socket file shared with the front-end web server (Nginx, per the post).
socket = /home/sammy/projectnew/projectnew.sock
socket-timeout = 240
chmod-socket = 664

vacuum = true
die-on-term = true

webapp/__init__.py 用于应用程序:

from gevent import monkey

# Monkey patching runs first, ahead of every remaining import in this module.
monkey.patch_all()
import grpc.experimental.gevent

grpc.experimental.gevent.init_gevent()

from flask import Flask,session,request
from config import DevelopConfig,MqttConfig,MailConfig,ProductionConfig
# The class exported by flask_sqlalchemy is spelled `SQLAlchemy`.
from flask_sqlalchemy import SQLAlchemy
from flask_migrate import Migrate
from flask_mail import Mail
from flask_script import Manager
from flask_socketio import SocketIO
# from flask_mqtt import Mqtt
from flask_login import LoginManager
from flask_babel import Babel
from flask_babel_js import BabelJS
from flask_babel import lazy_gettext as _l
from apscheduler.schedulers.gevent import GeventScheduler
# from celery import Celery


# Application object and its configuration; the three config objects are
# applied in order, so later ones override keys set by earlier ones.
app = Flask(__name__)
app.config.from_object(ProductionConfig)
app.config.from_object(MqttConfig)
app.config.from_object(MailConfig)

# Extensions bound to the application.
db = SQLAlchemy(app)
migrate = Migrate(app,db)
mail = Mail(app)
manager = Manager(app,db)
login_manager = LoginManager(app)
login_manager.login_view = 'auth'
login_manager.login_message = _l("Необходимо авторизоваться для доступа к закрытой странице")
login_manager.login_message_category = "error"
# celery = Celery(app.name,broker=Config.CELERY_BROKER_URL)
# celery.conf.update(app.config)
scheduler = GeventScheduler()
# socketio = SocketIO(app) - Production Version
# NOTE: this is the configuration that produces "You need to use the
# gevent-websocket server" when the app is served by uWSGI; under uWSGI the
# async mode is expected to be 'gevent_uwsgi'.
socketio = SocketIO(app,async_mode='gevent')
babel = Babel(app)
babeljs = BabelJS(app=app,view_path='/translations/')
# Views are imported only after `app` and the extensions above exist.
import webapp.views


@babel.localeselector
def get_locale():
    """Choose the locale for the current request.

    A language stored in the session under ``'language'`` takes priority
    (it is also printed). Otherwise the best match between the request's
    accepted languages and the keys of ``app.config['LANGUAGES']`` is
    returned.
    """
    chosen = session.get('language')
    if chosen is None:
        # Nothing stored in the session: fall back to the request headers.
        supported = app.config['LANGUAGES'].keys()
        return request.accept_languages.best_match(supported)
    print(chosen)
    return chosen


# Imported at the bottom, after `app` and `db` have been created
# (presumably because the models module imports them from this package --
# verify against webapp/models.py).
from webapp import models

# When this module is executed directly, hand control to the Flask-Script
# manager.
if __name__ == "__main__":
    manager.run()

实际使用 socketio 的类(mqtt.py):

from webapp import socketio,app
from flask import request
from flask_mqtt import Mqtt
from flask_babel import lazy_gettext as _l
from webapp.tasks import SchedulerTask
from webapp import translate_state_gate as tr_msg
import json
import copy
import logging

# MQTT client bound to the Flask application.
mqtt = Mqtt(app)
# Fetch the 'flask.flask_mqtt' logger and disable it.
logger = logging.getLogger('flask.flask_mqtt')
logger.disabled = True


class MqttTOAA(object):
    """Relay gate state between MQTT devices and Socket.IO clients.

    For every device the instance subscribes to ``BK<device_code>/Data``,
    schedules a periodic state request on ``BK<device_code>/Control`` and
    forwards each decoded device message to the connected Socket.IO client
    as an ``mqtt_message`` event. The ``change_state`` Socket.IO event is
    translated into a start/stop command published to the device.
    """

    # Topic suffixes: index 0 is published to, index 1 is subscribed to.
    type_topic = ["/Control", "/Data"]
    # Payloads sent to the devices.
    m_request_state = {"comm": "3"}
    m_start = {"Gate": "Start"}
    m_stop = {"Gate": "Stop"}
    # QoS levels for published requests and for subscriptions.
    qos_request = 1
    qos_sub = 2
    # Per-device state, keyed by device code (filled by create_devices_dict).
    struct_state_devices = None
    # Passed to SchedulerTask.add_scheduler_publish as its last argument
    # (presumably the polling interval -- confirm in webapp.tasks).
    POOL_TIME = 2
    end_publish = None
    devices = None
    # NOTE(review): these two lists are class attributes and are mutated in
    # place, so they are shared by every instance -- confirm this is intended.
    schedulers_list = list()
    sch_task = None
    # Socket.IO session id of the most recently connected client.
    sid_mqtt = None
    code_list = list()

    def __init__(self, devices, lang):
        """Connect to MQTT, build per-device state and register handlers.

        :param devices: iterable of device objects exposing ``device_code``
            and ``typedev.reverse``.
        :param lang: language key into ``tr_msg.MessageGate.t_message``;
            falls back to ``'ru'`` when it is not a configured language.
        """
        mqtt._connect()
        self.devices = devices
        self.sch_task = SchedulerTask()
        if lang not in app.config['LANGUAGES'].keys():
            lang = 'ru'
        msgs = tr_msg.MessageGate.t_message[lang]
        # Template of the per-device state; copied for each device.
        self.dict_gate = {
            # Button caption for each reported gate flag.
            "dict_state_button": {
                'con_Clos': msgs["f_open"],
                'con_Open': msgs["f_close"],
                "fl_OpenClos": (msgs["f_continue"], msgs["f_stop"], msgs["f_abort"]),
            },
            # Status text looked up by the button caption chosen above.
            "dict_state_text": {
                msgs["f_open"]: msgs["ps_close"],
                msgs["f_close"]: msgs["ps_open"],
                msgs["f_continue"]: msgs["ps_stop"],
                msgs["f_abort"]: msgs["pr_close"],
                msgs["f_stop"]: (msgs["pr_open"], msgs["pr_close"], msgs["pr_move"]),
            },
            # Payload emitted to the Socket.IO client.
            "dict_type_element": {"button": u'', "text": u'', "device_code": u'', },
            "state_gate": {},
            "position": {"state": u'', "stop": False},
            "reverse": False,
        }
        self.close_msg = msgs["pr_close"]
        self.open_msg = msgs["pr_open"]

        self.create_devices_dict()
        self.handle_mqtt_connect()
        # Register the bound methods as MQTT / Socket.IO callbacks.
        self.mqtt_onmessage = mqtt.on_message()(self._handle_mqtt_message)
        self.mqtt_onlog = mqtt.on_log()(self._handle_logging)
        self.socketio_error = socketio.on_error()(self._handle_error)
        self.handle_change_state = socketio.on('change_state')(self._handle_change_state)
        self.handle_on_connect = socketio.on('connect')(self._handle_on_connect)
        self.handle_unsubscribe_all = socketio.on('unsubscribe_all')(self._handle_unsubscribe_all)

    def _handle_on_connect(self):
        """Remember the session id of the client that just connected."""
        self.sid_mqtt = request.sid

    def handle_mqtt_connect(self):
        """Subscribe to each new device and schedule its state requests."""
        task = None
        for dev in self.devices:
            if dev.device_code not in self.code_list:
                mqtt.subscribe("BK" + dev.device_code + self.type_topic[1], self.qos_sub)
                self.code_list.append(dev.device_code)
                task = self.sch_task.add_scheduler_publish(
                    dev.device_code, mqtt,
                    "BK" + dev.device_code + self.type_topic[0],
                    self.m_request_state, self.qos_request, self.POOL_TIME)
                if task is not None:
                    self.schedulers_list.append(task)

        if len(self.schedulers_list) > 0:
            self.sch_task.start_schedulers()
            self.code_list.clear()

    @staticmethod
    def _handle_error(e=None):
        """Print details of the Socket.IO event whose handler failed.

        :param e: the exception passed in by Flask-SocketIO. The original
            signature took no arguments, so the handler itself raised
            ``TypeError`` when invoked with the exception.
        """
        if e is not None:
            print(repr(e))
        print(request.event["message"])  # "my error event"
        print(request.event["args"])  # (data,)

    @staticmethod
    def _handle_unsubscribe_all():
        """Drop every MQTT subscription."""
        mqtt.unsubscribe_all()

    def _handle_change_state(self, code):
        """Publish a start or stop command for the device ``code``.

        Stop is sent when the last reported ``fl_OpenClos`` flag is 1, or
        when a reversing gate's stored position equals the "closing" text;
        start is sent otherwise. Nothing is published when ``code`` is None
        or the device state cannot be read.
        """
        print(code)
        message = None
        if code is not None:
            try:
                type_g = self.struct_state_devices[code]["state_gate"]

                if type_g["fl_OpenClos"] == 1:
                    message = self.m_stop
                else:
                    if self.struct_state_devices[code]["reverse"] is True:
                        if self.struct_state_devices[code]["position"]["state"] == self.close_msg:
                            message = self.m_stop
                            self.struct_state_devices[code]["position"]["state"] = self.open_msg
                        else:
                            message = self.m_start
                    else:
                        message = self.m_start
                print("Msg:" + str(message))
            except Exception as ex:
                print(ex)
            if message is not None:
                mqtt.publish("BK" + code + self.type_topic[0], json.dumps(message), self.qos_request)
            else:
                print("Error change state " + code)

    def _handle_mqtt_message(self, client, userdata, message):
        """Decode an incoming MQTT message as JSON and process it.

        Decoding happens inside the ``try`` so that an undecodable payload
        is reported the same way as invalid JSON instead of escaping the
        callback.
        """
        try:
            data = json.loads(message.payload.decode())
            self.gate_msg(data)
        except Exception as ex:
            print("Exception: " + str(ex))

    @staticmethod
    def _handle_logging(*args):
        """Print an MQTT client log record.

        The callback is registered through ``mqtt.on_log()`` and is invoked
        with ``(client, userdata, level, buf)``; the previous three-parameter
        signature did not match that call. Only the trailing ``level`` and
        ``buf`` arguments are used, so three-argument calls keep working.
        """
        level, buf = args[-2:]
        print(level, buf)

    def create_devices_dict(self):
        """Create an independent state entry for every device.

        A deep copy of the template is used: a shallow copy would share the
        nested ``position`` / ``dict_type_element`` dicts between devices,
        and those are modified in place by ``_handle_change_state``.
        """
        if self.struct_state_devices is None:
            self.struct_state_devices = dict()
        for dev in self.devices:
            self.struct_state_devices[dev.device_code] = copy.deepcopy(self.dict_gate)
            if dev.typedev.reverse:
                self.struct_state_devices[dev.device_code]['reverse'] = True

    def gate_msg(self, data):
        """Update one device's state from ``data`` and emit it to the client.

        :param data: decoded device message; reads ``esp_id`` (the device
            code follows its first two characters), ``temp_1`` and the flags
            ``con_Clos``, ``con_Open`` and ``fl_OpenClos``.
        """
        k = ""
        code = data["esp_id"][2:]
        # Work on a private copy; it is stored back once fully updated.
        dict_dev = copy.deepcopy(self.struct_state_devices[code])
        dict_dev["state_gate"] = data.copy()
        try:
            if dict_dev["state_gate"]["con_Clos"] == 0:  # gate is closed
                k = "con_Clos"
                dict_dev["position"]["state"] = k
                dict_dev["position"]["stop"] = False
            elif dict_dev["state_gate"]["con_Open"] == 0:  # gate is open
                k = "con_Open"
                dict_dev["position"]["state"] = k
                dict_dev["position"]["stop"] = False
            elif dict_dev["state_gate"]["fl_OpenClos"] == 0:
                k = "fl_OpenClos"
                # gate reverses while closing
                if dict_dev["position"]["state"] == self.close_msg and dict_dev["reverse"] is True:
                    k1 = 1
                    k2 = 0
                    dict_dev["dict_type_element"]["text"] = \
                        dict_dev["dict_state_text"][dict_dev["dict_state_button"][k][k1]][k2]
                    dict_dev["position"]["stop"] = False
                else:
                    k1 = 0
                    dict_dev["dict_type_element"]["text"] = \
                        dict_dev["dict_state_text"][dict_dev["dict_state_button"][k][k1]]
                    dict_dev["position"]["stop"] = True
            elif dict_dev["state_gate"]["fl_OpenClos"] == 1:
                k = "fl_OpenClos"

                if len(dict_dev["position"]["state"]) == 0:
                    # No position recorded yet.
                    k1 = 1
                    k2 = 2
                    dict_dev["dict_type_element"]["text"] = \
                        dict_dev["dict_state_text"][dict_dev["dict_state_button"][k][k1]][k2]

                elif dict_dev["position"]["state"] == "con_Clos" or \
                        dict_dev["position"]["state"] == self.open_msg:
                    if dict_dev["position"]["stop"]:
                        k1 = 1
                        k2 = 1
                        dict_dev["position"]["stop"] = False
                        dict_dev["dict_type_element"]["text"] = \
                            dict_dev["dict_state_text"][dict_dev["dict_state_button"][k][k1]][k2]
                    else:
                        k1 = 1
                        k2 = 0
                        dict_dev["dict_type_element"]["text"] = \
                            dict_dev["dict_state_text"][dict_dev["dict_state_button"][k][k1]][k2]
                elif dict_dev["position"]["state"] == "con_Open" or \
                        dict_dev["position"]["state"] == self.close_msg:
                    if dict_dev["reverse"]:
                        k1 = 2
                        dict_dev["dict_type_element"]["text"] = \
                            dict_dev["dict_state_text"][dict_dev["dict_state_button"][k][k1]]
                    else:
                        if dict_dev["position"]["stop"]:
                            k1 = 1
                            k2 = 0
                            dict_dev["position"]["stop"] = False
                            dict_dev["dict_type_element"]["text"] = \
                                dict_dev["dict_state_text"][dict_dev["dict_state_button"][k][k1]][k2]
                        else:
                            k1 = 1
                            k2 = 1
                            dict_dev["dict_type_element"]["text"] = \
                                dict_dev["dict_state_text"][dict_dev["dict_state_button"][k][k1]][k2]

                # Keep the stored position in step with the displayed text.
                if dict_dev["position"]["state"] != dict_dev["dict_type_element"]["text"]:
                    dict_dev["position"]["state"] = dict_dev["dict_type_element"]["text"]

            if k == "fl_OpenClos":
                dict_dev["dict_type_element"]["button"] = dict_dev["dict_state_button"][k][k1]
            else:
                dict_dev["dict_type_element"]["button"] = dict_dev["dict_state_button"][k]
                dict_dev["dict_type_element"]["text"] = \
                    dict_dev["dict_state_text"][dict_dev["dict_state_button"][k]]

        except Exception as ex:
            # Covers missing flags and branches that leave k / k1 unset.
            print("Exception (gate_msg): " + str(ex))
        dict_dev["dict_type_element"]["device_code"] = code
        dict_dev["dict_type_element"]["temp"] = data["temp_1"]
        # Force plain strings (the captions may be lazy translation objects).
        dict_dev["dict_type_element"]["button"] = str(dict_dev["dict_type_element"]["button"])
        dict_dev["dict_type_element"]["text"] = str(dict_dev["dict_type_element"]["text"])
        self.struct_state_devices[code] = copy.deepcopy(dict_dev)
        socketio.emit('mqtt_message', data=dict_dev["dict_type_element"], room=self.sid_mqtt)

解决方法

当你使用 uWSGI 时,async_mode 应该是 gevent_uwsgi:

socketio = SocketIO(app,async_mode='gevent_uwsgi')