问题描述
给定一个点 (0.8,0.6)
和一个强度 (3)
,可以双线性地将单个点“反向插值”到具有整数索引 (0,0) -> (1,1)
的 2x2 网格上。
在这样的网格上,上面坐标的权重变为:
0.08 | 0.12
----------------
0.32 | 0.48
我们可以乘以与坐标相关的强度,得到一个 2x2 网格双线性加权强度:
0.24 | 0.36
----------------
0.96 | 1.44
并且可以这样绘制:
points = np.array([(0.8,0.6),(2.2,2.6),(5,1),(3,4.2),(8.5,8.2)])
intens = np.array([3,3,1,2])
image,weight = bilinear(points,intens)
对于我的工作,我需要 weights
和 intensity*weights
作为输出数组。我需要在非常大的(数百万个)坐标上执行上述操作,其中坐标的值从 0.0 到 4095.0。我在下面编写了一个 numpy 例程,虽然它对于 100_000 个点来说相当快(1.25 秒),但我希望它更快,因为我需要在我拥有的 10_000_000 个数据点上多次调用它。
我考虑过对 numpy 代码进行矢量化而不是 for 循环,但随后我为每个点生成了一个 4096x4096 的空数组,然后我将对其求和。这将需要 1000 TB 的内存。
我也尝试过在 cupy 中的简单实现,但由于我使用了 for 循环,它变得太慢了。
在我的代码中,我为每个点生成一个 2x2 加权数组,将数组乘以强度,然后通过切片将它们添加到主数组中。有没有更好的办法?
import numpy as np
def bilinear(points,intensity):
"""Bilinear weighting of points onto a grid.
Extent of grid given by min and max of points in each dimension
points should have shape (N,2)
intensity should have shape (N,)
"""
floor = np.floor(points)
ceil = floor + 1
floored_indices = np.array(floor,dtype=int)
low0,low1 = floored_indices.min(0)
high0,high1 = floored_indices.max(0)
floored_indices = floored_indices - (low0,low1)
shape = (high0 - low0 + 2,high1-low1 + 2)
weights_arr = np.zeros(shape,dtype=float)
int_arr = np.zeros(shape,dtype=float)
upper_diff = ceil - points
lower_diff = points - floor
w1 = np.prod((upper_diff),axis=1)
w2 = upper_diff[:,0]*lower_diff[:,1]
w3 = lower_diff[:,0]*upper_diff[:,1]
w4 = np.prod((lower_diff),axis=1)
for i,index in enumerate(floored_indices):
s = np.s_[index[0]:index[0]+2,index[1]:index[1]+2]
weights = np.array([[w1[i],w2[i]],[w3[i],w4[i]]])
weights_arr[s] += weights
int_arr[s] += intensity[i]*weights
return int_arr,weights_arr
rng = np.random.default_rng()
N_points = 10_000 # use 10_000 so it is quick
image_shape = (256,256) # Use 256 so it isn't so big
points = rng.random((N_points,2)) * image_shape
intensity = rng.random(N_points)
image,intensity)
为了测试代码,我还提供了以下绘图代码 - 仅使用少量 (~10) 个点,否则散点将覆盖整个图像。
import matplotlib.pyplot as plt
floor = np.floor(points) - 0.5
lower,left = floor.min(0)
upper,right = (floor).max(0) + 2
extent = (left,right,upper,lower)
fig,(ax1,ax2) = plt.subplots(ncols=2,figsize=(6,3))
ax1.scatter(*points[:,::-1].T,c='red')
im1 = ax1.imshow(weight,clim=(image.min(),image.max()),extent=extent)
ax1.set(title='Weight',xlim=(left - 1,right + 1),ylim = (upper + 1,lower - 1))
colorbar(im1)
ax2.scatter(*points[:,c='red')
im2 = ax2.imshow(image,extent=extent)
ax2.set(title='Weight x Intensity',lower - 1))
colorbar(im2)
plt.tight_layout()
plt.show()
# If labeling the first point
# ax1.text(*points[0].T,f"({points[0,0]},{points[0,1]})",va='bottom',ha='center',color='red')
# ax2.text(*points[0].T,1]},{intens[0]})",color='red')
解决方法
您想使用 np.add.at
。看,https://numpy.org/doc/stable/reference/generated/numpy.ufunc.at.html
def bilinear_2(points,intensity):
# Create empty matrices,starting from 0 to p.max
w = np.zeros((points[:,0].max().astype(int) + 2,points[:,1].max().astype(int) + 2))
i = np.zeros_like(w)
# Calc weights
floor = np.floor(points)
ceil = floor + 1
upper_diff = ceil - points
lower_diff = points - floor
w1 = upper_diff[:,0] * upper_diff[:,1]
w2 = upper_diff[:,0] * lower_diff[:,1]
w3 = lower_diff[:,1]
w4 = lower_diff[:,1]
# Get indices
ix,iy = floor[:,0].astype(int),floor[:,1].astype(int)
# Use np.add.at. See,https://numpy.org/doc/stable/reference/generated/numpy.ufunc.at.html
np.add.at(w,(ix,iy),w1)
np.add.at(w,iy+1),w2)
np.add.at(w,(ix+1,w3)
np.add.at(w,w4)
np.add.at(i,w1 * intensity)
np.add.at(i,w2 * intensity)
np.add.at(i,w3 * intensity)
np.add.at(i,w4 * intensity)
# Clip (to accomodate image size to be the same as your bilinear function)
iix,iiy = points[:,0].min().astype(int),1].min().astype(int)
i,w = i[iix:,iiy:],w[iix:,iiy:]
return i,w
# At 10_000 samples:
%time image,weight = bilinear(points,intensity)
%time image_2,weight_2 = bilinear_2(points,intensity)
>>>
CPU times: user 178 ms,sys: 3.73 ms,total: 182 ms
Wall time: 185 ms
CPU times: user 9.63 ms,sys: 601 µs,total: 10.2 ms
Wall time: 10 ms
# These tests passes
np.testing.assert_allclose(weight,weight_2)
np.testing.assert_allclose(image,image_2)
# At 100K samples
N_points = 100_000
image_shape = (256,256)
points = rng.random((N_points,2)) * image_shape
intensity = rng.random(N_points)
%time image_2,intensity)
CPU times: user 115 ms,sys: 66 ms,total: 181 ms
Wall time: 181 ms
# At 10M samples
N_points = 10_000_000
image_shape = (256,intensity)
CPU times: user 8.23 s,sys: 656 ms,total: 8.88 s
Wall time: 9.31 s
除此之外,这种方法是不可能的。因为整数数组索引不会增量更新。
例如
a = np.zeros(5)
a[np.array((1,1,2))] += 1
a
>>> array([0.,1.,0.,0.])
但是;
a = np.zeros(5)
np.add.at(a,([1,2]),1)
a
>>> array([0.,2.,0.])
,
感谢@armamut 的好回答!它启发了我看了一下,然后我发现了 np.bincount
,它也是在cupy 中实现的。事实证明,bincount 的实现速度更快,cupy 的实现真的很快!后者可能会进一步改进,因为我必须处理几个元组才能使其工作。
# Timings
points = np.random.random((10_000_000,2)) * (256,256)
intens = np.random.random((10_000_000))
pcupy = cp.asarray(points)
icupy = cp.asarray(intens)
%time bilinear_bincount_cupy(pcupy,icupy)
%time bilinear_bincount_numpy(points,intens)
%time bilinear_2(points,intens)
Wall time: 456 ms
Wall time: 2.57 s
Wall time: 5.37 s
numpy 实现:
def bilinear_bincount_numpy(points,intensities):
"""Bilinear weighting of points onto a grid.
Extent of grid given by min and max of points in each dimension
points should have shape (N,2)
intensity should have shape (N,)
"""
floor = np.floor(points)
ceil = floor + 1
floored_indices = np.array(floor,dtype=int)
low0,low1 = floored_indices.min(0)
high0,high1 = floored_indices.max(0)
floored_indices = floored_indices - (low0,low1)
shape = (high0 - low0 + 2,high1-low1 + 2)
upper_diff = ceil - points
lower_diff = points - floor
w1 = np.prod((upper_diff),axis=1)
w2 = upper_diff[:,0]*lower_diff[:,0]*upper_diff[:,1]
w4 = np.prod((lower_diff),axis=1)
shifts = np.array([[0,0],[0,1],[1,1]])
indices = floored_indices[:,None] + shifts
indices = (indices * (shape[1],1)).sum(-1)
weights = np.array([w1,w2,w3,w4]).T
weight_bins = np.bincount(indices.flatten(),weights=weights.flatten())
intens_bins = np.bincount(indices.flatten(),weights=(intensities[:,None]*weights).flatten())
all_weight_bins = np.zeros(np.prod(shape))
all_intens_bins = np.zeros(np.prod(shape))
all_weight_bins[:len(weight_bins)] = weight_bins
all_intens_bins[:len(weight_bins)] = intens_bins
weight_image = all_weight_bins.reshape(shape)
intens_image = all_intens_bins.reshape(shape)
return intens_image,weight_image
还有cupy的实现:
def bilinear_bincount_cupy(points,intensities):
"""Bilinear weighting of points onto a grid.
Extent of grid given by min and max of points in each dimension
points should be a cupy array of shape (N,2)
intensity should be a cupy array of shape (N,)
"""
floor = cp.floor(points)
ceil = floor + 1
floored_indices = cp.array(floor,high1 = floored_indices.max(0)
floored_indices = floored_indices - cp.array([low0,low1])
shape = cp.array([high0 - low0 + 2,high1-low1 + 2])
upper_diff = ceil - points
lower_diff = points - floor
w1 = upper_diff[:,1]
shifts = cp.array([[0,None] + shifts
indices = (indices * cp.array([shape[1].item(),1])).sum(-1)
weights = cp.array([w1,w4]).T
# These bins only fill up to the highest index - not to shape[0]*shape[1]
weight_bins = cp.bincount(indices.flatten(),weights=weights.flatten())
intens_bins = cp.bincount(indices.flatten(),None]*weights).flatten())
# So we create a zeros array that is big enough
all_weight_bins = cp.zeros(cp.prod(shape).item())
all_intens_bins = cp.zeros_like(all_weight_bins)
# And fill it here
all_weight_bins[:len(weight_bins)] = weight_bins
all_intens_bins[:len(weight_bins)] = intens_bins
weight_image = all_weight_bins.reshape(shape.get())
intens_image = all_intens_bins.reshape(shape.get())
return intens_image,weight_image