numpy performance: custom element-wise operation over same-sized ndarrays in numpy

Problem description

I am trying to write an efficient vectorized custom numpy function that essentially performs an element-wise mean sans -1, if you will.

The idea is: given a list/tuple of ndarrays, all the same size as each other, produce an ndarray of that same size which is effectively the element-wise mean of all the input ndarrays. The only possible values in these arrays are [-1, 0, 1]. However, when computing this custom mean, I want -1 to be ignored from the mean calculation whenever other values are present. This custom function is mean_with_dont_knows_int.
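To make the intended semantics concrete, here is a tiny non-vectorised reference sketch (the name `mean_ignoring_minus_one` is mine, not from the original post):

```python
def mean_ignoring_minus_one(values):
    """Reference semantics: drop the -1 ("don't know") entries, then average.

    Returns -1 when every value is -1. This only illustrates the desired
    per-element behaviour, not the vectorised implementation sought below.
    """
    kept = [v for v in values if v != -1]
    if not kept:
        return -1
    return sum(kept) / len(kept)

print(mean_ignoring_minus_one([1, 0, -1]))  # 0.5: the -1 is ignored
print(mean_ignoring_minus_one([-1, -1]))    # -1: all inputs are "don't know"
```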

I have tried a universal function as well as operating along an axis (stacking the inputs into one array), but as the input shape grows the computation time climbs into minutes, even for a relatively small (500, 500, 10)-shaped input.

Here is what I have so far:

from typing import Tuple

import numpy as np
import pytest


def mean_with_dont_knows_int(*argv: int):
    arr = np.array(argv, dtype=np.int8)
    return mean_with_dont_knows_from_1d(arr)


def mean_with_dont_knows_from_1d(arr: np.ndarray) -> int:
    # Drop the -1 ("don't know") entries before averaging
    arr = np.delete(arr, np.argwhere(arr == -1))
    if arr.size == 0:
        return -1
    return int(arr.sum() / len(arr) >= 0.5)


def mean_with_dont_knows(*argv: np.ndarray, use_ufunc: bool = True):
    if use_ufunc:
        return np.frompyfunc(mean_with_dont_knows_int, len(argv), 1)(*argv)
    return np.apply_along_axis(mean_with_dont_knows_from_1d, -1, np.stack(argv, axis=-1))


@pytest.mark.parametrize("shape", [(10, 10, 5), (50, 50, 10), (500, 20), (500, 500, 10)])
def test_mean_with_ufunc(shape: Tuple[int, ...]):
    mask1 = np.ones(shape, dtype=np.int8)
    mask2 = np.zeros(shape, dtype=np.int8)
    mask3 = np.zeros(shape, dtype=np.int8)
    expected = np.zeros(shape)
    expected[0, :] = 1
    mask3[0, :] = -1
    result = mean_with_dont_knows(mask1, mask2, mask3)
    assert np.array_equal(result, expected)


@pytest.mark.parametrize("shape", [(10, 10, 5), (50, 50, 10), (500, 20), (500, 500, 10)])
def test_mean_with_axis(shape: Tuple[int, ...]):
    mask1 = np.ones(shape, dtype=np.int8)
    mask2 = np.zeros(shape, dtype=np.int8)
    mask3 = np.zeros(shape, dtype=np.int8)
    expected = np.zeros(shape)
    expected[0, :] = 1
    mask3[0, :] = -1
    result = mean_with_dont_knows(mask1, mask2, mask3, use_ufunc=False)
    assert np.array_equal(result, expected)

As the array size increases, I see a performance problem: the runtime grows roughly linearly, as shown below:

python -m pytest --durations=0 --capture=no test.py
=================================================================================================== test session starts ===================================================================================================
platform darwin -- Python 3.8.3, pytest-5.4.3, py-1.9.0, pluggy-0.13.1
rootdir: /Users/suneeta.mall/Documents/nearmap/data-science/planck
plugins: nbval-0.9.6, cov-2.10.0
collected 8 items

test.py ........

================================================================================================= slowest test durations ==================================================================================================
107.01s call     test.py::test_mean_with_axis[shape3]
104.04s call     test.py::test_mean_with_ufunc[shape3]
52.77s call     test.py::test_mean_with_axis[shape2]
52.32s call     test.py::test_mean_with_ufunc[shape2]
0.58s call     test.py::test_mean_with_ufunc[shape1]
0.54s call     test.py::test_mean_with_axis[shape1]
0.02s call     test.py::test_mean_with_ufunc[shape0]
0.01s call     test.py::test_mean_with_axis[shape0]

Clearly I am missing something that explains why a simple mean over a fairly small array takes 104.04 s. Does anyone have ideas/suggestions/improvements?

Edit:

I have since revised the mean_with_dont_knows method based on @hpaulj's suggestion below. It works well on (500, 500, 10)-shaped arrays, but for a (2000, 2000, 80) shape I am back up to about 55 s of compute time.

def mean_with_dont_knows(*argv: np.ndarray) -> np.ndarray:
    arr = np.stack(argv, axis=-1).astype(float)  # np.float is removed in modern numpy
    dont_know_mean = arr.mean(axis=-1).astype(np.int8)  # -1 only where every input is -1

    arr[arr == -1] = np.nan
    arr = np.nanmean(arr, axis=-1)
    arr = (arr >= .5).astype(np.int8)

    arr[dont_know_mean == -1] = -1
    return arr
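As an aside, the int8 → float → NaN round-trip can be avoided entirely with integer counting; the all--1 case then falls out of the count rather than needing a second mean. A hedged sketch of that alternative (`mean_with_dont_knows_counts` is my name, not from the post):

```python
import numpy as np

def mean_with_dont_knows_counts(*argv: np.ndarray) -> np.ndarray:
    """Sketch: pure integer arithmetic, no float cast or NaN pass.

    Per position, count the non--1 ("known") entries and sum only those.
    Since knowns are 0/1, mean >= 0.5 is equivalent to 2 * sum >= count.
    """
    stacked = np.stack(argv, axis=-1)
    valid = stacked != -1                           # mask of known values
    counts = valid.sum(axis=-1)                     # knowns per position
    sums = np.where(valid, stacked, 0).sum(axis=-1)  # sum of knowns only
    # counts == 0 means every input was -1 ("don't know") at that position
    return np.where(counts > 0, (2 * sums >= counts).astype(np.int8), np.int8(-1))
```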

Solution

I simplified mean_with_dont_knows_from_1d:

def mean_1(arr):
    arr1 = arr[arr != -1]    # simpler than np.delete
    n = arr1.size
    if n == 0:
        return -1
    return int(arr1.mean() >= 0.5)

which is used in:

def foo(*argv):
    temp = np.stack(argv, axis=-1).reshape(-1, len(argv))
    res = np.reshape([mean_1(row) for row in temp], argv[0].shape)
    return res

With reshape I reduce the 3-D iteration to iterating over the rows of a 2-D array.
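The reshape trick above can be seen on a toy case (my illustration, not from the answer): three 2x2 inputs stack into a (2, 2, 3) array, which flattens to (4, 3), so the Python-level loop runs once per output element instead of nesting over two dimensions.

```python
import numpy as np

argv = [np.arange(4).reshape(2, 2) for _ in range(3)]      # three 2x2 inputs
temp = np.stack(argv, axis=-1).reshape(-1, len(argv))      # (2, 2, 3) -> (4, 3)
print(temp.shape)  # (4, 3): one row per output element, one column per input
```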

For shape = (10, 10, 5):

In [30]: np.array_equal(expected,foo(mask1,mask2,mask3))                                            
Out[30]: True

Timings:

In [31]: timeit foo(mask1, mask2, mask3)
11.8 ms ± 184 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)

compared with yours:

In [12]: timeit res = mean_with_dont_knows(mask1, mask2, mask3)
29.5 ms ± 133 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)
In [13]: timeit res1 = mean_with_dont_knows(mask1, mask2, mask3, use_ufunc=False)
31.3 ms ± 25.8 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)

Most of that improvement comes from the simpler mean_1 function. But the row iteration can be removed entirely:

def foo1(*argv):
    temp = np.stack(argv, axis=-1).astype(float)
    temp[temp == -1] = np.nan
    res = np.nanmean(temp, axis=-1)
    return res >= .5

This takes advantage of np.nanmean's ability to skip np.nan values. I could probably mask the -1 values directly instead of going through np.nan, but for now this reuses the existing idea. It is a lot faster:

In [46]: np.array_equal(foo1(mask1, mask2, mask3), expected)
Out[46]: True
In [47]: timeit foo1(mask1, mask2, mask3)
155 µs ± 217 ns per loop (mean ± std. dev. of 7 runs, 10000 loops each)

Essentially, I avoid the 500 Python-level iterations (each a trivial 3-element "mean") in favour of whole-array operations.
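The masked-array route mentioned above could look roughly like this (my untimed sketch; `foo_masked` is a hypothetical name). Unlike foo1, it also restores the -1 output for positions where every input is -1, since a fully masked slice stays masked:

```python
import numpy as np

def foo_masked(*argv):
    # Mask the -1 sentinel directly instead of converting it to NaN.
    stacked = np.stack(argv, axis=-1)
    mean = np.ma.masked_equal(stacked, -1).mean(axis=-1)
    # Threshold at 0.5; positions where all inputs were -1 remain masked
    # and are filled with -1 at the end.
    return (mean >= 0.5).astype(np.int8).filled(np.int8(-1))
```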
