在另一个成对的bin数组中获取数据数组最小值的最快方法

问题描述

我有三个一维数组:

  • idxs:索引数据
  • weightsidxs中各个指标的权重
  • bins:用于计算其中最小权重的 bin。

这是我当前使用 idxs 检查名为 weights 的数据在哪个 bin 中的方法,然后计算分箱权重的最小值/最大值:

illustration

  1. 获取显示每个 slices 元素属于哪个 bin 的 idxs
  2. 同时对 slicesweights 进行排序。
  3. 计算每个 bin(切片)中 weights 的最小值。

numpy 方法

import random
import numpy as np

# create example data
out_size = int(10)
bins = np.arange(3,out_size-3)
idxs = np.arange(0,out_size)
#random.shuffle(idxs)

# set duplicated slice manually for test
idxs[4] = idxs[3]
idxs[6] = idxs[7]

weights = idxs

# get which bin idxs belong to
slices = np.digitize(idxs,bins)

# get index and weights in bins
valid = (bins.max() >= idxs) & (idxs >= bins.min())
valid_slices = slices[valid]
valid_weights = weights[valid]

# sort slice and weights
sort_index = valid_slices.argsort()
valid_slices_sort = valid_slices[sort_index]
valid_weights_sort = valid_weights[sort_index]

# get index of each first unque slices
unique_slices,unique_index = np.unique(valid_slices_sort,return_index=True)
# calculate the minimum
res_sub = np.minimum.reduceat(valid_weights_sort,unique_index)

# save results
res = np.full((out_size),np.nan)
res[unique_slices-1] = res_sub

print(res)

结果:

array([ 3.,nan,5.,nan])

如果我将 out_size 增加到 1e7 并shuffle 数据,速度(从 np.digitize 到末尾)很慢:

13.5 s ± 136 ms per loop (mean ± std. dev. of 7 runs,1 loop each)

而且,这是每个部分的消耗时间

np.digitize: 10.8 s ± 12.6 ms per loop (mean ± std. dev. of 7 runs,1 loop each)
valid: 171 ms ± 3.78 ms per loop (mean ± std. dev. of 7 runs,10 loops each)
argsort and slice: 2.02 s ± 33.7 ms per loop (mean ± std. dev. of 7 runs,1 loop each)
unique: 9.9 ms ± 113 µs per loop (mean ± std. dev. of 7 runs,100 loops each)
np.minimum.reduceat: 5.11 ms ± 52.7 µs per loop (mean ± std. dev. of 7 runs,100 loops each)

np.digitize() 花费的时间最多:10.8 秒。并且,接下来是 argsort: 2.02 s。

我还检查了使用 mean 计算 np.histogram消耗时间

counts,_ = np.histogram(idxs,bins=out_size,range=(0,out_size))
sums,out_size),weights = weights,density=False)
mean = sums / np.where(counts == 0,np.nan,counts)

33.2 s ± 3.47 s per loop (mean ± std. dev. of 7 runs,1 loop each)

这类似于我计算最小值的方法

scipy 方法

from scipy.stats import binned_statistic
statistics,_,_ = binned_statistic(idxs,weights,statistic='min',bins=bins)

print(statistics)

结果有点不同,但是对于较长的(1e7)混洗数据,速度要慢得多(x6):

array([ 3.,5.])

1min 20s ± 6.93 s per loop (mean ± std. dev. of 7 runs,1 loop each)

总结

我想找出一个更快的方法。如果该方法也适用于 dask,那就太好了!

用户案例

这是我的真实数据 (1D) 的样子:

real data

解决方法

实现此目的的快速方法是使用 dask.dataframepd.cut,我首先展示 pandas 版本:

import numpy as np
from scipy.stats import binned_statistic as bs
import pandas as pd

nrows=10**7

df = pd.DataFrame(np.random.rand(nrows,2),columns=['x','val'])

bins = np.linspace(df['x'].min(),df['x'].max(),10)

df['binned_x'] = pd.cut(df['x'],bins=bins,right=False)

result_pandas = df.groupby('binned_x')['val'].min().values
result_scipy = bs(df['x'],df['val'],'min',bins=bins)[0]

print(np.isclose(result_pandas,result_scipy))
# [ True  True  True  True  True  True  True  True  True]

现在要从 pandas 转到 dask,您需要确保 bin 跨分区一致,因此请查看 here。一旦每个分区都被一致地装箱,你想要应用所需的操作(最小/最大/总和/计数):

import dask.dataframe as dd
ddf = dd.from_pandas(df,npartitions=10)

def f(df,bins):
    df = df.copy()
    df['binned_x'] = pd.cut(df['x'],right=False)
    result = df.groupby('binned_x',as_index=False)['val'].min()
    return result

result_dask = ddf.map_partitions(f,bins).groupby('binned_x')['val'].min().compute()

print(np.isclose(result_pandas,result_dask))
# [ True  True  True  True  True  True  True  True  True]

在我的笔记本电脑上,第一个代码大约需要 7 3 秒,第二个代码大约快 10 倍(忘了我在重复计算 pandas 和 scipy执行相同的操作)。分区有一定的余地,但这取决于上下文,因此您可以尝试优化数据/硬件。

更新:请注意,此方法适用于最小值/最大值,但对于平均值,您需要计算总和和计数,然后将它们相除。在一次性执行此计算时可能有一种跟踪权重的好方法,但可能不值得增加代码复杂性。

,

SultanOrazbayev 展示了一个快速的方法;我会添加一个很酷的。

mask = bins[:,None] == idxs[None,:]
result = np.nanmin(np.where(mask,weights,np.nan),axis=-1)
# Note: may produce (expected) runtime warning if bin has no values

当然,你也可以做np.nanmaxnp.nanmean

以上假设您的 bin 确实是单个值。如果它们是范围,则需要稍微多做一些工作来构建掩码

lower_mask = idxs[None,:] >= bins[:,None]
upper_mask = np.empty_like(lower_mask)
upper_mask[:-1,...] = idxs[None,:] < bins[1:,None]
upper_mask[-1,...] = False

mask = lower_mask & upper_mask

此时您可以像上面一样使用 np.nanmin


Ofc np.where 和创建掩码的广播将创建具有各自数据类型的新形状数组 (len(bins),len(idxs))。如果您对此不关心,那么上述内容可能就足够了。

如果这是一个问题(因为您需要 RAM),那么我的第一个建议是购买更多 RAM。如果 - 由于某些愚蠢的原因(例如,钱) - 这不起作用,您可以通过在手动重新跨步到 weights

的视图上使用掩码数组来避免 weights 的副本>
import numpy.ma as ma

mask = ...

restrided_weights = np.lib.stride_tricks.as_strided(weights,shape=(bins.size,idxs.size),strides=(0,idxs.strides[0]))
masked = ma.masked_array(restrided_weights,mask=~mask,fill_value=np.nan,dtype=np.float64)
result = masked.min(axis=-1).filled(np.nan)

这避免了 weights 的副本和上述运行时警告。

如果您甚至没有足够的内存来构造 mask,那么您可以尝试分块处理数据。

最后我检查过,Dask 过去在使用手动跨距数组喂食时有一些有趣的行为。不过,这方面有一些工作,因此您可能需要仔细检查是否已解决,在这种情况下,您可以愉快地将上述内容并行化。


更新基于您对此答案和其他答案的进一步评论:

您可以分块进行此计算,以避免由于大量 bin(数量级为 1e4)而导致的内存问题。将具体数字放入完整示例中并添加进度条表示单核运行时间

import numpy.ma as ma
from tqdm import trange
import numpy as np
import random

# create example data
out_size = int(1.5e5)
#bins = np.arange(3,out_size-3)
bins = np.arange(3,int(3.8e4-3),dtype=np.int64)
idxs = np.arange(0,out_size)
random.shuffle(idxs)

# set duplicated slice manually for test
idxs[4] = idxs[3]
idxs[6] = idxs[7]

weights = idxs

chunk_size = 100

# preallocate buffers to avoid array creation in main loop
extended_bins = np.empty(len(bins) + 1,dtype=bins.dtype)
extended_bins[:-1] = bins
extended_bins[-1] = np.iinfo(bins.dtype).max # last bin goes to infinity
mask_buffer = np.empty((chunk_size,len(idxs)),dtype=bool)


result = np.empty_like(bins,dtype=np.float64)

for low in trange(0,len(bins),chunk_size):
    high = min(low + chunk_size,len(bins))
    chunk_size = high - low
    mask_buffer[:chunk_size,...] = ~((bins[low:high,None] <= idxs[None,:]) & (extended_bins[low+1:high+1,None] > idxs[None,:]))
    mask = mask_buffer[:chunk_size,...]
    restrided_weights = np.lib.stride_tricks.as_strided(weights,shape=mask.shape,idxs.strides[0]))
    masked = ma.masked_array(restrided_weights,mask=mask,dtype=np.float64)
    result[low:high] = masked.min(axis=-1).filled(np.nan)

奖励:对于 minmax ,您可以使用一个很酷的技巧:排序 idxs 和 { {1}} 基于 weights(最小值升序,最大值降序)。这样,您可以立即查找最小值/最大值,并且可以完全避免屏蔽数组和自定义步幅。这依赖于 weights 的一些没有很好记录的行为,它对布尔数组进行快速传递并且不搜索完整数组。

它仅适用于这两种情况,对于更复杂的事情(平均值),您必须回到上述情况,但对于这两种情况,它又减少了约 70% 并在单核时钟上运行

np.argmax