如何使用自定义编解码器将 numpy 数组保存为字节?

问题描述

我有一个 int8 类型和 (100,100) 形状的 numpy 数组。我已经使用霍夫曼编码来找到一种最有效的方式来编码其内容。对于我的特定用例,我尝试保存的值大致正态分布在 0 左右,标准差约为 5,但此问题适用于以最佳方式保存具有任何分布的 ndarray。就我而言,在极少数情况下可以观察到高达 -20 或 20 的极值。显然,霍夫曼编码这种类型的数组比使用标准的 8 位整数更节省空间。我的问题是如何做到这一点。

我尝试使用 np.save() 和 Pickle,但无法获得我想要的效率。具体来说,对于形状为 (100,100) 的数组,我使用 np.save() 获得了一个大小为 10,128 字节的文件,这对于每个整数 1 个字节加上开销是有意义的。使用pickle,我得到一个大小为10,158 字节的文件,大致相同。但是,根据我的霍夫曼编码方案,我应该能够在我的特定测试用例(如下所示)中以 144 个字节对数组内容进行编码!! (不包括开销)

我尝试将数组中的每个整数映射到其最佳字节字符串,所以我有一个字节数组(类型 S12),然后保存它,但我得到了一个同时使用 np.save() 和 pickle 的 118kb 文件,所以这显然不起作用。

感谢您的帮助!

重现我的确切测试用例的代码

import pickle
import numpy as np

# Seed random number generator
np.random.seed(1234)

# Build random normal array and round it 
test_array = np.random.normal(0,5,size=(100,100))
test_array = np.round(test_array).astype(np.int8)

# Set encoding dictionary
encoding_dict = {6: b'0000',-8: b'00010',8: b'00011',5: b'0010',-5: b'0011',12: b'0100000',-13: b'01000010',14: b'010000110',-15: b'0100001110',-14: b'0100001111',10: b'010001',7: b'01001',-4: b'0101',4: b'0110',-7: b'01110',11: b'0111100',-11: b'0111101',-12: b'01111100',13: b'011111010',-19: b'011111011000',-18: b'011111011001',-16: b'01111101101',-17: b'011111011100',16: b'011111011101',15: b'01111101111',-10: b'0111111',-3: b'1000',-6: b'10010',-9: b'100110',9: b'100111',3: b'1010',-2: b'1011',1: b'1100',2: b'1101',-1: b'1110',0: b'1111'}

# Save using different methods
np.save('test.npy',test_array)
with open('test.pkl','wb') as file:
    pickle.dump(test_array,file)

# Try converting to bytes and then saving
bytes_array = np.array([encoding_dict[key] for key in test_array.flatten()]).reshape(test_array.shape)
np.save('test_bytes.npy',bytes_array)
with open('test_bytes.pkl','wb') as file:
    pickle.dump(bytes_array,file)

# See how many bytes it should take
tmp_flat = test_array.flatten()
tmp_bytes = np.zeros_like(tmp_flat)
for i in range(len(tmp_bytes)):
    tmp_bytes[i] = len(encoding_dict[tmp_flat[i]]) / 8
print(tmp_bytes.sum())

解决方法

您无法对您描述的数据进行 70 倍的压缩。你为什么这么认为?

即使输入字节被限制为四个值,你能得到的最好结果是四倍压缩。 (8 位超过 2 位。)您有一个正态分布,其中 10 或 11 个值正好在 ±1 sigma 之内。

也许您可以通过统计数据对随机字节进行两倍压缩。在美好的一天。

更新:

刚刚计算了分布的熵,假设标准差为 5。我得到每个样本的熵为 4.37 位。所以我对两倍的估计过于乐观。更像是 1.8 的因数。

顺便说一下,您不需要手动执行此操作。您可以将 zlibZ_HUFFMAN_ONLY 策略一起使用。它将为您生成最佳霍夫曼代码。

,

我没有使用过这种编码,但我怀疑您的 144 字节是否是一个准确的度量。

您的 bytes_array 是 100 个元素,每个元素 12 个字节 ('S12'),或者是 test_array 大小的 12 倍(每个元素 1 个字节)。

如果我们列一个清单:

In [440]: alist = [encoding_dict[key] for key in test_array.flatten()]
In [441]: len(alist)
Out[441]: 10000
In [442]: alist[:10]
Out[442]: 
[b'1101',b'10010',b'01001',b'1011',b'0101',b'0110',b'1000',b'1111',b'0111101']

并查看这些字符串的长度:

In [444]: sum([len(i) for i in alist])
Out[444]: 43938

每个元素平均 4 个字节。即使我们能以某种方式将这些字节转换为位,也只是 50% 的压缩:

In [445]: _/8
Out[445]: 5492.25
,

您的错误在这里:

tmp_bytes = np.zeros_like(tmp_flat)

tmp_flat 是一个 int8 数组,因此语句 tmp_bytes[i] = len(encoding_dict[tmp_flat[i]]) / 8 在将浮点值转换为 int 时会截断很多数字。用以下内容替换有问题的行:

tmp_bytes = np.zeros(tmp_flat.shape,np.single)

但是为了展示如何实际进行压缩:我建议使用 np.packbits,它实际上会为您创建一个 5493 字节的数组。

# Make a string of all the data
s = b''.join(encoding_dict[key] for key in test_array.ravel())
# Convert the string into an array
a = np.array(list(map(int,s.decode('ascii'))))
# pack it
result = np.packbits(a)

语句 a = ... 做了很多额外的工作,因为它解码数据,然后复制它,然后将字符串转换为整数无数次等等。这是一个更长但效率更高的语句方法:

s = bytearray(b'').join(encoding_dict[key] for key in test_array.ravel())
a = np.array(s)
a -= ord('0')    # A now contains just 0 and 1
result = np.packbits(a)

当您存储此数组时,请确保包含您预期的位数,而不是字节数。您可以使用 np.unpackbits 解包为二进制字符串,它支持专门用于此目的的 count 参数(顺便说一下,my addition)。

最后一点,尽可能使用 ravel 而不是 flatten。后者总是复制,而前者一般不会。