如何同时运行多个发布者paho-mqtt?

问题描述

我正在运行一个脚本,有多个客户端发布到同一主题,每个客户端有5条消息,但它们是一个一个地执行的。我想知道是否有办法同时执行多个发布者,而不是像我编程时那样执行“ for”循环。

我想到要运行各种python脚本,但是如果我想拥有100个发布者,那么它就无法正常工作。有人知道我该怎么做吗?预先感谢

import ssl
import time 
import random
import sys  
import paho.mqtt.publish as publish
import paho.mqtt.client as mqtt
from datetime import datetime

broker_address = '127.0.0.1'
topic = "casa/hab1"
port=1883
delay=0.2
count=0
i=0
j=0
nclients=1
nmessages=5


for i in range(nclients):
  cname="Client"+str(i) 
  j=int(time.time()) #eliminar la parte decimal
  j=str(j)
  client_id=cname+str(j)+"_" #generar client_id
  client=mqtt.Client(client_id) 
  client.connect(broker_address)  
  print("")

  print(str(client_id))
  client.loop_start() 

  for count in range(nmessages):
     b=random.randrange(10,99,1)
     mensaje="Hello World:"+ str(b)+" -- "
     client.publish(topic,mensaje,2,retain=False) 
     Now=datetime.Now().strftime("%Y-%m-%d %H:%M:%s.%f")
     print("timestamp "+str(count)+" = "+Now)
     
     count+=1
     time.sleep(delay)

  i+=1

解决方法

通常要同时运行许多功能,您将需要使用threadingmultiprocessing在单独的thread / process中运行每个功能。

但是client.loop_start()已经运行thread,因此您可以先创建许多客户端,

all_clients = []

for i in range(nclients):
    t = int(time.time())

    client_id= "Client_{}_{}_".format(i,t) 
    print('create:',client_id)
    
    client = mqtt.Client(client_id) 
    client.connect(broker_address)  
    client.loop_start() 

    all_clients.append([client_id,client])

然后在发送消息的循环中使用它们

for count in range(nmessages):

    for client_id,client in all_clients: 
        b = random.randrange(10,99,1)
        mensaje = "Hello World: {} -- ".format(b)
        client.publish(topic,mensaje,2,retain=False) 
        now = datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")
        print('client:',client_id,'| count:',count,"=",now)

    time.sleep(delay)

它将几乎在同一时间发送(+ -0.01s),但是在单独的client / thread中运行每个process会变得很难以这么小的延迟运行消息。


import time 
import random
import paho.mqtt.publish as publish
import paho.mqtt.client as mqtt
from datetime import datetime

broker_address = '127.0.0.1'
#broker_address = '192.168.1.91'

topic = "casa/hab1"
delay = 0.2

nclients = 3
nmessages = 5

# --- first create all clients ----

all_clients = []
for i in range(nclients):
    t = int(time.time())

    client_id= "Client_{}_{}_".format(i,client])

# ---

print()

# --- loop ---

for count in range(nmessages):

    for client_id,now)

    time.sleep(delay)

结果:

create: Client_0_1600159539_
create: Client_1_1600159539_
create: Client_2_1600159539_

client: Client_0_1600159539_ | count: 0 = 2020-09-15 10:45:39.125615
client: Client_1_1600159539_ | count: 0 = 2020-09-15 10:45:39.126737
client: Client_2_1600159539_ | count: 0 = 2020-09-15 10:45:39.128049
client: Client_0_1600159539_ | count: 1 = 2020-09-15 10:45:39.329731
client: Client_1_1600159539_ | count: 1 = 2020-09-15 10:45:39.330702
client: Client_2_1600159539_ | count: 1 = 2020-09-15 10:45:39.332028
client: Client_0_1600159539_ | count: 2 = 2020-09-15 10:45:39.533360
client: Client_1_1600159539_ | count: 2 = 2020-09-15 10:45:39.534323
client: Client_2_1600159539_ | count: 2 = 2020-09-15 10:45:39.535380
client: Client_0_1600159539_ | count: 3 = 2020-09-15 10:45:39.737049
client: Client_1_1600159539_ | count: 3 = 2020-09-15 10:45:39.738118
client: Client_2_1600159539_ | count: 3 = 2020-09-15 10:45:39.739249
client: Client_0_1600159539_ | count: 4 = 2020-09-15 10:45:39.941419
client: Client_1_1600159539_ | count: 4 = 2020-09-15 10:45:39.943207
client: Client_2_1600159539_ | count: 4 = 2020-09-15 10:45:39.944785

编辑:

threading相同。

在此版本中,您可以在每个delay中使用随机client,以使所有流量更加随机。

import time 
import random
import paho.mqtt.publish as publish
import paho.mqtt.client as mqtt
from datetime import datetime
import threading

broker_address = '127.0.0.1'
#broker_address = '192.168.1.91'

topic = "casa/hab1"
delay = 0.2

nclients = 3
nmessages = 5

# --- functions ---

def sending(client,client_id):

    for count in range(nmessages):

        b = random.randrange(10,retain=False) 
        now = datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")
        print('client: {} | count: {} = {}'.format(client_id,now))
        #delay = random.randint(1,5) / 10

        time.sleep(delay)

# --- first create all clients ----

all_clients = []
for i in range(nclients):
    t = int(time.time())

    client_id= "Client_{}_{}_".format(i,client])

# ---

print()

# --- threads ---

all_threads = []

# start threads
for client_id,client in all_clients:
    t = threading.Thread(target=sending,args=(client,client_id))
    t.start()
    all_threads.append(t)
    
# ... other code ...

# at the end wait for end of threads
for t in all_threads:
    t.join()