问题描述
我查看了Lib/multiprocessing/queues.py
安装在Python安装中的Queue类(Python
2.7,但与我简短检查过的Python 3.2的版本没有明显不同)。我了解它的工作原理:
队列对象维护两组对象。一组是由所有进程共享的多进程安全基元。其他每个进程分别创建和使用。
-
Pipe
两端都有一个对象另存为self._reader
和self._writer
。 - 一个
BoundedSemaphore
对象,该对象计算(并有选择地限制)队列中有多少个对象。 - 一个
Lock
用于读取管道对象,并在非Windows平台的另一个写作。(我认为这是因为在Windows上写入管道本质上是多进程安全的。)
每个进程对象在_after_fork
和_start_thread
方法中设置:
- 甲
collections.deque
对象用于缓冲写入管道。 -
threading.condition
当缓冲区不为空时用于发信号的对象。 - 一
threading.Thread
,做实际的写作对象。它是延迟创建的,因此直到在给定进程中请求对队列的至少一次写入之前,它才存在。 -
Finalize
进程结束时清理对象的各种对象。
get
队列中的A非常简单。您获得了读取锁,减少了信号量,并从Pipe的读取端获取了一个对象。
Aput
更复杂。它使用多个线程。调用者获取put
条件的锁,然后将其对象添加到缓冲区中并在解锁条件之前发出信号。如果信号量尚未运行,它还会增加信号量并启动它。
编写器线程在该_Feed
方法中永远循环(直到取消)。如果缓冲区为空,则等待notempty
条件。然后,它从缓冲区中获取一个项目,获取写锁(如果存在)并将该项目写入Pipe。
因此,鉴于所有这些,您可以对其进行修改以获取LIFO队列吗?这似乎并不容易。管道本质上是FIFO对象,尽管Queue不能整体保证FIFO的行为(由于来自多个进程的写入的异步特性),但它总是主要是FIFO。
如果只有一个使用者,则可以get
从队列中将对象添加到自己的本地进程堆栈中。尽管使用共享内存,但有一定限制的堆栈不会太难,因此很难做到多用户堆栈。您需要一个锁,一对条件(用于在完全状态和空状态下进行阻塞/信号发送),一个共享的整数值(用于保存的值的数量)和一个适当类型的共享数组(用于值本身)。
解决方法
有谁知道一种干净的方法来获得近LIFO或什至没有近FIFO(例如随机)行为multiprocessing.Queue
?
替代问题:有人可以指出我后面管理实际存储结构的线程的代码multiprocessing.Queue
吗?提供大约LIFO访问似乎很简单,但是我迷失在寻找它的兔子洞中。
笔记:
- 相信
multiprocessing.Queue
不能保证订单。精细。但是它是近FIFO的,所以近LIFO会很棒。 - 我可以将所有当前项目从队列中拉出,并在使用它们之前颠倒顺序,但是如果可能的话,我宁愿避免混乱。
(编辑)澄清一下:我正在使用CPU进行仿真,multiprocessing
因此不能使用中的专用队列Queue
。由于几天没有看到任何答案,因此我在上面添加了替代问题。
万一有问题,下面multiprocessing.Queue
是接近FIFO的一些证据。它只是表明在一个简单的情况下(单线程),它在我的系统上是完美的FIFO:
import multiprocessing as mp
import Queue
q = mp.Queue()
for i in xrange(1000):
q.put(i)
deltas = []
while True:
try:
value1 = q.get(timeout=0.1)
value2 = q.get(timeout=0.1)
deltas.append(value2-value1)
except Queue.Empty:
break
#positive deltas would indicate the numbers are coming out in increasing order
min_delta,max_delta = min(deltas),max(deltas)
avg_delta = sum(deltas)/len(deltas)
print "min",min_delta
print "max",max_delta
print "avg",avg_delta
打印:最小值,最大值和平均值恰好为1(完美的FIFO)