如何在Numpy Busday_count中使用Dask DataFrame?

问题描述

我正在将Pandas / Numpy代码转换dask,以处理较大的数据集。我似乎无法重新创建以下Pandas / Numpy代码

df['days_to_complete'] = np.busday_count(begindates=df['time_order_date'].values.astype('datetime64[D]'),enddates=df['time_complete_date'],weekmask='1111111',holidays=hols_list)

这将返回time_order_date和time_complete_date之间的整数天,同时考虑工作周和假期列表。它会在我的数据框中创建并填充一个新列,没问题。

dask中,我尝试了以下操作:

  1. map_partitions调用numpy函数

    ddf['days_to_complete'] = ddf.time_order.map_partitions(func=np.busday_count,args= ddf['time_order_date'].values.astype('datetime64[D]'),ddf['time_complete_date']),Meta=(None,'i8'))

  2. 还使用lambda进行map_partitions:

    ddf['days_to_complete'] = ddf.map_partitions(lambda ddf: ddf.assign(result = np.busday_count(begindates=ddf['time_order_date'].values.astype('datetime64[D]'),enddates=ddf['time_complete_date'],holidays=hols_list)),'i8'))

,并在运行ddf.compute()后得到以下错误

TypeError: busday_count() got multiple values for argument 'begindates'

如何最好地以并行处理/友好的方式使用此numpy函数 我没有成功使用dask docs / examples或其他SO线程。 我也想像在这里的基本熊猫中一样使用Pandas CustomBusinessHour rollfoward:

bis_hour = CustomBusinessHour(n=1,weekmask='Mon Tue Wed Thu Fri Sat Sun',holidays=hols_list,start = bus_hours_start,end = bus_hours_end,offset=0)
df['time_order_bis'] = pd.to_datetime(df['time_order'])
df['time_order_bis'] = df['time_order_bis'].apply(lambda row: bis_hour.rollforward(row))

此命令将订单时间“前滚”到定义的客户营业时间内(现在星期六的订单是工作日星期一上午7点)。谢谢!

编辑: 我尝试编写和调用函数

def bdays(df):
  return np.busday_count(df.time_order_date.values.astype('datetime64[D]'),df.time_complete_date,holidays=hols_list)
ddf['days_to_complete'] = ddf.map_partitions(bdays,df=ddf,Meta=('days_to_complete','i8')).compute()

我收到以下错误TypeError: bdays() got multiple values for argument 'df'

解决方法

我知道了!关键是要返回Dask数组,而不要过早计算,这会破坏类型。我建议进行大量的type()检查,并逐步进行操作,从本质上讲,您希望Dask对象一直存在,pandas对象/ numpy数组可能会破坏分区/并行性。

功能:

def bdays(df=ddf):


return da.from_array(np.busday_count(df.time_order_date,df.time_complete_date,weekmask='1111111',holidays=hols_list))

使用map_partitions。请注意,上面函数的第一个参数需要dataframe / partition->我们不在map分区中指定!仅附加参数。

ddf['days_to_complete'] = ddf.map_partitions(bdays,meta=('days_to_complete','i8'))

在分配给数据框中新列之前的计算(compute())导致错误。

TypeError: set_index() missing 1 required positional argument: 'other'

调试建议: 测试您的输入并仅使用一个分区测试功能。 bdays是上面的功能。

type(ddf.map_partitions(bdays,meta='i8'))

output: dask.dataframe.core.Series

相关问答

Selenium Web驱动程序和Java。元素在(x,y)点处不可单击。其...
Python-如何使用点“。” 访问字典成员?
Java 字符串是不可变的。到底是什么意思?
Java中的“ final”关键字如何工作?(我仍然可以修改对象。...
“loop:”在Java代码中。这是什么,为什么要编译?
java.lang.ClassNotFoundException:sun.jdbc.odbc.JdbcOdbc...