问题描述
我正在尝试使用队列将 SharedMemory 引用传递给已在运行的进程。问题是,一旦我在另一个进程上接收(或获取)了 SharedMemory 对象,对应的内存块似乎根本不匹配,甚至大小也太大了。
import numpy as np
import multiprocessing as mp
from multiprocessing.shared_memory import SharedMemory
def f(q):
shared_memory = q.get()
print(f"In Process: {shared_memory=}")
x = np.frombuffer(buffer=shared_memory.buf,dtype=np.float64)
print(f"In Process: {x=}")
if __name__ == '__main__':
temp_array = np.arange(8)
print(f"Main: {temp_array=}")
smh = SharedMemory(create=True,size=temp_array.nbytes)
print(f"Main: {smh=}")
fix_array = np.frombuffer(buffer=smh.buf,dtype=temp_array.dtype)
fix_array[:] = temp_array[:]
print(f"Main: {fix_array=}")
queue = mp.Queue()
proc = mp.Process(target=f,args=(queue,))
proc.start()
queue.put(smh)
Main: temp_array=array([0,1,2,3,4,5,6,7])
Main: smh=SharedMemory('wnsm_2202c81b',size=32)
Main: fix_array=array([0,0])
In Process: shared_memory=SharedMemory('wnsm_2202c81b',size=4096)
In Process: x=array([0.,(weird very small numbers and many many zeros...),0.])
我希望找回原来的temp_array=array([0,7])
?
根据文档,可能是内存大小不匹配。此外,我使用一个包含 1e6 项的数组对其进行了测试,仅传递 SharedMemory 的名称并使用管道代替队列但仍然相同。
(我使用的是 Windows 10 Build 19043,Python 3.9.6 64 位)
解决方法
感谢@Timus
我认为最好把它分成两个问题来解决:
问题 1, 奇怪的数字:
如果你通过 x = np.frombuffer(buffer=shared_memory.buf,dtype=np.int32) 调整 f 的定义,你会得到你的数字(这是初始类型)。
正如@Timus 指出的那样,错误是数据类型不匹配:
np.arange()
返回带有 np.ndarray
的 dtype=np.int32
,但我试图获取带有 dtype=np.float64
的数组,因此结果错误。
修复:
@Timus 的解决方案或添加 dtype=np.float64
作为 np.arange()
的参数,使其显示为:
temp_array = np.arange(8,dtype=np.float)
问题 2, 数组太长:
根据Python Docs,SharedMemory.size
可能比原来大。因此,数组的长度也可能会有所不同。
修复/解决方法:
将数组修剪为原始大小,例如通过使用 numpy.resize()
。为此,原始 shape
也需要传递给 f()
。虽然对我来说很好,但以下几点可能对其他人来说是个问题:由于 x
只是缓冲区的视图,因此 np.ndarray.resize()
不可用(它不拥有自己的数据)。使用numpy.resize()
,将制作一个副本并且对调整大小的副本所做的更改不会反映在主进程中!为了适应这种情况,可以将 x_resized
的值复制回 x
。
固定代码现在看起来像这样:
import multiprocessing as mp
from multiprocessing.shared_memory import SharedMemory
import numpy as np
def f(q):
shared_memory,shape = q.get() # the shape is passed here
x = np.frombuffer(buffer=shared_memory.buf,dtype=np.float64) # dtype matches
# x = np.trim_zeros(x,"b"),this doesn't work if there are zeros in the dataset
x_resized = np.resize(x,new_shape=shape) # changes not reflected on main process
###
# make things to x_resized
###
x[:8] = x_resized[:] # copy changes back to x
if __name__ == '__main__':
temp_array = np.arange(8,dtype=np.float64) # dtype is correctly specified
smh = SharedMemory(create=True,size=temp_array.nbytes)
fix_array = np.frombuffer(buffer=smh.buf,dtype=temp_array.dtype)
fix_array[:] = temp_array[:]
queue = mp.Queue()
proc = mp.Process(target=f,args=(queue,))
proc.start()
queue.put((smh,temp_array.shape)) # passing the original shape
奇怪的是,虽然第二个进程中的 x
太长了,但回到主进程中 fix_array
仍然保持正确的大小......