Python Dask-所有列的分组依据性能

问题描述

我想计算数据中唯一行的数量。下面是一个快速的输入/输出示例。

#input
A,B
0,0
0,1
1,0
1,1

#output
A,B,count
0,1
0,1,2
1,2

我的管道中的数据有5000多个列和1M多行,每个单元格为0或1。下面是我两次尝试使用dask进行缩放(具有26列):

import numpy as np
import string
import time

client = Client(n_workers=6,threads_per_worker=2,processes=True)

columns = list(string.ascii_uppercase)

data = np.random.randint(2,size = (1000000,len(columns)))

ddf_parent = dd.from_pandas(pd.DataFrame(data,columns = columns),npartitions=20)

#1st solution
ddf = ddf_parent.astype(str)

ddf_concat = ddf.apply(''.join,axis =1).to_frame()

ddf_concat.columns = ['pattern']

ddf_concat = ddf_concat.groupby('pattern').size()

start = time.time()
ddf_concat = ddf_concat.compute()
print(time.time()-start)

#2nd solution
ddf_concat_other = ddf_parent.groupby(list(ddf.columns)).size()

start = time.time()
ddf_concat_other = ddf_concat_other.compute()
print(time.time() - start)

结果:

9.491615056991577
12.688117980957031

一个解决方案首先将每一列连接成一个字符串,然后对它进行分组。第二个只是按所有列分组。我倾向于使用第一个,因为它在测试中速度更快,但我愿意提出建议。如果在性能方面有更好的选择,请随意完全更改我的解决方案(此外,有趣的是,sort = False不会加快分组依据,这实际上可能与此:https://github.com/dask/dask/issues/5441和此{{ 3}})

注意: 经过一些测试后,第一个解决方案可以随着列数的增加而相对扩展。我猜一个改进可能是对字符串进行散列以使其始终具有固定长度。在这种情况下对分区号有什么建议吗?从远程仪表板中,我可以看到,经过几次操作,计算图中的节点减少到只有3个,而没有利用其他可用的工人。

当列增加时,第二种解决方案将失败。

注意2: 另外,在第一个解决方案中,我猜想dask如何调度和映射操作确实发生了一些奇怪的事情。发生的情况是,一段时间后,一个工人比其他工人获得更多的任务,然后该工人超过了95%的内存,发生崩溃,然后正确地拆分了任务,但是一段时间后,另一个工人获得了更多的任务(以及周期重新开始)。管道运行正常,但我想知道这是否是预期的行为。随附屏幕截图: https://github.com/rapidsai/cudf/issues/2717

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)

相关问答

Selenium Web驱动程序和Java。元素在(x,y)点处不可单击。其...
Python-如何使用点“。” 访问字典成员?
Java 字符串是不可变的。到底是什么意思?
Java中的“ final”关键字如何工作?(我仍然可以修改对象。...
“loop:”在Java代码中。这是什么,为什么要编译?
java.lang.ClassNotFoundException:sun.jdbc.odbc.JdbcOdbc...