如何将密钥分发到存储桶中,但要确保每个存储桶至少有一个密钥?

问题描述

我有一组“键”和“桶”,其中 num_keys > num_buckets,但它们的数量级大致相同。

一个具体的例子,“key”类似于任务,“bucket”类似于线程。我想尽可能均匀地将任务分配给线程,同时确保没有线程空闲。

例如,我有 400 个任务和 200 个线程:

task1,task2,...,task400
thread1,thread1,... thread200

我想将任务统一分配给线程:

thread1 -> task50,task394
thread2 -> task37,task250,task324
...
thread200 -> task20

这些是我编造的数字,但要清楚的是,没有空闲线程对我来说更重要;我不太关心拥有完美的分布。

其他一些对我很重要的属性

  • 对特定的存储桶没有偏见。
  • 无论键的数量如何,都应始终将相同的键分配给同一个存储桶。如果桶的数量发生变化,分配也可以改变(因此不需要某种一致的散列方案)。
  • 分发应独立进行,无需外部协调。

目前,我正在尝试使用 MD5 执行此操作:

BigInteger numThreads = BigInteger.valueOf(200); // buckets
BigInteger currentThreadId = BigInteger.valueOf(1); 
List<Task> tasks = getGlobalTasks(); // keys
List<Task> myTasks = new ArrayList<>();

for (Task task : tasks) {
    BigInteger hash = new BigInteger(MessageDigest.getInstance("MD5").digest(task.getId()));
    BigInteger bucketId = hash.mod(numThreads);
    if (currentThreadId.equals(bucketId)) {
      myTasks.add(task);
    }
}

我怀疑如果键和桶的数量之间存在更大的差异,这种方法会正常工作。然而,使用当前的方案,我的大约 35 个线程处于空闲状态,因为它没有被分配任何键。我猜这是由于与 MD5 哈希分布的差异造成的。

那么问题来了,有没有什么办法可以重构这段代码,保证每个bucket/thread至少接收一个任务?

注意:我实际上想做的是以一种对 AWS Kinesis Shards 具有弹性的方式将 Apache Storm Spouts 分配给 resharding(这意味着更改分片数)。不过,这些细节对于这个问题并不重要。

这是上面编译/运行的代码示例的示例:

  public static void main(String [] args) throws NoSuchAlgorithmException {
    BigInteger numBuckets= BigInteger.valueOf(20); // buckets
    int numTasks = 40;

    List<BigInteger> tasks = new ArrayList<>(); // keys
    for (int i = 0; i < numTasks; i++) {
      tasks.add(BigInteger.valueOf((int)(Math.random() * 5000)));
    }

    Map<BigInteger,List<BigInteger>> bucketToTasks = new HashMap<>();
    for (BigInteger task : tasks) {
        String taskStr = task.toString();
        BigInteger hash = new BigInteger(MessageDigest.getInstance("MD5").digest(taskStr.getBytes()));
        BigInteger bucketId = hash.mod(numBuckets);
        if (!bucketToTasks.containsKey(bucketId)) {
          bucketToTasks.put(bucketId,new ArrayList<>());
        }
        bucketToTasks.get(bucketId).add(task);
    }

    for (int i = 0; i < numBuckets.intValue(); i++){
      System.out.println("Bucket: " + i + ",Tasks: " + bucketToTasks.get(BigInteger.valueOf(i)));
    }
  }

输出

Bucket: 0,Tasks: [2459,2922]
Bucket: 1,Tasks: [21]
Bucket: 2,Tasks: [453]
Bucket: 3,Tasks: [494]
Bucket: 4,Tasks: null
Bucket: 5,Tasks: [1557,2355]
Bucket: 6,Tasks: [4145,1693]
Bucket: 7,Tasks: null
Bucket: 8,Tasks: null
Bucket: 9,Tasks: [1016,4556]
Bucket: 10,Tasks: [3739]
Bucket: 11,Tasks: [4644,3295]
Bucket: 12,Tasks: [2773,3983,2724]
Bucket: 13,Tasks: [1663]
Bucket: 14,Tasks: [2593,2593,1457,3023,1817,4353,4196]
Bucket: 15,Tasks: [2258,802,1530]
Bucket: 16,Tasks: [1882,4765,1969]
Bucket: 17,Tasks: [4882,3030,3429]
Bucket: 18,Tasks: [1666,4377,1714,4109]
Bucket: 19,Tasks: [819,7]

如您所见,有些桶是完全空的。

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)