问题描述
我想计算数据中唯一行的数量。下面是一个快速的输入/输出示例。
#input
A,B
0,0
0,1
1,0
1,1
#output
A,B,count
0,1
0,1,2
1,2
我的管道中的数据有5000多个列和1M多行,每个单元格为0或1。下面是我两次尝试使用dask进行缩放(具有26列):
import numpy as np
import string
import time
client = Client(n_workers=6,threads_per_worker=2,processes=True)
columns = list(string.ascii_uppercase)
data = np.random.randint(2,size = (1000000,len(columns)))
ddf_parent = dd.from_pandas(pd.DataFrame(data,columns = columns),npartitions=20)
#1st solution
ddf = ddf_parent.astype(str)
ddf_concat = ddf.apply(''.join,axis =1).to_frame()
ddf_concat.columns = ['pattern']
ddf_concat = ddf_concat.groupby('pattern').size()
start = time.time()
ddf_concat = ddf_concat.compute()
print(time.time()-start)
#2nd solution
ddf_concat_other = ddf_parent.groupby(list(ddf.columns)).size()
start = time.time()
ddf_concat_other = ddf_concat_other.compute()
print(time.time() - start)
结果:
9.491615056991577
12.688117980957031
第一个解决方案首先将每一列连接成一个字符串,然后对它进行分组。第二个只是按所有列分组。我倾向于使用第一个,因为它在测试中速度更快,但我愿意提出建议。如果在性能方面有更好的选择,请随意完全更改我的解决方案(此外,有趣的是,sort = False不会加快分组依据,这实际上可能与此:https://github.com/dask/dask/issues/5441和此{{ 3}})
注意: 经过一些测试后,第一个解决方案可以随着列数的增加而相对扩展。我猜一个改进可能是对字符串进行散列以使其始终具有固定长度。在这种情况下对分区号有什么建议吗?从远程仪表板中,我可以看到,经过几次操作,计算图中的节点减少到只有3个,而没有利用其他可用的工人。
注意2: 另外,在第一个解决方案中,我猜想dask如何调度和映射操作确实发生了一些奇怪的事情。发生的情况是,一段时间后,一个工人比其他工人获得更多的任务,然后该工人超过了95%的内存,发生崩溃,然后正确地拆分了任务,但是一段时间后,另一个工人获得了更多的任务(以及周期重新开始)。管道运行正常,但我想知道这是否是预期的行为。随附屏幕截图: https://github.com/rapidsai/cudf/issues/2717
解决方法
暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!
如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。
小编邮箱:dio#foxmail.com (将#修改为@)