为什么不将可选Dask Client与pandas.diff结合使用不能计算正确的统计信息?

问题描述

我很难理解为什么当我不使用dask Client时,某些计算的输出会发生变化。

我有一个相当大的数据集(约数千万条记录),其中包含两列:时间戳和标识符。每个不同的ID(我们可能有大约100k)都具有许多不同的时间戳。

当前的任务是计算与每个ID(datetime64[ns])关联的时间戳(int64)之间的时间差的一些统计信息。分析的要点归结为以下操作:

ddf = import_file(filename)
ddf = ddf.set_index('Dates',drop=False)  
groups= dataframe.groupby(grouping_id)[column_id].transform(pd.Series.diff,Meta=(column_id,column_id_dtype))
lags = lags = groups.compute()/ np.timedelta64(1,'s')

这是对ID进行分组,然后计算时间戳(有序)之间的差异。这些差异应为正数或零(除非索引将时间戳打乱)

  • 对于较小的数据集(最多100万条记录),这是预期的结果

Histogram for one million records

  • 这是任务图:

Task graph

对于较大的数据集(约300万条记录),如果不使用dask客户端,则统计信息看起来非常

Strange histogram with negative values

  • 任务图已更改为以下大小:

Task graph for 3 million records w/o using Dask client

但是,如果使用dask Client,则会恢复正确结果。

Correct histogram with expected trend

  • 任务图再次更改:

    Task graph for 3 million records using Dask client

可以帮助您理解为什么会这样,如果这是正常的/预期的行为,将不胜感激。 谢谢!

最小示例

以下代码是可以重现此行为的最小示例。这些示例已经在Jupyter Notebook 6.1.4和dask 2.30.0以及配备Intel®CoreTM i7-7500U cpu @ 2.70GHz和16 GiB RAM的笔记本电脑中进行了测试。

数据生成

import pandas as pd
import numpy as np

# Change between 1000000 and 3000000 to observe change
nelem = 3000000   

# We need to create a list of IDs and timestamps in ascending order.
ids = np.random.randint(0,size=nelem,high=500000000,dtype=np.int64)
dates = pd.date_range(start='2020-08-01 00:00',end='2020-08-01 23:59',periods=nelem)
data = pd.DataFrame({'Dates':dates,'IDS':np.random.choice(ids,nelem)})

# Save to file for later consumption
filename = 'synthetic/pandas_test_{}.csv'.format(nelem)
data.to_csv(filename,sep='\t',header='True',index=False,date_format='%Y-%m-%d %H:%M:%s')

数据消耗

import dask.dataframe as dd
import pandas as pd
import numpy as np
from time import time
import matplotlib.pyplot as plt


#########################################
# CLIENT DEFinitioN
# Comment or uncomment to observe change
#########################################
from dask.distributed import Client
# Here I have provided some parameters,but the
# call to Client() without parameters also shows
# the change
client = Client(n_workers=2,threads_per_worker=4,processes=False,memory_limit='4GB',silence_logs='error')
client
#######################################

# Select the appropriate file
nelem = 3000000
filename = 'synthetic/pandas_test_{}.csv'.format(nelem)

ddf = dd.read_csv(filename,sep="\t",engine='python',parse_dates=[0,],infer_datetime_format=True)

ddf = ddf.set_index('Dates',drop=False,sorted=True)
print('Index computed. Computing lags...')
# Perform computation
groups = dataframe.groupby('IDS')['Dates'].transform(pd.Series.diff,Meta=('Dates',ddf.dtypes[0]))
lags = groups.compute()/ np.timedelta64(1,'s')

# Plot
plt.figure()
ax = lags.plot(kind='hist',title='Lags',bins=30,color='coral')
ax.set_yscale('log')
plt.show()

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)