Running two dask-ml imputers simultaneously instead of sequentially

Question

I can impute the mean and the most frequent value with dask-ml like this, and it works fine:

import numpy as np
import pandas as pd
from dask_ml import impute

mean_imputer = impute.SimpleImputer(strategy='mean')
most_frequent_imputer = impute.SimpleImputer(strategy='most_frequent')

data = [[100, 2, 5], [np.nan, np.nan, np.nan], [70, 7, 5]]
df = pd.DataFrame(data, columns=['Weight', 'Age', 'Height'])

df.iloc[:, [0, 1]] = mean_imputer.fit_transform(df.iloc[:, [0, 1]])
df.iloc[:, [2]] = most_frequent_imputer.fit_transform(df.iloc[:, [2]])
print(df)


    Weight  Age   Height
0   100.0   2.0   5.0
1   85.0    4.5   5.0
2   70.0    7.0   5.0

However, if I have 100 million rows of data, this appears to make two passes over the data when Dask could do it in one. Is it possible to run the two imputers simultaneously and/or in parallel rather than sequentially, and what would sample code to achieve that look like?

Answer

If the entities are independent of one another, you can use dask.delayed, as suggested in the documentation and the Dask Tutorial, to parallelize the computations.
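As a minimal illustration of the pattern (a generic sketch, not specific to imputation; inc and double are made-up stand-ins for any two independent computations):

from dask import delayed

@delayed
def inc(x):
    return x + 1

@delayed
def double(x):
    return 2 * x

a = inc(10)     # lazy: nothing has run yet
b = double(10)  # independent of `a`, so it can run at the same time
print((a + b).compute())  # 31; both tasks execute in a single pass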

Your code would then look like this:

from dask import delayed
from dask.distributed import Client
import numpy as np
import pandas as pd
from dask_ml import impute

client = Client(n_workers=4)

mean_imputer = impute.SimpleImputer(strategy='mean')
most_frequent_imputer = impute.SimpleImputer(strategy='most_frequent')

def fit_transform_mi(d):
    return mean_imputer.fit_transform(d)

def fit_transform_mfi(d):
    return most_frequent_imputer.fit_transform(d)

def setdf(a, b, df):
    df.iloc[:, [0, 1]] = a
    df.iloc[:, [2]] = b
    return df

data = [[100, 2, 5], [np.nan, np.nan, np.nan], [70, 7, 5]]
df = pd.DataFrame(data, columns=['Weight', 'Age', 'Height'])

# The two imputations are independent of each other, so they can run
# as parallel tasks; setdf then combines both results
a = delayed(fit_transform_mi)(df.iloc[:, [0, 1]])
b = delayed(fit_transform_mfi)(df.iloc[:, [2]])
c = delayed(setdf)(a, b, df)

df = c.compute()
print(df)
client.close()

The c object is a lazy Delayed object. It holds everything we need to compute the final result, including references to all of the required functions, along with their inputs and their relationships to one another.
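As a side note, you can render this task graph before computing it. A small sketch, assuming the optional graphviz dependency is installed:

# Render the lazy task graph to a file; the two imputations show up
# as independent branches that feed into setdf
c.visualize(filename='delayed_graph.svg', rankdir='LR')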

Another answer

Dask can be used to speed up computations through parallel processing, and also when the data does not fit in memory. In the example below, 300M rows of data spread across ten files are imputed with Dask. The graph of the computation shows that: 1. the mean and most-frequent imputers run in parallel; 2. the ten files are also processed in parallel.
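If you want to watch that parallelism as it happens, one option (a sketch, assuming dask.distributed is installed; the worker count is an arbitrary choice) is to start a local cluster first, so the code below runs on it:

from dask.distributed import Client

# Start a local cluster; once a Client exists it becomes the default
# scheduler, and its dashboard (http://localhost:8787 by default)
# shows the imputation tasks running in parallel
client = Client(n_workers=4)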

Setup

To prepare a large amount of data, the three rows of data from your question are replicated to form a 30M-row data frame. The data frame is saved to ten different files, yielding 300M rows in total with the same statistics as in your question.

import numpy as np
import pandas as pd

# Replicate the three rows from the question 10M times -> 30M rows per file
N = 10000000
weight = np.array([100, np.nan, 70] * N)
age = np.array([2, np.nan, 7] * N)
height = np.array([5, np.nan, 5] * N)
df = pd.DataFrame({'Weight': weight, 'Age': age, 'Height': height})

# Save ten large data frames to disk, 300M rows in total
for i in range(10):
    df.to_parquet(f'./df_to_impute_{i}.parquet', compression='gzip', index=False)

Dask imputation

import graphviz
import dask
import dask.dataframe as dd
from dask_ml.impute import SimpleImputer

# Read all files to be imputed into a single dask data frame
df = dd.read_parquet('./df_to_impute_*.parquet')

# Set up the imputers and the columns each one handles
mean_imputer = SimpleImputer(strategy='mean')
mostfreq_imputer = SimpleImputer(strategy='most_frequent')
imputers = [mean_imputer, mostfreq_imputer]

mean_cols = ['Weight', 'Age']
freq_cols = ['Height']
columns = [mean_cols, freq_cols]

# Create a new data frame with imputed values, then visualize the computation
df_list = []
for imputer, col in zip(imputers, columns):
    df_list.append(imputer.fit_transform(df.loc[:, col]))
imputed_df = dd.concat(df_list, axis=1)
imputed_df.visualize(filename='imputed.svg', rankdir='LR')

# Save the new data frame to disk
imputed_df.to_parquet('imputed_df.parquet', compression='gzip')

Output

[Task graph of the computation: the mean and most-frequent imputer branches run in parallel, and each branch fans out over the ten input files]

imputed_df.head()

    Weight  Age     Height
0   100.0   2.0     5.0
1   85.0    4.5     5.0
2   70.0    7.0     5.0
3   100.0   2.0     5.0
4   85.0    4.5     5.0


# Check the summary statistics make sense - 300M rows and stats as expected
imputed_df.describe().compute()

    Weight  Age     Height
count   3.000000e+08    3.000000e+08    300000000.0
mean    8.500000e+01    4.500000e+00    5.0
std     1.224745e+01    2.041241e+00    0.0
min     7.000000e+01    2.000000e+00    5.0
25%     7.000000e+01    2.000000e+00    5.0
50%     8.500000e+01    4.500000e+00    5.0
75%     1.000000e+02    7.000000e+00    5.0
max     1.000000e+02    7.000000e+00    5.0
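As a final sanity check (a sketch; the path matches the to_parquet call above), the imputed files can be read back to confirm that no missing values remain:

# Reload the imputed output and verify that no NaNs survived
check = dd.read_parquet('imputed_df.parquet')
print(check.isnull().sum().compute())  # expect 0 for all three columns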