Joblib函数的并行计算

问题描述

enter image description here

如何使用 Joblib 并行化这个函数?计算是在 for 循环内进行的。

# Lot/run identifiers whose sensor traces should be extracted.
lotrunnums = [
    'RX9OE_29756162',
    'S009K_29952685',
    '1P32G_29692263',
    '721YA_29780543',
    '6S3R6_29759571',
    'RX9D0_29865357',
    'RX9PV_29751006',
    'RX9QM_29794268',
]

#function

def get_data_to_dict(data_train,lotrunnums):
    """Map each lot/run id to a NumPy array of its sensor values.

    For every id in ``lotrunnums`` the rows of ``data_train`` whose
    ``LOT_RUNNUM`` equals that id are selected and their ``SENSOR_VALUE``
    entries are stored as a NumPy array. The elapsed wall-clock time is
    printed before the dict is returned.
    """
    t0 = time.time()
    sensor_values = data_train["SENSOR_VALUE"]
    lot_column = data_train["LOT_RUNNUM"]
    result = {
        lot: np.array(sensor_values[lot_column == lot])
        for lot in lotrunnums
    }
    t1 = time.time()
    print('{:.4f} s'.format(t1-t0))
    return result

解决方法

您需要创建一个接收 `i` 和 `data_train` 并返回 `trace` 的函数

    def func(data):
        """Return ``(lot_id, sensor_values)`` for one ``(data_train, lot_id)`` pair."""
        frame, lot_id = data
        matches_lot = frame["LOT_RUNNUM"] == lot_id
        return lot_id, frame["SENSOR_VALUE"][matches_lot].to_list()

然后可以在 for 循环中运行它,也可以用 ThreadPool、Joblib 等来运行它。

运行后,您可以将所有结果转换为字典。


测试不同方法的最小代码

import pandas as pd
import numpy as np
import time
import random

random.seed(0)  # fixed seed so every run generates the same sensor values

# Eight distinct lot/run ids repeated 10 times give 80 rows, matching the
# 8*10 generated sensor values (pandas requires equal-length columns).
data_train = pd.DataFrame({
    'LOT_RUNNUM': [
        'RX9OE_29756162',
        'S009K_29952685',
        '1P32G_29692263',
        '721YA_29780543',
        '6S3R6_29759571',
        'RX9D0_29865357',
        'RX9PV_29751006',
        'RX9QM_29794268',
    ] * 10,
    'SENSOR_VALUE': [random.randint(0,9) for _ in range(8*10)],
})

# Subset of lot/run ids used by the benchmark functions below.
lotrunnums = [
    'RX9OE_29756162','RX9QM_29794268'
]

def get_data_to_dict_1(data_train,lotrunnums):
    """Collect sensor values per lot/run id with a plain ``for`` loop.

    Returns a dict mapping each id in ``lotrunnums`` to the list of
    ``SENSOR_VALUE`` entries whose ``LOT_RUNNUM`` equals that id, and
    prints the elapsed wall-clock time.
    """
    began = time.time()

    sensor_values = data_train["SENSOR_VALUE"]
    lot_column = data_train["LOT_RUNNUM"]

    result = {}
    for lot in lotrunnums:
        selected = sensor_values[lot_column == lot]
        result[lot] = selected.to_list()

    finished = time.time()
    print('{:.4f} s'.format(finished-began))

    return result

def get_data_to_dict_2(data_train,lotrunnums):
    """Collect sensor values per lot/run id with ``isin()``/``groupby()``/``apply()``.

    Rows whose ``LOT_RUNNUM`` is in ``lotrunnums`` are kept, grouped by
    ``LOT_RUNNUM``, and each group's ``SENSOR_VALUE`` column is turned into
    a list. The elapsed wall-clock time is printed before returning.
    """
    began = time.time()

    # Keep only the two relevant columns, restricted to the requested lots.
    wanted_rows = data_train["LOT_RUNNUM"].isin(lotrunnums)
    columns = data_train[["SENSOR_VALUE","LOT_RUNNUM"]]
    subset = columns[wanted_rows]

    # One list of sensor values per lot/run id.
    per_lot = subset.groupby("LOT_RUNNUM")['SENSOR_VALUE']
    result = per_lot.apply(list).to_dict()

    finished = time.time()
    print('{:.4f} s'.format(finished-began))

    return result

def get_data_to_dict_threadpool(data_train,lotrunnums):
    """Collect sensor values per lot/run id using a ``ThreadPoolExecutor``.

    Each id in ``lotrunnums`` is handled by a worker thread that selects the
    ``SENSOR_VALUE`` entries whose ``LOT_RUNNUM`` equals that id. Returns a
    dict mapping id -> list of values and prints the elapsed wall-clock time.
    """

    from concurrent.futures import ThreadPoolExecutor

    def func(data):
        # Each work item is a (data_train, lot_id) pair because
        # executor.map passes a single argument per call.
        data_train, i = data
        trace = data_train["SENSOR_VALUE"][data_train["LOT_RUNNUM"]==i]
        return i, trace.to_list()

    start = time.time()

    with ThreadPoolExecutor(max_workers=4) as executor:
        future = executor.map(func,[(data_train,i) for i in lotrunnums])
        # map yields (lot_id, values) pairs, which dict() consumes directly.
        data = dict(future)

    end = time.time()

    print('{:.4f} s'.format(end-start))

    return data

def get_data_to_dict_joblib(data_train,lotrunnums):
    """Collect sensor values per lot/run id using Joblib with threads.

    Each id in ``lotrunnums`` becomes one delayed call that selects the
    ``SENSOR_VALUE`` entries whose ``LOT_RUNNUM`` equals that id. Returns a
    dict mapping id -> list of values and prints the elapsed wall-clock time.
    """

    from joblib import Parallel,delayed

    def func(data_train,i):
        trace = data_train["SENSOR_VALUE"][data_train["LOT_RUNNUM"]==i]
        return i, trace.to_list()

    start = time.time()

    # prefer="threads" asks Joblib for a thread-based backend.
    results = Parallel(n_jobs=4,prefer="threads")(delayed(func)(data_train,i) for i in lotrunnums)
    # results is a list of (lot_id, values) pairs.
    data = dict(results)

    end = time.time()

    print('{:.4f} s'.format(end-start))

    return data

def get_data_to_dict_joblib_process(data_train,lotrunnums):
    """Collect sensor values per lot/run id using Joblib with processes.

    Each id in ``lotrunnums`` becomes one delayed call that selects the
    ``SENSOR_VALUE`` entries whose ``LOT_RUNNUM`` equals that id. Returns a
    dict mapping id -> list of values and prints the elapsed wall-clock time.
    """

    from joblib import Parallel,delayed

    def func(data_train,i):
        trace = data_train["SENSOR_VALUE"][data_train["LOT_RUNNUM"]==i]
        return i, trace.to_list()

    start = time.time()

    # No `prefer` hint here, so Joblib picks its default backend.
    results = Parallel(n_jobs=4)(delayed(func)(data_train,i) for i in lotrunnums)
    # results is a list of (lot_id, values) pairs.
    data = dict(results)

    end = time.time()

    print('{:.4f} s'.format(end-start))

    return data

# --- main ---

# Run every variant on the same inputs: print its banner first, then the
# dict it returns (each function prints its own timing in between).
for banner, build_dict in (
    ('--- normal 1 ---', get_data_to_dict_1),
    ('--- normal 2 ---', get_data_to_dict_2),
    ('--- threadpool ---', get_data_to_dict_threadpool),
    ('--- joblib - thread ---', get_data_to_dict_joblib),
    ('--- joblib - process ---', get_data_to_dict_joblib_process),
):
    print(banner)
    print(build_dict(data_train,lotrunnums))

相关问答

错误1:Request method ‘DELETE‘ not supported 错误还原:...
错误1:启动docker镜像时报错:Error response from daemon:...
错误1:private field ‘xxx‘ is never assigned 按Alt...
报错如下,通过源不能下载,最后警告pip需升级版本 Requirem...