很长时间无法连接到MQTT代理,并且绿色线程无法正常工作

问题描述

该网站写在烧瓶中。我正在尝试使用config.py文件中编写的参数连接到MQTT代理:

class DevelopConfig(Config):
    DEBUG = True
    TESTING = True
    ASSETS_DEBUG = True


class MqttConfig(object):
    MQTT_broKER_URL = 'temper.chost.com.ua'
    MQTT_broKER_PORT = 1883
    MQTT_CLIENT_ID = 'flask_mqtt'
    MQTT_CLEAN_SESSION = True
    MQTT_USERNAME = 'test'
    MQTT_PASSWORD = 'tester'
    MQTT_KEEPALIVE = 2
    MQTT_TLS_ENABLED = False
    MQTT_LAST_WILL_TOPIC = 'home/lastwill'
    MQTT_LAST_WILL_MESSAGE = 'bye'
    MQTT_LAST_WILL_QOS = 2

连接需要很长的时间,或者根本不会发生。问题很可能是流的组织错误。我使用eventlet.greenthread模块创建线程,并使用Flask-MQTT与MQTT服务器一起使用。烧瓶初始化文件init.py:

from flask import Flask
from flask_sqlalchemy import sqlAlchemy
from flask_migrate import Migrate
from flask_mail import Mail
from flask_script import Manager
from flask_socketio import SocketIO
from flask_mqtt import Mqtt
import eventlet
from config import DevelopConfig,MqttConfig,MailConfig

eventlet.monkey_patch()

app = Flask(__name__)
app.config.from_object(DevelopConfig)       # применение конфигурация разработчика
app.config.from_object(MqttConfig)          # конфигурация для работы с сервером MQTT
app.config.from_object(MailConfig)          # конфигурация для работы с email
db = sqlAlchemy(app)
migrate = Migrate(app,db)
mail = Mail(app)
manager = Manager(app,db)
socketio = SocketIO(app)
mqtt = Mqtt(app)

from app import models

if __name__ == "__main__":
    manager.run()

启动Flask应用程序是通过app.py文件完成的

# обработчик страницы профиля пользователя
# только для авторизованных пользователей
@app.route('/profile/<int:iduser>')
@login_required
def profile(iduser):
    user = Users.query.get(iduser)
    devices = user.devices.all()
    print(devices)
    if len(devices) != 0:
        mqttToaa = MqttTOAA(devices[1].device_code,devices[1].typedev.reverse)
    return render_template('profile.html',iduser=current_user.get_id())

if __name__ == '__main__':
    try:
        socketio.run(app,host='127.0.0.1',port=5000,use_reloader=False,debug=True)
    except socket.error as socketerror:
        print("Error socketio: " + socketerror)

用于MQTT服务器的文件

# -*- coding: utf-8 -*-

from app import socketio,mqtt
from flask import request
import eventlet
import eventlet.greenthread as greenthread
import json


class MqttTOAA(object):
    # топик контроля воротами забора,топик данных воротами забора,топик контроля гаражными,топик 
данных гаражными
    type_topic = ["/Control","/Data"]
    m_request_state = {"comm": "3"}  # запрос на получение статуса ворот
    m_start = {"Gate": "Start"}  # сообщение для открытия/закрытия ворот
    m_stop = {"Gate": "Stop"}  # сообщение для остановки ворот
    qos_request = 2
    qos_sub = 0
    # состояние ворот: действие
    dict_state_button_fence = {"con_Clos": "Открыть","con_Open": "Закрыть","fl_OpenClos": ("Остановить","Продолжить")}
    dict_state_button_garage = {"con_Clos": "Открыть","fl_OpenClos": ("Продолжить","Остановить","Прервать")}
    # действие: статус,отображаемый на странице
    dict_state_text_fence = {"Открыть": "закрыты","Закрыть": "открыты","Остановить": ("открываются","закрываются","в движении"),"Продолжить": "остановлены"}
    dict_state_text_garage = {"Открыть": "закрыты","Продолжить": "остановлены","Прервать": "закрываются","в движении")}
    # поля: итоговые статусы
    dict_type_element_fence = {"button": "","text": ""}  # текст кнопки и отображаемый статус
dict_type_element_garage = {"button": "","text": ""}
state_gate_fence = {}  # статус ворот забора
state_gate_garage = {}  # статус ворот гаража
initial_position_fence = ""  # первоначальное положение
position_garage = {"state": "","stop": False}  # предыдущая позиция ворот и отметка о том,были ли отсановленны
POOL_TIME = 3     # Интервал отправки запроса брокеру

def __init__(self,device_code,reverse):
    self.mqtt_connect = mqtt.on_connect()(self._handle_connect)
    self.mqtt_onmessage = mqtt.on_message()(self._handle_mqtt_message)
    self.mqtt_onlog = mqtt.on_log()(self._handle_logging)
    self.socketio_error = socketio.on_error()(self._handle_error)
    self.handle_change_state = socketio.on('change_state')(self._handle_change_state)
    self.device_code = "BK" + device_code
    self.reverse = reverse

# обработчик ошибок
def _handle_error(self):
    print(request.event["message"])  # "my error event"
    print(request.event["args"])  # (data,)


# функция изменения состояния ворот по нажатию
def _handle_change_state(self):
    message = None
    try:
        # для каких из ворот необходимо сменить состояние
        if self.reverse is not True:
            type_g = self.state_gate_fence
        else:
            type_g = self.state_gate_garage

        if type_g["fl_OpenClos"] == 1:  # ворота в движении -> остановка
            message = self.m_stop
        # остановились и двигаются в обратном направлении -> остановка
        elif (type_g["fl_OpenClos"] == 0) and (self.position_garage["state"] == "закрываются"):
            message = self.m_stop
            self.position_garage["state"] = "открываются"
        else:  # ворота остановленны -> продолжение движения
            message = self.m_start
        print(self.position_garage["state"])
        print(message)
    except Exception as ex:
        print(ex)
    mqtt.publish(self.device_code + self.type_topic[0],json.dumps(message),self.qos_request)

# передача запроса на получение данных
@staticmethod
def handle_publish(topic_req_res,m_req_state,qos_req,timer):
    while True:
        print("Send")
        eventlet.sleep(timer)
        msg = json.dumps(m_req_state)
        mqtt.publish(topic_req_res,msg,qos_req)

# ожидание подключения к брокеру,# затем подписка на топик и запуск потока для постоянной отсылки сообщений в топик Control
def _handle_connect(self,client,userdata,flags,rc):
    mqtt.subscribe(self.device_code + self.type_topic[1],self.qos_sub)
    print("Subscribe!")
    publish_thread = greenthread.spawn(self.handle_publish,self.device_code + self.type_topic[0],self.m_request_state,self.qos_request,self.POOL_TIME)

# обработка принятых сообщений от топика,на который подписан
def _handle_mqtt_message(self,message):
    print("Get message")
    data = dict(
        topic=message.topic,payload=message.payload.decode(),qos=message.qos,)
    try:
        data = json.loads(data['payload'])
        if self.reverse is not True:
            self.fence_msg(data)
        else:
            self.garage_msg(data)
    except Exception as ex:
        print("Exception: " + str(ex))

2条消息始终显示在日志中

16发送PINGREQ 16收到PINGRESP

代码挂起,等待装饰器被调用

mqtt.on_connect()

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)