python大数据多处理失败

问题描述

通过运行 .py 文件在Windows中使用 multiprocessing.Pool

我的python代码,对于较小的数据卡盘可以正常运行,但对于大数据量,也可以在运行之前运行给出每个池的输出,然后给出该错误我该怎么办?我没话多说了,很抱歉,stackoverflow没有让我在代码格式中放入这个长错误 错误:

Traceback (most recent call last):
  File "C:\Users\USER GOOD\AppData\Local\Programs\Python\Python38-32\lib\multiprocessing\pool.py",line 131,in worker
    put((job,i,result))
  File "C:\Users\USER GOOD\AppData\Local\Programs\Python\Python38-32\lib\multiprocessing\queues.py",line 362,in put
    obj = _ForkingPickler.dumps(obj)
  File "C:\Users\USER GOOD\AppData\Local\Programs\Python\Python38-32\lib\multiprocessing\reduction.py",line 51,in dumps
    cls(buf,protocol).dump(obj)
MemoryError

在处理上述异常期间,发生了另一个异常:

  File "C:\Users\USER GOOD\AppData\Local\Programs\Python\Python38-32\lib\multiprocessing\process.py",line 315,in _bootstrap
    self.run()
  File "C:\Users\USER GOOD\AppData\Local\Programs\Python\Python38-32\lib\multiprocessing\process.py",line 108,in run
    self._target(*self._args,**self._kwargs)
  File "C:\Users\USER GOOD\AppData\Local\Programs\Python\Python38-32\lib\multiprocessing\pool.py",line 133,in worker
    wrapped = MaybeEncodingError(e,result[1])
  File "C:\Users\USER GOOD\AppData\Local\Programs\Python\Python38-32\lib\multiprocessing\pool.py",line 86,in __init__
    self.value = repr(value)
MemoryError
Process SpawnPoolWorker-4:
Traceback (most recent call last):
  File "C:\Users\USER GOOD\AppData\Local\Programs\Python\Python38-32\lib\multiprocessing\pool.py",in __init__
    self.value = repr(value)
MemoryError
Process SpawnPoolWorker-10:
Traceback (most recent call last):
  File "C:\Users\USER GOOD\AppData\Local\Programs\Python\Python38-32\lib\multiprocessing\pool.py",in __init__
    self.value = repr(value)
MemoryError
Process SpawnPoolWorker-6:
Traceback (most recent call last):
  File "C:\Users\USER GOOD\AppData\Local\Programs\Python\Python38-32\lib\multiprocessing\pool.py",in __init__
    self.value = repr(value)
MemoryError
Process SpawnPoolWorker-8:
Traceback (most recent call last):
  File "C:\Users\USER GOOD\AppData\Local\Programs\Python\Python38-32\lib\multiprocessing\pool.py",in __init__
    self.value = repr(value)
MemoryError
Process SpawnPoolWorker-1:
Traceback (most recent call last):
  File "C:\Users\USER GOOD\AppData\Local\Programs\Python\Python38-32\lib\multiprocessing\pool.py",in __init__
    self.value = repr(value)
MemoryError
Process SpawnPoolWorker-5:
Traceback (most recent call last):
  File "C:\Users\USER GOOD\AppData\Local\Programs\Python\Python38-32\lib\multiprocessing\pool.py",in __init__
    self.value = repr(value)
MemoryError
Process SpawnPoolWorker-7:
Traceback (most recent call last):
  File "C:\Users\USER GOOD\AppData\Local\Programs\Python\Python38-32\lib\multiprocessing\pool.py",in __init__
    self.value = repr(value)
MemoryError
Process SpawnPoolWorker-9:
Traceback (most recent call last):
  File "C:\Users\USER GOOD\AppData\Local\Programs\Python\Python38-32\lib\multiprocessing\pool.py",in __init__
    self.value = repr(value)
MemoryError
Process SpawnPoolWorker-3:
Traceback (most recent call last):
  File "C:\Users\USER GOOD\AppData\Local\Programs\Python\Python38-32\lib\multiprocessing\pool.py",in __init__
    self.value = repr(value)
MemoryError

代码是这样的

import math
import os
import numpy as np
import time
import multiprocessing
import csv
def dadesazlistfake11(dlist):
    ...some data manupilation...
    return zlist ###gives an np array
def subu(remo,slist):
    rawnp=np.zeros(1) ##in order to release some memory from each pool
    periodend=len(slist)-24
    remlist=dadesazlistfake11(slist[575-575:575+25])
    for l in range(576,periodend):
        remlist=np.vstack((remlist,dadesazlistfake11(slist[l-575:l+25])))
        if (l-576)%4000==0:
            print(f'{remo,l-575}')
            ftime=time.perf_counter()
            print(f'in{ftime-stime}s finished')
    return remlist

columha和splito是一些初始化的数组/列表

columha=[[],[],[]]
s=0
l=0
###creating columha
...
##columha is some 4*75 array
def splito(a,b,c,d,e):
    #bazehaye entehaei nemikhahim tu loop bashan
    period=np.zeros(e+1).astype(int)
    period[0]= a+c-1 #575
    period[e]= b-d-1 #489688
    delta=int(math.floor((period[e]-period[0])/e))
    for i in range(1,e):
        period[i]=period[0]+i*delta
    return period

读取数据:


os.chdir(r'C:\Users\...')
my_file = open("dsa.csv",'r')
reader = csv.reader(my_file,delimiter=',')
rawlist = list(reader)
rawlist=rawlist[1:len(rawlist)]
for i in range(len(rawlist)):
    rawlist[i]=rawlist[i][2:]
    for j in range(len(rawlist[i])):
        rawlist[i][j]=float(rawlist[i][j])
rawlist=rawlist[0:800] ##small version
rawnp=np.asarray(rawlist)
rawlist=[] ##in order to release some memory 

代码开始:

thread_num=10
period=splito(0,len(rawnp),96*6,24,thread_num)
tupleinput=[tuple([i,rawnp[period[i]-575:period[i+1]+24]]) for i in range(thread_num)]
stime=time.perf_counter()
if __name__=="__main__":
    with multiprocessing.Pool(thread_num) as pool:
        wholelist = list(pool.starmap(subu,tupleinput))
#         wholenp = np.vstack((wholenp,pool.starmap(subu,tupleinput)))
    rawnp=np.zeros(1) ##in order to release some memory 
    shape0=-1,869
#     wholenp=np.asarray(wholelist[0])#.reshape(shape0)
    wholenp=wholelist[0]
    for i in range(1,len(wholelist)):
#         wholenp=np.vstack((wholenp,np.asarray(wholelist[i]).reshape(shape0)))
        wholenp=np.vstack((wholenp,wholelist[i]))
        wholelist[i]=np.zeros(1)
#         wholelist[i]=[]
    print('lennp',len(wholenp))
    ftime=time.perf_counter()
    print(f'total process in{ftime-stime}s finished')
    input()

我还尝试了只有3个内核的代码,并且发生了相同的错误

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)

相关问答

错误1:Request method ‘DELETE‘ not supported 错误还原:...
错误1:启动docker镜像时报错:Error response from daemon:...
错误1:private field ‘xxx‘ is never assigned 按Alt...
报错如下,通过源不能下载,最后警告pip需升级版本 Requirem...