问题描述
我正在寻找可用于在 multiprocess.Process
es 之间进行通信的 python 3.6(这个确切版本)的消息队列实现,具体来说,它应该是多生产者、单消费者、具有优先接收权的 fifo应用程序特定类型的消息(例如,如果队列中间有一条系统消息(用 erlang 术语),而队列头部有一条普通消息,则下一次接收应返回系统消息而不是普通消息)
但我怀疑会有这样的库,所以问题变成了,是否有任何 stdlib 或第三方库给了我一大块共享内存或更好的列表,这样我就可以读写支持的缓冲区,但是内存/列表并使用诸如 mp.Lock
?
multiprocessing.Manager
使用 tcp,并启动一个新进程
解决方法
我对 Erlang 不太熟悉,但是,根据您描述需求的方式,我认为您可以采用使用 multiprocessing.Queue
并在阅读消息之前对消息进行排序的方法。
这个想法是为每个进程都有一个 multiprocessing.Queue
(FIFO 消息队列)。当进程 A 向进程 B 发送消息时,进程 A 将其消息连同消息的优先级放入进程 B 的消息队列中。当进程读取其消息时,它会将消息从 FIFO 队列传输到列表中,然后在处理消息之前对列表进行排序。消息首先按优先级排序,然后是它们到达消息队列的时间。
这是在 Windows 上使用 Python 3.6 测试过的示例。
from multiprocessing import Process,Queue
import queue
import time
def handle_messages(process_id,message_queue):
# Keep track of the message number to ensure messages with the same priority
# are read in a FIFO fashion.
message_no = 0
messages = []
while True:
try:
priority,contents = message_queue.get_nowait()
messages.append((priority,message_no,contents))
message_no+=1
except queue.Empty:
break
# Handle messages in correct order.
for message in sorted(messages):
print("{}: {}".format(process_id,message[-1]))
def send_message_with_priority(destination_queue,message,priority):
# Send a message to a destination queue with a specified priority.
destination_queue.put((-priority,message))
def process_0(my_id,queues):
while True:
# Do work
print("Doing work...")
time.sleep(5)
# Receive messages
handle_messages(my_id,queues[my_id])
def process_1(my_id,queues):
message_no = 0
while True:
# Do work
time.sleep(1)
# Receive messages
handle_messages(my_id,queues[my_id])
send_message_with_priority(queues[0],"This is message {} from process {}".format(message_no,my_id),1)
message_no+=1
def process_2(my_id,queues):
message_no = 0
while True:
# Do work
time.sleep(3)
# Receive messages
handle_messages(my_id,"This is urgent message {} from process {}".format(message_no,2)
message_no+=1
if __name__ == "__main__":
qs = {i: Queue() for i in range(3)}
processes = [Process(target=p,args=(i,qs)) for i,p in enumerate([process_0,process_1,process_2])]
for p in processes:
p.start()
for p in processes:
p.join()