Multiprocessing to return large data sets in Python

Problem Description

I have 2 functions in a Python 3.7 script that search 2 separate network nodes and return very large data sets of strings in lists. The smaller data set is roughly 300K entries long, while the larger one is roughly 1.5M. Between how the data sets have to be compiled and the second data set being significantly larger, the script takes almost an hour to execute. I have no way to shorten the run time by changing how the data sets are compiled; there is no simpler way for me to get the data from the network nodes. But running the two functions simultaneously would save almost 10 minutes, so I am trying to shorten the run time by using multiprocessing so that both of them run at the same time.

I do not need them to necessarily start within the same second or end within the same second, only that they are running at the same time.

Below is a breakdown of my first attempt at coding for multiprocessing:

def p_func(arg1,arg2,pval):
    ## Do Stuff
    return pval

def s_func(arg1,sval):
    ## Do Stuff
    return sval

# Creating variables to get return values that multiprocessing can handle
pval = multiprocessing.Value(list)
sval = multiprocessing.Value(list)

# setting up multiprocessing Processes for each function and passing arguments
p1 = multiprocessing.Process(target=p_func,args=(arg1,arg2,pval))
s1 = multiprocessing.Process(target=s_func,args=(arg3,sval))
p1.start()
s1.start()
p1.join()
s1.join()

print("Number of values in pval: ",len(pval))
print("Number of values in sval: ",len(sval))

I believed I had solved the list problem, so....
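(For reference, and not part of the original post: multiprocessing.Value is backed by shared ctypes memory, so it expects a ctypes type or a one-character typecode such as 'i', not a Python container class. A minimal illustration of why Value(list) fails:)

import multiprocessing

# Value wraps a single ctypes object in shared memory, so it needs a
# ctypes type or a typecode string such as 'i' (C int) or 'd' (C double):
counter = multiprocessing.Value('i', 0)  # OK: one shared C int
counter.value += 1

# A Python container class has no ctypes size, so this raises an error:
broken = multiprocessing.Value(list)     # TypeError: this type has no size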

Based on the comments, I have updated the code as shown below:

#! python3
import multiprocessing as mp
def p_func(arg1,arg2,pval):
    # takes arg1 and arg2 and queries network node to return list of ~300K
    # values and assigns that list to pval for return to main()
    return pval

def s_func(arg1,sval):
    # takes arg1 and queries network node to return list of ~1.5M 
    # values and assigns that list to sval for return to main()
    return sval

# Creating variables to get return values that multiprocessing can handle in 
# main()
with mp.Manager() as mgr:
    pval = mgr.list()
    sval = mgr.list()

    # setting up multiprocessing Processes for each function and passing 
    # arguments
    p1 = mp.Process(target=p_func,args=(arg1,arg2,pval))
    s1 = mp.Process(target=s_func,args=(arg3,sval))
    p1.start()
    s1.start()
    p1.join()
    s1.join()

# out of with block
print("Number of values in pval: ",len(sval))

This now raises TypeError: can't pickle _thread.lock objects on the p1.start() call. I am guessing that one of the variables I am passing in the p1 declaration is causing the problem with multiprocessing, but I am not sure how to read the error or how to resolve the problem.
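That guess is usually correct: with the spawn start method (the default on Windows), everything in args must be picklable, and objects that hold a threading lock, which many client and session objects do, cannot be pickled. A minimal sketch that reproduces the error, using a hypothetical NodeSession class as a stand-in for whatever unpicklable object is being passed:

import multiprocessing as mp
import threading

class NodeSession:
    """Hypothetical stand-in for a client object that holds a lock."""
    def __init__(self):
        self._lock = threading.Lock()  # lock objects cannot be pickled

def worker(session, out):
    out.extend(["a", "b"])  # never reached; start() fails first

if __name__ == '__main__':
    mp.set_start_method('spawn')  # forces args to be pickled to the child
    with mp.Manager() as mgr:
        out = mgr.list()
        p = mp.Process(target=worker, args=(NodeSession(), out))
        p.start()  # TypeError: can't pickle _thread.lock objects (Python 3.7)
        p.join()

The usual fix is to create such objects inside the worker function and pass only picklable arguments (strings, numbers, and manager proxies) to Process.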

Solution

Use Manager.list() instead:

import multiprocessing as mp

def p_func(pval):
    pval.extend(list(range(300000)))

def s_func(sval):
    sval.extend(list(range(1500000)))

if __name__ == '__main__':
    # Creating variables to get return values that mp can handle
    with mp.Manager() as mgr:
        pval = mgr.list()
        sval = mgr.list()

        # setting up mp Processes for each function and passing arguments
        p1 = mp.Process(target=p_func,args=(pval,))
        s2 = mp.Process(target=s_func,args=(sval,))
        p1.start()
        s2.start()
        p1.join()
        s2.join()

        print("Number of values in pval: ",len(pval))
        print("Number of values in sval: ",len(sval))

Output:

Number of values in pval:  300000
Number of values in sval:  1500000
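One practical note on Manager lists, beyond what the answer shows: every method call on the list proxy is a round trip to the manager process. For data sets of ~300K and ~1.5M entries, build the result locally inside the worker and transfer it with a single extend() call, rather than appending one item at a time. A sketch of the same s_func written that way:

def s_func(sval):
    # build the ~1.5M entries locally in the worker process...
    local = [str(i) for i in range(1500000)]
    # ...then ship them to the manager in one proxy call instead of
    # ~1.5M separate sval.append(...) round trips
    sval.extend(local)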

A Manager object is slower than shared memory, but more flexible. Shared memory is faster, so if you know an upper bound for your arrays, you could instead use a fixed-size shared-memory Array together with a shared value recording the size actually used, for example:

#!python3
import multiprocessing as mp

def p_func(parr,psize):
    for i in range(10):
        parr[i] = i
    psize.value = 10

def s_func(sarr,ssize):
    for i in range(5):
        sarr[i] = i
    ssize.value = 5

if __name__ == '__main__':
    # Creating variables to get return values that mp can handle
    parr = mp.Array('i',2<<20) # 2M
    sarr = mp.Array('i',2<<20)
    psize = mp.Value('i',0)
    ssize = mp.Value('i',0)

    # setting up mp Processes for each function and passing arguments
    p1 = mp.Process(target=p_func,args=(parr,psize))
    s2 = mp.Process(target=s_func,args=(sarr,ssize))
    p1.start()
    s2.start()
    p1.join()
    s2.join()

    print("parr: ",parr[:psize.value])
    print("sarr: ",sarr[:ssize.value])

Output:

parr:  [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
sarr:  [0, 1, 2, 3, 4]
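A third option, not covered in the answer above: since the two functions in the question simply return lists, a process pool can hand the return values back directly, so no shared list or array is needed; each result is pickled once and sent back to the parent when its worker finishes. A sketch under that assumption:

import multiprocessing as mp

def p_func():
    return [str(i) for i in range(300000)]

def s_func():
    return [str(i) for i in range(1500000)]

if __name__ == '__main__':
    with mp.Pool(processes=2) as pool:
        p_res = pool.apply_async(p_func)  # both tasks run concurrently
        s_res = pool.apply_async(s_func)
        pval = p_res.get()  # blocks until p_func's result has arrived
        sval = s_res.get()
    print("Number of values in pval: ", len(pval))
    print("Number of values in sval: ", len(sval))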
