Python Paho MQTT 订阅者卡住了 - influxdb write

问题描述

我使用 paho mqtt 模块发布了一个主题,并从另一个程序订阅了它。 我正在发布发布者能够在大约 2 秒内发送的 10000 条消息。在订阅者中,我收到消息并将值写入 influxdb。大约 2000 条记录后,MQTT 订阅者停止并等待 time.sleep() 完成。

import paho.mqtt.client as mqtt #import the client1
import time
from datetime import datetime
from influxdb_client import InfluxDBClient,Point,Dialect,WriteOptions
from influxdb_client.client.write_api import SYNCHRONOUS

org = "my-ord"
bucket = "Bucket1"
token = "my-token"
client = InfluxDBClient(url="http://localhost:8086",token=token,org=org)

write_api = client.write_api(write_options=SYNCHRONOUS)
query_api = client.query_api()
#Function to write the record to influx
def update_db(point):
     
    write_api.write(bucket=bucket,record=point)
    print("Point written")

msg_count = 0   
#On message callback function
def on_message(client,userdata,message):
    global msg_count
    msg_count+=1
    print("message received ",str(message.payload))

    _point1 = Point("mqtt2").tag("message","message").field("datapt",str(message.payload))

    update_db(_point1)

    print(msg_count)
#This is the Subscriber
ip = "localhost" 
client = mqtt.Client("P2")
client.on_message=on_message
client.connect(ip) 
client.loop_start()
client.subscribe("influx")

time.sleep(180)

client.loop_stop()
print(msg_count)

发布者在一秒钟内发布了 10000 条消息。如果没有 influx write 命令,代码会一直运行到最后。当我包含写入时,订阅者在大约 2000 条消息后停止。我应该更改什么才能使其正常工作?

解决方法

涌入写入将是一个阻塞调用,您不应该在 MQTT 客户端回调中进行阻塞调用,因为它们会阻止客户端处理新传入的消息。

如果您需要因传入消息而阻塞 IO,您应该将这项工作交给一个单独的线程。