Problem description
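I am trying to write an efficient, vectorized custom numpy function that essentially computes an element-wise mean, ignoring -1 where possible.
The idea is that a list/tuple of ndarrays of the same shape goes in and a single ndarray of that shape comes out, where each element is the mean of the corresponding elements of all the input ndarrays.
The only possible values in these arrays are [-1, 0, 1]. When computing this custom mean, however, I want to leave -1 out of the calculation whenever any other value is present. The scalar version of this custom mean is mean_with_dont_knows_int.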
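To make the rule concrete, here is a minimal pure-Python sketch of the intended behaviour for a single element position. The helper name mean_ignoring_dont_knows is hypothetical and only illustrates the rule; it is not the numpy implementation shown further down.

```python
def mean_ignoring_dont_knows(values):
    """Hypothetical helper: thresholded mean of 0/1 values, skipping -1."""
    known = [v for v in values if v != -1]   # drop the "don't know" entries
    if not known:
        return -1                            # nothing known: stay "don't know"
    return int(sum(known) / len(known) >= 0.5)


print(mean_ignoring_dont_knows([1, 0, 0]))     # mean 1/3 is below 0.5
print(mean_ignoring_dont_knows([1, 0, -1]))    # -1 ignored, mean of (1, 0) is 0.5
print(mean_ignoring_dont_knows([-1, -1, -1]))  # only -1 present
```

The three calls cover the cases that matter: an ordinary mean, a mean with one -1 ignored, and an input that contains nothing but -1.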
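I have tried a universal function as well as operating along an axis (with the inputs stacked into one array), but as the input shape grows the computation time climbs to minutes, even for a relatively small input of shape 500,500,10.
Here is what I have so far: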
from typing import Tuple

import numpy as np
import pytest


def mean_with_dont_knows_int(*argv: int):
    arr = np.array(argv, dtype=np.int8)
    return mean_with_dont_knows_from_1d(arr)


def mean_with_dont_knows_from_1d(arr: np.ndarray) -> int:
    # Drop every -1 ("don't know") before averaging
    arr = np.delete(arr, np.argwhere(arr == -1))
    if arr.size == 0:
        return -1
    return int(arr.sum() / len(arr) >= 0.5)


def mean_with_dont_knows(*argv: np.ndarray, use_ufunc: bool = True):
    if use_ufunc:
        return np.frompyfunc(mean_with_dont_knows_int, len(argv), 1)(*argv)
    return np.apply_along_axis(mean_with_dont_knows_from_1d, -1, np.stack(argv, axis=-1))
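@pytest.mark.parametrize("shape", [(10, 10, 5), (50, 50, 10), (500, 20),])
def test_mean_with_ufunc(shape: Tuple[int, int, int]):
    mask1 = np.ones(shape, dtype=np.int8)
    mask2 = np.zeros(shape, dtype=np.int8)
    mask3 = np.zeros(shape, dtype=np.int8)
    expected = np.zeros(shape)
    expected[0, :] = 1
    mask3[0, :] = -1
    result = mean_with_dont_knows(mask1, mask2, mask3)
    assert np.array_equal(result, expected) is True


# Same shapes and data as the test above, but using the apply_along_axis path
@pytest.mark.parametrize("shape", [(10, 10, 5), (50, 50, 10), (500, 20),])
def test_mean_with_axis(shape: Tuple[int, int, int]):
    mask1 = np.ones(shape, dtype=np.int8)
    mask2 = np.zeros(shape, dtype=np.int8)
    mask3 = np.zeros(shape, dtype=np.int8)
    expected = np.zeros(shape)
    expected[0, :] = 1
    mask3[0, :] = -1
    result = mean_with_dont_knows(mask1, mask2, mask3, use_ufunc=False)
    assert np.array_equal(result, expected) is True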
As the array size grows I run into performance problems. The run time actually grows roughly linearly, as shown below:
python -m pytest --durations=0 --capture=no test.py Exe on: 12:33:26 on 2020-08-16
=================================================================================================== test session starts ===================================================================================================
platform darwin -- Python 3.8.3, pytest-5.4.3, py-1.9.0, pluggy-0.13.1
rootdir: /Users/suneeta.mall/Documents/nearmap/data-science/planck
plugins: nbval-0.9.6, cov-2.10.0
collected 8 items
test.py ........
================================================================================================= slowest test durations ==================================================================================================
107.01s call test.py::test_mean_with_axis[shape3]
104.04s call test.py::test_mean_with_ufunc[shape3]
52.77s call test.py::test_mean_with_axis[shape2]
52.32s call test.py::test_mean_with_ufunc[shape2]
0.58s call test.py::test_mean_with_ufunc[shape1]
0.54s call test.py::test_mean_with_axis[shape1]
0.02s call test.py::test_mean_with_ufunc[shape0]
0.01s call test.py::test_mean_with_axis[shape0]
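Clearly I am missing something that would explain why a simple mean over fairly small arrays takes 104.04s. Does anyone have ideas, suggestions, or improvements?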
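Edit:
Based on @hpaulj's suggestion I have modified the mean_with_dont_knows method. It seems to work well on (500,10) shaped arrays, but for a (2000,2000,80) shape I am back to a computation time of 55.5x seconds.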
def mean_with_dont_knows(*argv: np.ndarray) -> np.ndarray:
    # np.float was removed from recent NumPy releases; the builtin float is equivalent
    arr = np.stack(argv, axis=-1).astype(float)
    # The plain mean is exactly -1 only where every input is -1
    dont_know_mean = arr.mean(axis=-1).astype(np.int8)
    arr[arr == -1] = np.nan
    arr = np.nanmean(arr, axis=-1)
    arr = (arr >= .5).astype(np.int8)
    arr[dont_know_mean == -1] = -1
    return arr
Solution
I simplified mean_with_dont_knows_from_1d:
def mean_1(arr):
    arr1 = arr[arr != -1]    # simpler than np.delete
    n = arr1.size
    if n == 0:
        return -1
    return int(arr1.mean() >= 0.5)
and applied it with:
def foo(*argv):
    temp = np.stack(argv, axis=-1).reshape(-1, len(argv))
    res = np.reshape([mean_1(row) for row in temp], argv[0].shape)
    return res
With reshape I have reduced the 3d iteration to an iteration over the rows of a 2d array.
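As a small illustration of that stack-and-reshape step (the array names a, b, c and the shape (2, 3, 4) are made up for this sketch), each row of the reshaped array holds the values of one element position across all inputs, and a per-row result can be reshaped back to the input shape:

```python
import numpy as np

# Three hypothetical inputs of the same shape
a = np.zeros((2, 3, 4), dtype=np.int8)
b = np.ones((2, 3, 4), dtype=np.int8)
c = np.full((2, 3, 4), -1, dtype=np.int8)

stacked = np.stack((a, b, c), axis=-1)   # shape (2, 3, 4, 3)
rows = stacked.reshape(-1, 3)            # shape (24, 3): one row per element position

print(rows.shape)
print(rows[0])                           # values of a, b, c at position [0, 0, 0]

# Any per-row result (here simply the value taken from b) maps back to the input shape
per_row = rows[:, 1]
restored = per_row.reshape(a.shape)
print(np.array_equal(restored, b))
```

Because the stack is along the last axis, the reshape keeps the element order, so the final reshape to argv[0].shape puts each result back at its original position.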
For shape = (10,10,5):
In [30]: np.array_equal(expected,foo(mask1,mask2,mask3))
Out[30]: True
Timing:
In [31]: timeit foo(mask1,mask2,mask3)
11.8 ms ± 184 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
Compared with yours:
In [12]: timeit res = mean_with_dont_knows(mask1,mask2,mask3)
29.5 ms ± 133 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)
In [13]: timeit res1 = mean_with_dont_knows(mask1,mask2,mask3,use_ufunc=False)
31.3 ms ± 25.8 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)
Most of the improvement comes from the mean_1 function.
A version with no Python-level loop at all:
def foo1(*argv):
    temp = np.stack(argv, axis=-1).astype(float)
    temp[temp == -1] = np.nan            # mark "don't know" entries as NaN
    res = np.nanmean(temp, axis=-1)      # mean over the remaining values
    return res >= .5
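This uses np.nanmean, one of the np.nan... functions that mask out np.nan values. I could probably borrow the idea behind np.nanmean and mask the -1 directly, but for now I just use the existing code. It is much faster: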
In [46]: np.array_equal(foo1(mask1,mask2,mask3),expected)
Out[46]: True
In [47]: timeit foo1(mask1,mask2,mask3)
155 µs ± 217 ns per loop (mean ± std. dev. of 7 runs, 10000 loops each)
Essentially, I have avoided the 500 iterations (each a simple 3-element "mean").
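The idea of masking the -1 directly, mentioned above, could be sketched as follows. This is an untimed sketch, not part of the measured code: the function name mean_with_dont_knows_counts is hypothetical, and it assumes the inputs contain only -1, 0 and 1 as stated in the question. Under that assumption the mean of the known values is at least 0.5 exactly when twice the number of ones is at least the number of known values, so integer counts are enough and no float copy or NaN is needed. It also returns -1 where every input is -1, which foo1 does not.

```python
import numpy as np


def mean_with_dont_knows_counts(*arrays):
    """Hypothetical sketch: thresholded mean ignoring -1, via integer counts."""
    shape = np.asarray(arrays[0]).shape
    n_known = np.zeros(shape, dtype=np.int32)   # entries that are not -1
    n_ones = np.zeros(shape, dtype=np.int32)    # entries equal to 1
    # The loop runs once per input array, not once per element
    for arr in arrays:
        arr = np.asarray(arr)
        n_known += arr != -1
        n_ones += arr == 1
    # mean(known) >= 0.5  <=>  2 * ones >= known  (known values are 0 or 1)
    result = (2 * n_ones >= n_known).astype(np.int8)
    result[n_known == 0] = -1                   # all inputs were "don't know"
    return result


shape = (10, 10, 5)
mask1 = np.ones(shape, dtype=np.int8)
mask2 = np.zeros(shape, dtype=np.int8)
mask3 = np.zeros(shape, dtype=np.int8)
mask3[0, :] = -1
expected = np.zeros(shape)
expected[0, :] = 1

print(np.array_equal(mean_with_dont_knows_counts(mask1, mask2, mask3), expected))
```

Accumulating the counts one input at a time avoids building the stacked float array, which may matter for large inputs such as the (2000,2000,80) case; whether it is actually faster would need to be measured.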