问题描述
我使用 paho mqtt 模块发布了一个主题,并从另一个程序订阅了它。 我正在发布发布者能够在大约 2 秒内发送的 10000 条消息。在订阅者中,我收到消息并将值写入 influxdb。大约 2000 条记录后,MQTT 订阅者停止并等待 time.sleep() 完成。
import paho.mqtt.client as mqtt #import the client1
import time
from datetime import datetime
from influxdb_client import InfluxDBClient,Point,Dialect,WriteOptions
from influxdb_client.client.write_api import SYNCHRONOUS
org = "my-ord"
bucket = "Bucket1"
token = "my-token"
client = InfluxDBClient(url="http://localhost:8086",token=token,org=org)
write_api = client.write_api(write_options=SYNCHRONOUS)
query_api = client.query_api()
#Function to write the record to influx
def update_db(point):
write_api.write(bucket=bucket,record=point)
print("Point written")
msg_count = 0
#On message callback function
def on_message(client,userdata,message):
global msg_count
msg_count+=1
print("message received ",str(message.payload))
_point1 = Point("mqtt2").tag("message","message").field("datapt",str(message.payload))
update_db(_point1)
print(msg_count)
#This is the Subscriber
ip = "localhost"
client = mqtt.Client("P2")
client.on_message=on_message
client.connect(ip)
client.loop_start()
client.subscribe("influx")
time.sleep(180)
client.loop_stop()
print(msg_count)
发布者在一秒钟内发布了 10000 条消息。如果没有 influx write 命令,代码会一直运行到最后。当我包含写入时,订阅者在大约 2000 条消息后停止。我应该更改什么才能使其正常工作?
解决方法
涌入写入将是一个阻塞调用,您不应该在 MQTT 客户端回调中进行阻塞调用,因为它们会阻止客户端处理新传入的消息。
如果您需要因传入消息而阻塞 IO,您应该将这项工作交给一个单独的线程。