带有生成器/可迭代/迭代器的Python随机样本

问题描述

尽管Martijn Pieters的答案是正确的,但当samplesize变大时,它的确会减慢速度,因为list.insert在循环中使用可能具有二次复杂度。

我认为,这是在提高性能的同时保持一致性的一种替代方法

def iter_sample_fast(iterable, samplesize):
    results = []
    iterator = iter(iterable)
    # Fill in the first samplesize elements:
    try:
        for _ in xrange(samplesize):
            results.append(iterator.next())
    except stopiteration:
        raise ValueError("Sample larger than population.")
    random.shuffle(results)  # Randomize their positions
    for i, v in enumerate(iterator, samplesize):
        r = random.randint(0, i)
        if r < samplesize:
            results[r] = v  # at a decreasing rate, replace random items
    return results

samplesize上面的值开始逐渐显示出差异10000。致电时间(1000000, 100000)

  • iter样本:5.05s
  • iter_sample_fast:2.64秒

解决方法

您知道是否有一种方法可以使pythonrandom.sample与生成器对象一起工作。我试图从一个很大的文本语料库中获取一个随机样本。问题是random.sample()引发以下错误。

TypeError: object of type 'generator' has no len()

我当时在想,也许有某种方法itertools可以解决某些问题,但是经过一点搜索却找不到。

一个有些虚构的例子:

import random
def list_item(ls):
    for item in ls:
        yield item

random.sample( list_item(range(100)),20 )


更新


根据MartinPieters要求,我对当前建议的三种方法进行了一些计时。结果如下。

Sampling 1000 from 10000
Using iterSample 0.0163 s
Using sample_from_iterable 0.0098 s
Using iter_sample_fast 0.0148 s

Sampling 10000 from 100000
Using iterSample 0.1786 s
Using sample_from_iterable 0.1320 s
Using iter_sample_fast 0.1576 s

Sampling 100000 from 1000000
Using iterSample 3.2740 s
Using sample_from_iterable 1.9860 s
Using iter_sample_fast 1.4586 s

Sampling 200000 from 1000000
Using iterSample 7.6115 s
Using sample_from_iterable 3.0663 s
Using iter_sample_fast 1.4101 s

Sampling 500000 from 1000000
Using iterSample 39.2595 s
Using sample_from_iterable 4.9994 s
Using iter_sample_fast 1.2178 s

Sampling 2000000 from 5000000
Using iterSample 798.8016 s
Using sample_from_iterable 28.6618 s
Using iter_sample_fast 6.6482 s

因此,事实证明,array.insert当涉及大样本量时,存在严重的缺陷。我用来计时方法的代码

from heapq import nlargest
import random
import timeit


def iterSample(iterable,samplesize):
    results = []
    for i,v in enumerate(iterable):
        r = random.randint(0,i)
        if r < samplesize:
            if i < samplesize:
                results.insert(r,v) # add first samplesize items in random order
            else:
                results[r] = v # at a decreasing rate,replace random items

    if len(results) < samplesize:
        raise ValueError("Sample larger than population.")

    return results

def sample_from_iterable(iterable,samplesize):
    return (x for _,x in nlargest(samplesize,((random.random(),x) for x in iterable)))

def iter_sample_fast(iterable,samplesize):
    results = []
    iterator = iter(iterable)
    # Fill in the first samplesize elements:
    for _ in xrange(samplesize):
        results.append(iterator.next())
    random.shuffle(results)  # Randomize their positions
    for i,v in enumerate(iterator,samplesize):
        r = random.randint(0,i)
        if r < samplesize:
            results[r] = v  # at a decreasing rate,replace random items

    if len(results) < samplesize:
        raise ValueError("Sample larger than population.")
    return results

if __name__ == '__main__':
    pop_sizes = [int(10e+3),int(10e+4),int(10e+5),int(10e+5)*5]
    k_sizes = [int(10e+2),int(10e+3),int(10e+4)*2,int(10e+4)*5,int(10e+5)*2]

    for pop_size,k_size in zip(pop_sizes,k_sizes):
        pop = xrange(pop_size)
        k = k_size
        t1 = timeit.Timer(stmt='iterSample(pop,%i)'%(k_size),setup='from __main__ import iterSample,pop')
        t2 = timeit.Timer(stmt='sample_from_iterable(pop,setup='from __main__ import sample_from_iterable,pop')
        t3 = timeit.Timer(stmt='iter_sample_fast(pop,setup='from __main__ import iter_sample_fast,pop')

        print 'Sampling',k,'from',pop_size
        print 'Using iterSample','%1.4f s'%(t1.timeit(number=100) / 100.0)
        print 'Using sample_from_iterable','%1.4f s'%(t2.timeit(number=100) / 100.0)
        print 'Using iter_sample_fast','%1.4f s'%(t3.timeit(number=100) / 100.0)
        print ''

我还进行了一项测试,以检查所有方法是否确实都对发生器进行了无偏向采样。因此,对于所有方法,我都100010000
100000时间上对元素进行采样,并计算出总体中每个项目出现的平均频率,事实证明~.1这三种方法都符合预期。