Python 3.8-parallel.futures.ProcessPoolExecutor性能随时间下降

问题描述

我正在尝试并行化我的一个匹配函数,并且它在一开始就起作用。很高兴看到我的72核ec2实例正在杀死它,大约一分钟左右,它又回到了单核,并且每秒的迭代次数开始下降。

import concurrent.futures as cf

results = pd.DataFrame()

with cf.ProcesspoolExecutor() as executor:
    for res in tqdm(executor.map(matcher_helper,list(range(len(df))))):
        results = pd.concat([results,res],axis=0)

一开始我就看到了

multicore processing

然后转到此

singlecore dropping

大约一分钟,与单核相比,处理效果非常好。在进行多处理时,它的迭代速度大约为每秒250次,并且下降为每秒35次

非常感谢任何指导。

编辑-附加信息-我的原始功能

def matcher(data,data_radial_matrice,data_indice,comparison_data,comparison_radial_matrice,distance_threshold=.1):
    

    import pandas as pd
    from sklearn.metrics.pairwise import haversine_distances
    from fuzzywuzzy import fuzz
    import numpy as np

    lvl1 = haversine_distances(data_radial_matrice[data_indice].reshape(-1,2),comparison_radial_matrice) * 3959
    lvl2 = pd.Series(lvl1[lvl1 < distance_threshold])

    lvl1 = pd.DataFrame(np.argwhere(lvl1 < distance_threshold)).iloc[:,1]

    lvl3 = pd.concat((lvl1,lvl2),axis=1)
    lvl3.columns = ['neigh_index','distance']
    lvl3.set_index('neigh_index',inplace=True)
    lvl3 = lvl3.merge(comparison_data,left_index=True,right_index=True,how='inner')

    lvl4 = lvl3.loc[:,'match_text'].apply(
        lambda x: fuzz.token_set_ratio(data.loc[data_indice,'match_text'],x))
    lvl5 = np.where(lvl4 == np.max(lvl4))
    interim_result = lvl3.iloc[lvl5]
    interim_result['match_@R_404_5157@'] = np.max(lvl4)
    interim_result['adp_indice'] = data_indice

    return interim_result

解决方法

当我将结果收集部分更改为解决问题的pandas.concat时,主要的性能瓶颈是由np.concatenate进程引起的。在熊猫后端中,经过一定的IO阈值后,这会减慢整个过程并终止多核处理。

我对代码做了些微更改,最后返回了numpy数组。

def matcher2(data,data_radial_matrice,data_indice,comparison_data,comparison_radial_matrice,distance_threshold=.1):
'''  Haversine Distance between selected data point and comparison data points are calculated in miles
    by default is limited to .1 mile distance and among this filtered resuls matching is done and max score records are returned
'''

import pandas as pd
from sklearn.metrics.pairwise import haversine_distances
from fuzzywuzzy import fuzz
import numpy as np

lvl1 = haversine_distances(data_radial_matrice[data_indice].reshape(-1,2),comparison_radial_matrice) * 3959
lvl2 = pd.Series(lvl1[lvl1 < distance_threshold])

lvl1 = pd.DataFrame(np.argwhere(lvl1 < distance_threshold)).iloc[:,1]

lvl3 = pd.concat((lvl1,lvl2),axis=1)
lvl3.columns = ['neigh_index','distance']
lvl3.set_index('neigh_index',inplace=True)
lvl3 = lvl3.merge(comparison_data,left_index=True,right_index=True,how='inner')

lvl4 = lvl3.loc[:,'match_text'].apply(
    lambda x: fuzz.token_set_ratio(data.loc[data_indice,'match_text'],x))
lvl5 = np.where(lvl4 == np.max(lvl4))
interim_result = lvl3.iloc[lvl5]
interim_result['match_score'] = np.max(lvl4)
interim_result['adp_indice'] = data_indice

return np.array(interim_result)

最后,当我解析结果时。

def dnb_matcher_helper(indice):
    return matcher2(adp,adp_rad,indice,dnb,dnb_rad)

import concurrent.futures as cf

dnb_results = np.empty(shape=(1,35))

with cf.ProcessPoolExecutor() as executor:
    for res in tqdm(executor.map(dnb_matcher_helper,list(range(len(adp))))):
    if len(res) == 0:
        continue
    else:
        for line in res:
            line = line.reshape((1,35))
            dnb_results = np.concatenate((dnb_results,line),axis=0)