问题描述
我正在开发一种启发式算法来为 NP(因此 cpu 密集型)问题找到“好的”解决方案。
我正在使用 Python 实现我的解决方案(我同意在考虑速度时它不是最佳选择,但确实如此)并且我将工作量分配到许多子进程中,每个子进程负责探索可能的解决方案的空间。
为了提高性能,我想分享在所有子进程之间执行每个子进程期间收集的一些信息。 收集此类信息的“明显”方法是将它们收集在字典中,字典的键是(冻结)整数集,值是整数列表(或集)。 因此,共享字典必须对每个子进程都可读和可写,但我可以有把握地预期读取会比写入频繁得多,因为子进程只有在找到“有趣”的东西时才会写入共享字典并读取字典更频繁地了解某个解决方案是否已经被其他进程评估过(以避免两次或更多次探索同一分支)。 我不希望这样的字典的维度超过 10 MB。
目前,我使用 multiprocessing.Manager()
的实例实现了共享字典,该实例负责处理对开箱即用的共享字典的并发访问。
但是(根据我的发现)这种共享数据的方式是使用进程之间的管道来实现的,这比普通和简单的共享内存慢得多(此外,字典必须在通过管道发送之前进行腌制,并在收到时取消腌制).
到目前为止我的代码是这样的:
# main.py
import multiprocessing as mp
import os
def worker(a,b,c,shared_dict):
while condition:
# do things
# sometimes reads from shared_dict to check if a candidate solution has already been evaluated by other process
# if not,evaluate it and store it inside the shared_dict together with some related info
return worker_result
def main():
with mp.Manager() as manager:
# setup params a,...
# ...
shared_dict = manager.dict()
n_processes = os.cpu_count()
with mp.Pool(processes=n_processes) as pool:
async_results = [pool.apply_async(worker,(a,shared_dict)) for _ in range(n_processes)]
results = [res.get() for res in async_results]
# gather the overall result from 'results' list
if __name__ == '__main__':
main()
为了避免管道造成的开销,我想使用共享内存,但 Python 标准库似乎没有提供一种直接的方法来处理共享内存中的字典。
据我所知,Python 标准库提供了帮助程序来将数据存储在共享内存中,仅用于标准 ctypes(使用 multiprocessing.Value
and multiprocessing.Array
)或让您访问 raw areas of shared memory。
我不想在共享内存的原始区域中实现我自己的哈希表,因为我既不是哈希表的专家,也不是并发编程的专家,相反,我想知道是否有其他更快的解决方案可以满足我的需求不需要从零开始写所有内容。 例如,我已经看到 ray library 允许以比使用管道更快的方式读取写入共享内存的数据,但是似乎一旦字典被序列化并写入共享内存区域,您就无法修改它。
有什么帮助吗?
解决方法
不幸的是,Ray 中的共享内存必须是不可变的。通常,建议您将 actor 用于可变状态。 (see here)。
您可以对演员进行一些技巧。例如,如果值是不可变的,您可以在 dict 中存储对象引用。那么 dict 本身不会在共享内存中,但它的所有对象都会在。
import matplotlib.pyplot as plt
import numpy as np
x,y = np.meshgrid(
np.arange(0,100,5),np.arange(0,sparse=False)
plt.scatter(x,y,c=x+y,s=70)