为什么与 DASK Delayed 合并比与 DASK 内置命令合并花费的时间要多得多?

问题描述

我想将一个形状为 df1.shape = (80000,18) 的大熊猫数据框合并到一个名为“key”的列上一个形状为 df2.shape = (1,18) 的小熊猫数据框。这是使用 dd.merge 的时间性能

ddf1 = from_pandas(df1,npartitions=20)
ddf2 = from_pandas(df2,npartitions=1)
start = time.time()
pred_mldf = dd.merge(ddf1,ddf2,on =['key'])
print(pred_mldf)
t0 = time.time()
print("deltat = ",t0 - start)

结果是 deltat = 0.04。

然后我开始使用以这种方式延迟的 dask 来实现它:

def mymerge(df1,df2,key):
    pred_mldf = pd.merge(df1,on = key)
    return pred_mldf

start = time.time()
pred_mldf = dask.delayed(mymerge)(df1,['key'])
pred_mldf.compute()
t0 = time.time()
print("deltat = ",t0 - start)

结果是 deltat = 3.48。

我的假设是我需要通过两种方法达到相同的时间性能。我在这里做错了什么?

解决方法

正如@Nick Becker 在评论中指出的那样,现在您的第一个代码块只定义了合并,但不执行它(而第二个代码块则这样做),因此添加 .compute() 应该给出不同的合并时间:

ddf1 = from_pandas(df1,npartitions=20)
ddf2 = from_pandas(df2,npartitions=1)
start = time.time()
pred_mldf = dd.merge(ddf1,ddf2,on =['key']).compute()
print(pred_mldf)
t0 = time.time()
print("deltat = ",t0 - start)

执行速度不同的另一个原因是,在第二个代码块中,您将完整的 df1 传递给延迟函数。如果 df1 很大,那么将其分成 20 个块(如在第一个代码块中)并将它们单独传递给延迟函数可能会更公平一些。