How can I use Pandas efficiently with Python mpi4py parallelism?

Problem description

I am trying to run some operations on Pandas data frames located in different directories. The steps are:

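  1. Go to each directory and use glob to search for specific files
  2. Edit them with a bash script to convert them into cleaner data files
  3. Convert the data to CSV, then load it with Pandas
  4. Finally, use multiprocessing (pool.map) to run the operations on multiple processors.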
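Steps 1 and 3 above can be sketched as follows. This is only a minimal illustration: the helper names `find_dumps` and `dump_to_csv` are hypothetical, and the column list is an assumption based on the script in this question.

```python
import glob
import os

import pandas as pd

# Assumed column layout, taken from the script in the question.
COLUMNS = ["id", "type", "x", "y", "z"]


def find_dumps(root):
    """Step 1: recursively collect every *.dump file below root."""
    pattern = os.path.join(root, "**", "*.dump")
    return sorted(p for p in glob.iglob(pattern, recursive=True) if os.path.isfile(p))


def dump_to_csv(dump_path):
    """Step 3: read a whitespace-separated dump and write a CSV next to it."""
    df = pd.read_csv(dump_path, sep=r"\s+", names=COLUMNS, dtype=float)
    csv_path = os.path.splitext(dump_path)[0] + ".csv"
    df.to_csv(csv_path, index=False)
    return csv_path
```

Building paths with `os.path.join` avoids calling `os.chdir` for every directory, which makes the per-file work easier to hand to separate processes later.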

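However, because of the size of the data frame (650065 rows), it is still slow even with multiprocessing. Does anyone know how to replace it effectively with mpi4py?

Here is the code: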

import glob
import os
import subprocess
import time

import numpy as np
import pandas as pd
from mpi4py import MPI

# get number of processors and processor rank
comm = MPI.COMM_WORLD
size = comm.Get_size()
rank = comm.Get_rank()

def processData(df):
        # Row positions of the two N atoms (atom ids 64.0 and 65.0)
        n1 = np.where(df.id == 64.0)[0]
        n2 = np.where(df.id == 65.0)[0]

        # x, y, z coordinates of each N atom, one row per structure
        coords = df[['x','y','z']].to_numpy()
        df1 = coords[n1]
        df2 = coords[n2]

        # N1-N2 distance for every structure, computed in one vectorized call
        n = min(len(df1), len(df2))
        dist = np.linalg.norm(df2[:n] - df1[:n], axis=1)

        # Structure indices ordered from largest to smallest distance, sorted once;
        # order[0] is the largest distance, order[1] the second largest
        order = np.argsort(dist)[::-1]

        # Take every 100th entry among the 10000 largest distances and
        # collect (structure index, distance, z of the second N atom)
        zip_list = []
        for value_id in order[:10000:100]:
                zip_list.append((int(value_id), float(dist[value_id]), float(df2[value_id][2])))
        return zip_list
if __name__ == "__main__":
        start_time = time.time()
        file_list = []
        for filename in glob.iglob('/home/ap86/lammps-files/100K/**/*.dump',recursive=True):
                if os.path.isfile(filename):
                        print(filename[:-4])
                        dir=filename[:-10][:-1]
                        os.chdir(dir)
                        print(os.getcwd())
                        bashCommand = "scp -r ../traj.sh ."
                        process = subprocess.Popen(bashCommand.split(),stdout=subprocess.PIPE)
                        output,error = process.communicate()
                        bashCommand = "scp -r traj.dump traj.dump.orig"
                        process = subprocess.Popen(bashCommand.split(),stdout=subprocess.PIPE)
                        output,error = process.communicate()
                        with open('traj.dump') as f:
                                if 'vx' in f.read():
                                        bashCommand = "bash traj.sh"
                                        process = subprocess.Popen(bashCommand.split(),stdout=subprocess.PIPE)
                                        output,error = process.communicate()
                                        df = pd.read_csv('traj.dump',sep='\\s+',names=["id","type","x","y","z","vx","vy","vz"],dtype=float)
                                        df.to_csv('traj.csv',index=False)
                                else:
                                        bashCommand = "bash traj.sh"
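                                        process = subprocess.Popen(bashCommand.split(),stdout=subprocess.PIPE)
                                        output,error = process.communicate()
                                        # Column list assumed: same as above without the velocity columns
                                        df = pd.read_csv('trajf1.dump',sep='\\s+',names=["id","type","x","y","z"],dtype=float)
                                        df.to_csv('traj.csv',index=False)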
                        df = pd.read_csv('traj.csv')
                        sortedlist = processData(df)
                        print(sortedlist)
                        print("--- %s seconds ---" % (time.time() - start_time))
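The two `scp -r` calls in the loop above only copy local files, so they could be done in-process instead of spawning a subprocess each time. A minimal sketch, assuming as in the script that `traj.sh` sits in the parent directory of each run directory; the function name `prepare_directory` is hypothetical.

```python
import shutil
from pathlib import Path


def prepare_directory(run_dir):
    """Copy ../traj.sh into run_dir and back up traj.dump, without spawning scp."""
    run_dir = Path(run_dir)
    # Equivalent of: scp -r ../traj.sh .
    shutil.copy2(run_dir.parent / "traj.sh", run_dir / "traj.sh")
    # Equivalent of: scp -r traj.dump traj.dump.orig
    shutil.copy2(run_dir / "traj.dump", run_dir / "traj.dump.orig")
```

Because the function takes the directory as an argument, it does not depend on the current working directory.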

Solution

No effective solution to this problem has been found yet.
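One direction that might be worth trying is to let each MPI rank handle its own share of the files and collect the results on rank 0. The following is a sketch under assumptions, not a verified answer: `files_for_rank`, `main`, and the `process_file` callback (which would wrap the per-directory work from the question) are hypothetical names. It parallelizes across files only and does not speed up the work on a single data frame.

```python
import glob


def files_for_rank(files, rank, size):
    """Round-robin split: rank r takes files r, r+size, r+2*size, ..."""
    return files[rank::size]


def main(pattern, process_file):
    # Imported here so the helper above can be used without an MPI installation.
    from mpi4py import MPI

    comm = MPI.COMM_WORLD
    rank = comm.Get_rank()
    size = comm.Get_size()

    # Every rank builds the same sorted list, then keeps only its own share.
    files = sorted(glob.glob(pattern, recursive=True))
    local = [(f, process_file(f)) for f in files_for_rank(files, rank, size)]

    # gather returns a list of per-rank lists on rank 0 and None elsewhere.
    gathered = comm.gather(local, root=0)
    if rank == 0:
        return [item for part in gathered for item in part]
    return None
```

A script that ends with a call to `main(...)` would typically be launched with something like `mpirun -n 4 python script.py`; the exact launcher name depends on the MPI installation.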
