问题描述
该网站写在烧瓶中。我正在尝试使用config.py文件中编写的参数连接到MQTT代理:
class DevelopConfig(Config):
DEBUG = True
TESTING = True
ASSETS_DEBUG = True
class MqttConfig(object):
MQTT_broKER_URL = 'temper.chost.com.ua'
MQTT_broKER_PORT = 1883
MQTT_CLIENT_ID = 'flask_mqtt'
MQTT_CLEAN_SESSION = True
MQTT_USERNAME = 'test'
MQTT_PASSWORD = 'tester'
MQTT_KEEPALIVE = 2
MQTT_TLS_ENABLED = False
MQTT_LAST_WILL_TOPIC = 'home/lastwill'
MQTT_LAST_WILL_MESSAGE = 'bye'
MQTT_LAST_WILL_QOS = 2
连接需要很长的时间,或者根本不会发生。问题很可能是流的组织错误。我使用eventlet.greenthread模块创建线程,并使用Flask-MQTT与MQTT服务器一起使用。烧瓶初始化文件init.py:
from flask import Flask
from flask_sqlalchemy import sqlAlchemy
from flask_migrate import Migrate
from flask_mail import Mail
from flask_script import Manager
from flask_socketio import SocketIO
from flask_mqtt import Mqtt
import eventlet
from config import DevelopConfig,MqttConfig,MailConfig
eventlet.monkey_patch()
app = Flask(__name__)
app.config.from_object(DevelopConfig) # применение конфигурация разработчика
app.config.from_object(MqttConfig) # конфигурация для работы с сервером MQTT
app.config.from_object(MailConfig) # конфигурация для работы с email
db = sqlAlchemy(app)
migrate = Migrate(app,db)
mail = Mail(app)
manager = Manager(app,db)
socketio = SocketIO(app)
mqtt = Mqtt(app)
from app import models
if __name__ == "__main__":
manager.run()
启动Flask应用程序是通过app.py文件完成的
# обработчик страницы профиля пользователя
# только для авторизованных пользователей
@app.route('/profile/<int:iduser>')
@login_required
def profile(iduser):
user = Users.query.get(iduser)
devices = user.devices.all()
print(devices)
if len(devices) != 0:
mqttToaa = MqttTOAA(devices[1].device_code,devices[1].typedev.reverse)
return render_template('profile.html',iduser=current_user.get_id())
if __name__ == '__main__':
try:
socketio.run(app,host='127.0.0.1',port=5000,use_reloader=False,debug=True)
except socket.error as socketerror:
print("Error socketio: " + socketerror)
用于MQTT服务器的文件:
# -*- coding: utf-8 -*-
from app import socketio,mqtt
from flask import request
import eventlet
import eventlet.greenthread as greenthread
import json
class MqttTOAA(object):
# топик контроля воротами забора,топик данных воротами забора,топик контроля гаражными,топик
данных гаражными
type_topic = ["/Control","/Data"]
m_request_state = {"comm": "3"} # запрос на получение статуса ворот
m_start = {"Gate": "Start"} # сообщение для открытия/закрытия ворот
m_stop = {"Gate": "Stop"} # сообщение для остановки ворот
qos_request = 2
qos_sub = 0
# состояние ворот: действие
dict_state_button_fence = {"con_Clos": "Открыть","con_Open": "Закрыть","fl_OpenClos": ("Остановить","Продолжить")}
dict_state_button_garage = {"con_Clos": "Открыть","fl_OpenClos": ("Продолжить","Остановить","Прервать")}
# действие: статус,отображаемый на странице
dict_state_text_fence = {"Открыть": "закрыты","Закрыть": "открыты","Остановить": ("открываются","закрываются","в движении"),"Продолжить": "остановлены"}
dict_state_text_garage = {"Открыть": "закрыты","Продолжить": "остановлены","Прервать": "закрываются","в движении")}
# поля: итоговые статусы
dict_type_element_fence = {"button": "","text": ""} # текст кнопки и отображаемый статус
dict_type_element_garage = {"button": "","text": ""}
state_gate_fence = {} # статус ворот забора
state_gate_garage = {} # статус ворот гаража
initial_position_fence = "" # первоначальное положение
position_garage = {"state": "","stop": False} # предыдущая позиция ворот и отметка о том,были ли отсановленны
POOL_TIME = 3 # Интервал отправки запроса брокеру
def __init__(self,device_code,reverse):
self.mqtt_connect = mqtt.on_connect()(self._handle_connect)
self.mqtt_onmessage = mqtt.on_message()(self._handle_mqtt_message)
self.mqtt_onlog = mqtt.on_log()(self._handle_logging)
self.socketio_error = socketio.on_error()(self._handle_error)
self.handle_change_state = socketio.on('change_state')(self._handle_change_state)
self.device_code = "BK" + device_code
self.reverse = reverse
# обработчик ошибок
def _handle_error(self):
print(request.event["message"]) # "my error event"
print(request.event["args"]) # (data,)
# функция изменения состояния ворот по нажатию
def _handle_change_state(self):
message = None
try:
# для каких из ворот необходимо сменить состояние
if self.reverse is not True:
type_g = self.state_gate_fence
else:
type_g = self.state_gate_garage
if type_g["fl_OpenClos"] == 1: # ворота в движении -> остановка
message = self.m_stop
# остановились и двигаются в обратном направлении -> остановка
elif (type_g["fl_OpenClos"] == 0) and (self.position_garage["state"] == "закрываются"):
message = self.m_stop
self.position_garage["state"] = "открываются"
else: # ворота остановленны -> продолжение движения
message = self.m_start
print(self.position_garage["state"])
print(message)
except Exception as ex:
print(ex)
mqtt.publish(self.device_code + self.type_topic[0],json.dumps(message),self.qos_request)
# передача запроса на получение данных
@staticmethod
def handle_publish(topic_req_res,m_req_state,qos_req,timer):
while True:
print("Send")
eventlet.sleep(timer)
msg = json.dumps(m_req_state)
mqtt.publish(topic_req_res,msg,qos_req)
# ожидание подключения к брокеру,# затем подписка на топик и запуск потока для постоянной отсылки сообщений в топик Control
def _handle_connect(self,client,userdata,flags,rc):
mqtt.subscribe(self.device_code + self.type_topic[1],self.qos_sub)
print("Subscribe!")
publish_thread = greenthread.spawn(self.handle_publish,self.device_code + self.type_topic[0],self.m_request_state,self.qos_request,self.POOL_TIME)
# обработка принятых сообщений от топика,на который подписан
def _handle_mqtt_message(self,message):
print("Get message")
data = dict(
topic=message.topic,payload=message.payload.decode(),qos=message.qos,)
try:
data = json.loads(data['payload'])
if self.reverse is not True:
self.fence_msg(data)
else:
self.garage_msg(data)
except Exception as ex:
print("Exception: " + str(ex))
2条消息始终显示在日志中
16发送PINGREQ 16收到PINGRESP
mqtt.on_connect()
解决方法
暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!
如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。
小编邮箱:dio#foxmail.com (将#修改为@)