问题描述
我有一个 csv 文件,我将它读入一个 2D 列表,我想在 MPI (mpi4py) 中使用 scatter 方法将这个列表的不同块发送到不同的处理元素以如下处理它们:
df = []
with open("data_tiny.csv") as csv_file:
csv_reader = csv.reader(csv_file,delimiter=',')
for row in csv_reader:
df.append(row)
recvbuf = [[""] * (len(df[0])) for _ in range(math.ceil(len(df)//size))]
recvbuf= comm.scatter(df,root=0)
print('Rank: ',rank,',recvbuf received: ',recvbuf)
for t in recvbuf[:]:
if t[7] != 'o3':
recvbuf.remove(t)
comm.gather(recvbuf,df,root=0)
if rank == 0:
print('Rank: ',df)
我收到以下错误:
Traceback (most recent call last):
File "MPI_1.py",line 21,in <module>
recvbuf= comm.scatter(df,root=0)
File "mpi4py/MPI/Comm.pyx",line 1267,in mpi4py.MPI.Comm.scatter
File "mpi4py/MPI/msgpickle.pxi",line 730,in mpi4py.MPI.PyMPI_scatter
File "mpi4py/MPI/msgpickle.pxi",line 119,in mpi4py.MPI.Pickle.dumpv
ValueError: expecting 4 items,got 54
错误说 scatter 需要 4 个项目,得到 54(df(二维数组)的长度是 54,这就是为什么它说 scatter 得到 54)。我的问题是如何将 2d 列表传递给 scatter 方法(而不是通过使用 numpy)并在此处解决错误。
输入数据为9列54行数据如:
a,aa,aaa,aaaa,aaaaa,aaaaaa,ab,abb,abbb
a1,aa1,aaa1,aaaa1,aaaaa1,aaaaaa1,ab1,abb1,abbb1
a2,aa2,aaa2,aaaa2,aaaaa2,aaaaaa2,ab2,abb2,abbb2
a3,aa3,aaa3,aaaa3,aaaaa3,aaaaaa3,ab3,abb3,abbb3
.....
.....
解决方法
ValueError: 期望 4 个项目,得到 54 个
发生这种情况是因为分散例程:
recvbuf= comm.scatter(df,root=0)
期望 df
具有与正在运行的进程数相同的长度(即 comm.size)。
由于您正在运行 4 个进程并且 df
有 54 个元素,因此您会收到错误消息。
> ValueError: expecting 4 items,got 54
要解决这个问题,您需要打包 df
使其包含与进程数量一样多的元素,其中每个元素可以是一个数组,其中包含要发送到给定进程的元素。
例如,假设您正在运行 4 个进程,并且 df=[1,2,3,4,5,6,7,8]
您需要创建 df=[[1,2][3,4][5,6][7,8]]
。其中 df[0] 将转到进程 0,df[1] 将转到进程 1,依此类推。
一个可能的解决方案示例:
import csv
import math
from mpi4py import MPI
def split(a,n):
k,m = divmod(len(a),n)
return list(a[i * k + min(i,m):(i + 1) * k + min(i + 1,m)] for i in range(n))
comm = MPI.COMM_WORLD
size = comm.Get_size()
rank = comm.Get_rank()
df = []
if rank == 0:
with open("data_tiny.csv") as csv_file:
csv_reader = csv.reader(csv_file,delimiter=',')
for row in csv_reader:
df.append(row)
df = split(df,size)
recvbuf = comm.scatter(df,root=0)
print(recvbuf)