如何在C ++中高效地生成排序均匀分布的随机数?

问题描述

我想在C ++中生成大量n(即n >= 1,000,000)排序并均匀分布的随机数。

我认为的第一简单方法是

  1. 使用n依次生成std::uniform_real_distribution<double>个均匀分布的数字,
  2. 然后使用std::sort对它们进行排序。

但是,这需要几分钟。

一种 second 更为复杂的方法是并行执行两个步骤,如下所示:

template <typename T>
void computeUniformDistribution(std::vector<T>& elements)
{
    #pragma omp parallel
    {
        std::seed_seq seed{distribution_seed,static_cast<size_t>(omp_get_thread_num())};
        std::mt19937 prng = std::mt19937(seed);
        std::uniform_real_distribution<double> uniform_dist(0,std::numeric_limits<T>::max());

        #pragma omp for
        for (size_t i = 0; i < elements.size(); ++i)
        {
            elements[i] = static_cast<T>(uniform_dist(prng));
        }
    }

    std::sort(std::execution::par_unseq,elements.begin(),elements.end());
}

但是,这大约需要大约 30 秒。鉴于生成均匀分布的数字仅需要大约 1.5 秒,瓶颈仍然是排序阶段。

因此,我想问以下问题:如何才能有效地生成以排序方式均匀分布的数据?

解决方法

有一些方法可以生成已经排序的样本,但我认为生成部分排序的样本可能会更好。

将输出范围划分为k个等宽的桶。每个存储桶中的样本数将具有概率相等的多项式分布。采样多项式分布的较慢方法是在[0,k)中生成n个整数。一种更有效的方法是以比率n / k提取k个泊松样本,其总和不超过n,然后使用慢速方法添加另一个n-和样本。对泊松分布进行采样很难做到完美,但是当n / k非常大时(如此处所示),通过对均值和方差为n / k的正态分布进行四舍五入可以很好地近似泊松分布。如果那是不可接受的,那么慢速方法确实可以很好地并行化。

给出存储桶计数,计算前缀总和以找到存储桶边界。对于并行的每个存储桶,请在存储桶范围内生成给定数量的样本并将其排序。如果我们很好地选择n / k,则几乎可以肯定每个存储桶都适合L1缓存。对于n = 1e9,我想尝试k = 1e5或k = 1e6。

这是一个顺序实现。由于我们确实需要避免对封闭的存储桶边界进行2倍的过采样,因此略有修饰,但我将留给您。我对OMP不熟悉,但是我认为您可以通过在SortedUniformSamples的末尾为for循环添加一个杂注来获得一个不错的并行实现。

#include <algorithm>
#include <cmath>
#include <iostream>
#include <numeric>
#include <random>
#include <span>
#include <vector>

template <typename Dist,typename Gen>
void SortedSamples(std::span<double> samples,Dist dist,Gen& gen) {
  for (double& sample : samples) {
    sample = dist(gen);
  }
  std::sort(samples.begin(),samples.end());
}

template <typename Gen>
void ApproxMultinomialSample(std::span<std::size_t> samples,std::size_t n,Gen& gen) {
  double lambda = static_cast<double>(n) / samples.size();
  std::normal_distribution<double> approx_poisson{lambda,std::sqrt(lambda)};
  std::size_t sum;
  do {
    for (std::size_t& sample : samples) {
      sample = std::lrint(approx_poisson(gen));
    }
    sum = std::accumulate(samples.begin(),samples.end(),std::size_t{0});
  } while (sum > n);
  std::uniform_int_distribution<std::size_t> uniform{0,samples.size() - 1};
  for (; sum < n; sum++) {
    samples[uniform(gen)]++;
  }
}

template <typename Gen>
void SortedUniformSamples(std::span<double> samples,Gen& gen) {
  static constexpr std::size_t kTargetBucketSize = 1024;
  if (samples.size() < kTargetBucketSize) {
    SortedSamples(samples,std::uniform_real_distribution<double>{0,1},gen);
    return;
  }
  std::size_t num_buckets = samples.size() / kTargetBucketSize;
  std::vector<std::size_t> bucket_counts(num_buckets);
  ApproxMultinomialSample(bucket_counts,samples.size(),gen);
  std::vector<std::size_t> prefix_sums(num_buckets + 1);
  std::partial_sum(bucket_counts.begin(),bucket_counts.end(),++prefix_sums.begin());
  for (std::size_t i = 0; i < num_buckets; i++) {
    SortedSamples(std::span<double>{&samples[prefix_sums[i]],&samples[prefix_sums[i + 1]]},std::uniform_real_distribution<double>{
                      static_cast<double>(i) / num_buckets,static_cast<double>(i + 1) / num_buckets},gen);
  }
}

int main() {
  std::vector<double> samples(100000000);
  std::default_random_engine gen;
  SortedUniformSamples(samples,gen);
  if (std::is_sorted(samples.begin(),samples.end())) {
    std::cout << "sorted\n";
  }
}

如果您的标准库具有poisson_distribution的高质量实现,您也可以这样做:

template <typename Gen>
void MultinomialSample(std::span<std::size_t> samples,Gen& gen) {
  double lambda = static_cast<double>(n) / samples.size();
  std::poisson_distribution<std::size_t> poisson{lambda};
  std::size_t sum;
  do {
    for (std::size_t& sample : samples) {
      sample = poisson(gen);
    }
    sum = std::accumulate(samples.begin(),samples.size() - 1};
  for (; sum < n; sum++) {
    samples[uniform(gen)]++;
  }
}
,

我很想依靠这样一个事实,即一组均匀分布的变量的排序集中的连续元素之间的差异是指数分布的。可以利用它在O(N)时间而不是O(N*log N)时间内运行。

快速实施将执行以下操作:

template<typename T> void
computeSorteUniform2(std::vector<T>& elements)
{
    std::random_device rd;
    std::mt19937 prng(rd());

    std::exponential_distribution<T> dist(static_cast<T>(1));

    auto sum = dist(prng);

    for (auto& elem : elements) {
        elem = sum += dist(prng);
    }

    sum += dist(prng);

    for (auto& elem : elements) {
        elem /= sum;
    }
}

通过假设您要使用Uniform(0,1)中的值来简化此示例,但是应该易于推广。使用OMP进行这项工作并不是一件容易的事,但也不应该太困难。

如果您关心最后的〜50%性能,则可以使用一些数字技巧来加快生成随机偏差(例如,比MT更快,更好的PRNG)以及将它们转换为double s(但是最近的编译器可能知道这些技巧)。一些参考:Daniel Lemire's blogMelissa O'Neill's PCG site

我刚刚对此进行了基准测试,发现c的std::uniform_real_distributionstd::exponential_distribution都非常慢。 numpy's Ziggurat based implementations的速度提高了8倍,因此,我可以使用笔记本电脑上的一个线程在10秒钟内生成1e9 double(即std的实现需要80秒钟)算法。我没有尝试过OP在1e9元素上的实现,但是使用1e8元素时,我的速度要快15倍。

,

我进行了一些测试,根据系统的不同,基数排序速度是std :: sort的4到6倍,但这需要第二个向量,对于1 GB的元素,每个双精度向量为8 GB,对于总共16 GB的可用内存,因此您可能需要32 GB的RAM。

如果排序不受内存带宽的限制,则多线程基数排序可能会有所帮助。

示例单线程代码:

#include <algorithm>
#include <iostream>
#include <random>
#include <vector>
#include <time.h>

clock_t ctTimeStart;            // clock values
clock_t ctTimeStop;

typedef unsigned long long uint64_t;

//  a is input array,b is working array
uint64_t * RadixSort(uint64_t * a,uint64_t *b,size_t count)
{
uint32_t mIndex[8][256] = {0};          // count / index matrix
uint32_t i,j,m,n;
uint64_t u;
    for(i = 0; i < count; i++){         // generate histograms
        u = a[i];
        for(j = 0; j < 8; j++){
            mIndex[j][(size_t)(u & 0xff)]++;
            u >>= 8;
        }
    }
    for(j = 0; j < 8; j++){             // convert to indices
        m = 0;
        for(i = 0; i < 256; i++){
            n = mIndex[j][i];
            mIndex[j][i] = m;
            m += n;
        }
    }
    for(j = 0; j < 8; j++){             // radix sort
        for(i = 0; i < count; i++){     //  sort by current LSB
            u = a[i];
            m = (size_t)(u>>(j<<3))&0xff;
            b[mIndex[j][m]++] = u;
        }
        std::swap(a,b);                //  swap ptrs
    }
    return(a);
}

#define COUNT (1024*1024*1024)

int main(int argc,char**argv)
{
    std::vector<double> v(COUNT);       // vctr to be generated
    std::vector<double> t(COUNT);       // temp vector
    std::random_device rd;
    std::mt19937 gen(rd());
//  std::uniform_real_distribution<> dis(0,std::numeric_limits<double>::max());
    std::uniform_real_distribution<> dis(0,COUNT);
    ctTimeStart = clock();
    for(size_t i = 0; i < v.size(); i++)
        v[i] = dis(gen);
    ctTimeStop = clock();
    std::cout << "# of ticks " << ctTimeStop - ctTimeStart << std::endl;
    ctTimeStart = clock();
//  std::sort(v.begin(),v.end());
    RadixSort((uint64_t *)&v[0],(uint64_t *)&t[0],COUNT);
    ctTimeStop = clock();
    std::cout << "# of ticks " << ctTimeStop - ctTimeStart << std::endl;
    return(0);
}

如果包含负值的排序加倍(转换为64位无符号整数),则需要将其视为正负+大小64位整数。用于将符号+幅度(SM)与64位无符号整数(ULL)之间进行转换的C ++宏:

// converting doubles to unsigned long long for radix sort or something similar
// note -0 converted to 0x7fffffffffffffff,+0 converted to 0x8000000000000000
// -0 is unlikely to be produced by a float operation

#define SM2ULL(x) ((x)^(((~(x) >> 63)-1) | 0x8000000000000000ull))
#define ULL2SM(x) ((x)^((( (x) >> 63)-1) | 0x8000000000000000ull))
,

有一个简单的观察,涉及[0,1]中排序的统一随机数:

  1. 每个统一的[0,1]数均可能小于一半或大于一半。因此,小于一半大于大于一半的统一[0,1]数目遵循二项式(n,1/2)分布。
  2. 在小于一半的数字中,每个数字小于1/4的可能性大于大于1/4的可能性,因此小于1/4与大于1 / 4个数字遵循相同的分布。
  3. 依此类推。

因此,每个数字可以一次生成一位,即在二进制点之后从左到右。这是如何生成 n 个排序的统一随机数的草图:

  1. 如果 n 为0或1,则停止。否则,生成 b ,一个二项式( n ,1/2)随​​机数。
  2. 在第一个 b 随机数后面附加0,在其余的数字后面附加1。
  3. 在第一个 b 数字上递归运行此算法,但使用 n = b
  4. 在其余数字上递归运行此算法,但使用 n = n - b

在这一点上,我们有一个排序的随机数列表,它具有变化的位数。剩下要做的就是根据需要用统一的随机比特填充每个数字(或截去或舍入多余的比特),以得到数字 p 个比特(例如,双精度为53个比特)。然后,将每个数字除以2 p

我给出一个similar algorithm,以从 n 个随机数中找到最小的 k 个。

相关问答

依赖报错 idea导入项目后依赖报错,解决方案:https://blog....
错误1:代码生成器依赖和mybatis依赖冲突 启动项目时报错如下...
错误1:gradle项目控制台输出为乱码 # 解决方案:https://bl...
错误还原:在查询的过程中,传入的workType为0时,该条件不起...
报错如下,gcc版本太低 ^ server.c:5346:31: 错误:‘struct...