问题描述
我正在运行一个脚本,有多个客户端发布到同一主题,每个客户端有5条消息,但它们是一个接一个地执行的。我想知道是否有办法同时执行多个发布者,而不是像我编程时那样执行“ for”循环。
我想到要运行各种python脚本,但是如果我想拥有100个发布者,那么它就无法正常工作。有人知道我该怎么做吗?预先感谢
import ssl
import time
import random
import sys
import paho.mqtt.publish as publish
import paho.mqtt.client as mqtt
from datetime import datetime
broker_address = '127.0.0.1'
topic = "casa/hab1"
port=1883
delay=0.2
count=0
i=0
j=0
nclients=1
nmessages=5
for i in range(nclients):
cname="Client"+str(i)
j=int(time.time()) #eliminar la parte decimal
j=str(j)
client_id=cname+str(j)+"_" #generar client_id
client=mqtt.Client(client_id)
client.connect(broker_address)
print("")
print(str(client_id))
client.loop_start()
for count in range(nmessages):
b=random.randrange(10,99,1)
mensaje="Hello World:"+ str(b)+" -- "
client.publish(topic,mensaje,2,retain=False)
Now=datetime.Now().strftime("%Y-%m-%d %H:%M:%s.%f")
print("timestamp "+str(count)+" = "+Now)
count+=1
time.sleep(delay)
i+=1
解决方法
通常要同时运行许多功能,您将需要使用threading
或multiprocessing
在单独的thread
/ process
中运行每个功能。
但是client.loop_start()
已经运行thread
,因此您可以先创建许多客户端,
all_clients = []
for i in range(nclients):
t = int(time.time())
client_id= "Client_{}_{}_".format(i,t)
print('create:',client_id)
client = mqtt.Client(client_id)
client.connect(broker_address)
client.loop_start()
all_clients.append([client_id,client])
然后在发送消息的循环中使用它们
for count in range(nmessages):
for client_id,client in all_clients:
b = random.randrange(10,99,1)
mensaje = "Hello World: {} -- ".format(b)
client.publish(topic,mensaje,2,retain=False)
now = datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")
print('client:',client_id,'| count:',count,"=",now)
time.sleep(delay)
它将几乎在同一时间发送(+ -0.01s),但是在单独的client
/ thread
中运行每个process
会变得很难以这么小的延迟运行消息。
import time
import random
import paho.mqtt.publish as publish
import paho.mqtt.client as mqtt
from datetime import datetime
broker_address = '127.0.0.1'
#broker_address = '192.168.1.91'
topic = "casa/hab1"
delay = 0.2
nclients = 3
nmessages = 5
# --- first create all clients ----
all_clients = []
for i in range(nclients):
t = int(time.time())
client_id= "Client_{}_{}_".format(i,client])
# ---
print()
# --- loop ---
for count in range(nmessages):
for client_id,now)
time.sleep(delay)
结果:
create: Client_0_1600159539_
create: Client_1_1600159539_
create: Client_2_1600159539_
client: Client_0_1600159539_ | count: 0 = 2020-09-15 10:45:39.125615
client: Client_1_1600159539_ | count: 0 = 2020-09-15 10:45:39.126737
client: Client_2_1600159539_ | count: 0 = 2020-09-15 10:45:39.128049
client: Client_0_1600159539_ | count: 1 = 2020-09-15 10:45:39.329731
client: Client_1_1600159539_ | count: 1 = 2020-09-15 10:45:39.330702
client: Client_2_1600159539_ | count: 1 = 2020-09-15 10:45:39.332028
client: Client_0_1600159539_ | count: 2 = 2020-09-15 10:45:39.533360
client: Client_1_1600159539_ | count: 2 = 2020-09-15 10:45:39.534323
client: Client_2_1600159539_ | count: 2 = 2020-09-15 10:45:39.535380
client: Client_0_1600159539_ | count: 3 = 2020-09-15 10:45:39.737049
client: Client_1_1600159539_ | count: 3 = 2020-09-15 10:45:39.738118
client: Client_2_1600159539_ | count: 3 = 2020-09-15 10:45:39.739249
client: Client_0_1600159539_ | count: 4 = 2020-09-15 10:45:39.941419
client: Client_1_1600159539_ | count: 4 = 2020-09-15 10:45:39.943207
client: Client_2_1600159539_ | count: 4 = 2020-09-15 10:45:39.944785
编辑:
与threading
相同。
在此版本中,您可以在每个delay
中使用随机client
,以使所有流量更加随机。
import time
import random
import paho.mqtt.publish as publish
import paho.mqtt.client as mqtt
from datetime import datetime
import threading
broker_address = '127.0.0.1'
#broker_address = '192.168.1.91'
topic = "casa/hab1"
delay = 0.2
nclients = 3
nmessages = 5
# --- functions ---
def sending(client,client_id):
for count in range(nmessages):
b = random.randrange(10,retain=False)
now = datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")
print('client: {} | count: {} = {}'.format(client_id,now))
#delay = random.randint(1,5) / 10
time.sleep(delay)
# --- first create all clients ----
all_clients = []
for i in range(nclients):
t = int(time.time())
client_id= "Client_{}_{}_".format(i,client])
# ---
print()
# --- threads ---
all_threads = []
# start threads
for client_id,client in all_clients:
t = threading.Thread(target=sending,args=(client,client_id))
t.start()
all_threads.append(t)
# ... other code ...
# at the end wait for end of threads
for t in all_threads:
t.join()