KMeans 预测的标签上的 dask compute() 问题

问题描述

我正在尝试使用 sklearn MiniBatchKMeans 对相当大的数据集(150k 个样本和 150k 个特征)进行聚类。我想我可以使用 dask_ml 中的增量来使数据更快地适应我的数据块。这是我在虚拟数据集上的代码片段:

    from dask_ml.datasets import make_blobs
    from dask_ml.wrappers import Incremental
    from sklearn.cluster import MiniBatchKMeans
    import dask.array as da
    import dask

    dataset = da.random.random((150000,150000),chunks = (1000,1000))
    kmeans = MiniBatchKMeans(n_clusters = 3)
    inc = Incremental(kmeans).fit(dataset)
    predicted_labels = inc.predict(dataset).compute()
    print(predicted_labels) 

进程在计算()步骤被终止。我不认为在 150k 点上运行计算()会如此密集。它因这个奇怪的错误而失败:

ValueError: X has 150000 features,but MiniBatchKMeans is expecting 1000 
features as input.

我不明白 MiniBatchKMeans 特征的大小与标签上的计算()有什么关系

编辑 在第一个回答之后,我想澄清一下,我在标签上使用了 compute()(不是数据集!),因为我需要它们进行一些绘图操作。这些值需要在 RAM 上才能被 matplotlib 函数使用。

(150k,) 的数组应该能够轻松地放在 RAM 上,我不知道为什么会失败!

解决方法

由于 compute() 方法会将整个数据集带到本地 RAM,因此该过程会中断。请参阅documentation

您可以通过调用 .compute() 方法或 dask.compute(...) 函数将任何 dask 集合转换为具体值。

进一步:

但是,如果您尝试将整个数据集带回本地 RAM,这种方法通常会失败。

>>> df.compute() # MemoryError(...)

这也是为什么您会收到大小不匹配的值错误的原因,因为您将一次将整个数据集传递给 predict() 方法,而不是一个接一个地传递较小的块。删除 compute() 语句,它会正常工作:

predicted_labels = inc.predict(dataset)

由于您的目标似乎是“让事情变得更快”,还请注意以下几点:

Dask Array 的每个块都被馈送到底层估算器的 partial_fit 方法。 训练完全是顺序的,因此您不会注意到并行性带来的大量训练时间加速。在分布式环境中,您应该注意到避免额外 IO 带来的一些加速,以及模型通常比数据小得多的事实,因此在机器之间移动速度更快。

(来自here