问题描述
我正在尝试使用 sklearn MiniBatchKMeans 对相当大的数据集(150k 个样本和 150k 个特征)进行聚类。我想我可以使用 dask_ml 中的增量来使数据更快地适应我的数据块。这是我在虚拟数据集上的代码片段:
from dask_ml.datasets import make_blobs
from dask_ml.wrappers import Incremental
from sklearn.cluster import MiniBatchKMeans
import dask.array as da
import dask
dataset = da.random.random((150000,150000),chunks = (1000,1000))
kmeans = MiniBatchKMeans(n_clusters = 3)
inc = Incremental(kmeans).fit(dataset)
predicted_labels = inc.predict(dataset).compute()
print(predicted_labels)
进程在计算()步骤被终止。我不认为在 150k 点上运行计算()会如此密集。它因这个奇怪的错误而失败:
ValueError: X has 150000 features,but MiniBatchKMeans is expecting 1000
features as input.
我不明白 MiniBatchKMeans 特征的大小与标签上的计算()有什么关系
编辑 在第一个回答之后,我想澄清一下,我在标签上使用了 compute()(不是数据集!),因为我需要它们进行一些绘图操作。这些值需要在 RAM 上才能被 matplotlib 函数使用。
(150k,) 的数组应该能够轻松地放在 RAM 上,我不知道为什么会失败!
解决方法
由于 compute()
方法会将整个数据集带到本地 RAM,因此该过程会中断。请参阅documentation:
您可以通过调用 .compute()
方法或 dask.compute(...)
函数将任何 dask 集合转换为具体值。
进一步:
但是,如果您尝试将整个数据集带回本地 RAM,这种方法通常会失败。
>>> df.compute() # MemoryError(...)
这也是为什么您会收到大小不匹配的值错误的原因,因为您将一次将整个数据集传递给 predict()
方法,而不是一个接一个地传递较小的块。删除 compute()
语句,它会正常工作:
predicted_labels = inc.predict(dataset)
由于您的目标似乎是“让事情变得更快”,还请注意以下几点:
Dask Array 的每个块都被馈送到底层估算器的 partial_fit
方法。 训练完全是顺序的,因此您不会注意到并行性带来的大量训练时间加速。在分布式环境中,您应该注意到避免额外 IO 带来的一些加速,以及模型通常比数据小得多的事实,因此在机器之间移动速度更快。
(来自here)