问题描述
我正在编写一个程序,用于计算数组中 0 到 100 万(不包括)之间整数的平方。我最多使用 8 个线程(包括在内)。
为了索引数组,我使用原子整数。
无论线程数如何,我都希望 run 方法中的 for 循环体执行一百万次。
为了计算它执行了多少次,我使用了另一个原子整数。
static AtomicInteger cnt = new AtomicInteger(0);
static AtomicInteger ii = new AtomicInteger(0);
static long[] arr = new long[1_000_000];
public static class Worker extends Thread {
public static void main(String[] args) throws IOException,InterruptedException {
int maxThreads = Runtime.getRuntime().availableProcessors() * 2;
for (int n = 1; n <= maxThreads; n++) {
int numThreads = n;
Thread[] threads = new Thread[numThreads];
ii = new AtomicInteger(0);
for (int i = 0; i < threads.length; i++) {
threads[i] = new Worker();
threads[i].start();
}
for (Thread t : threads) {
t.join();
}
System.out.printf("%d threads,cnt is %d\n",threads.length,cnt.get());
cnt.set(0);
}
}
@Override
public void run() {
for (int i = ii.get(); i < arr.length; i = ii.getAndIncrement()) {
arr[i] = (long)i*i;
cnt.getAndIncrement();
}
}
}
预期的执行结果是:
@H_404_5@1 threads,cnt is 1000000
2 threads,cnt is 1000000
3 threads,cnt is 1000000
4 threads,cnt is 1000000
5 threads,cnt is 1000000
6 threads,cnt is 1000000
7 threads,cnt is 1000000
8 threads,cnt is 1000000
但是在运行时我得到以下信息:
@H_404_5@1 threads,cnt is 1000001
2 threads,cnt is 1000002
3 threads,cnt is 1000003
4 threads,cnt is 1000002
5 threads,cnt is 1000003
6 threads,cnt is 1000002
7 threads,cnt is 1000002
8 threads,cnt is 1000005
你能帮我调试吗?
解决方法
有一些小问题,例如运行之间的 ii = new AtomicInteger(0)
赋值或抛出 IOException
的声明永远不会发生。虽然对 ii
的赋值在这里没有影响,因为它发生在没有线程访问 ii
的点上,它可能会分散注意力,因为它偏离了多线程代码的既定代码模式。您应该像在两次运行之间重置 ii
一样重置 cnt
。
实际问题是循环的起点:
for (int i = ii.get(); i < arr.length; i = ii.getAndIncrement()) {
您正在使用 get
进行读取而不增加值,因此多个线程可能会读取相同的值,从而导致后续 arr[i] = (long)i*i;
的数据竞争(此处未引起注意,但当然应该是避免)并执行比必要更多的迭代,正如您注意到的,由于 cnt
更新。您应该像后续迭代一样使用 getAndIncrement()
作为初始索引,以确保每个线程访问不同的数组索引。
static final AtomicInteger cnt = new AtomicInteger(0);
static final AtomicInteger ii = new AtomicInteger(0);
static final long[] arr = new long[1_000_000];
public static class Worker extends Thread {
public static void main(String[] args) throws InterruptedException {
int maxThreads = Runtime.getRuntime().availableProcessors() * 2;
for (int numThreads = 1; numThreads <= maxThreads; numThreads++) {
Thread[] threads = new Thread[numThreads];
for (int i = 0; i < threads.length; i++) {
threads[i] = new Worker();
threads[i].start();
}
for (Thread t : threads) {
t.join();
}
System.out.printf("Used %d threads,cnt is %d\n",numThreads,cnt.get());
cnt.set(0);
ii.set(0);
}
}
@Override
public void run() {
for(int i = ii.getAndIncrement(); i < arr.length; i = ii.getAndIncrement()) {
arr[i] = (long)i*i;
cnt.getAndIncrement();
}
}
}
Used 1 threads,cnt is 1000000
Used 2 threads,cnt is 1000000
Used 3 threads,cnt is 1000000
Used 4 threads,cnt is 1000000
Used 5 threads,cnt is 1000000
Used 6 threads,cnt is 1000000
Used 7 threads,cnt is 1000000
Used 8 threads,cnt is 1000000
Used 9 threads,cnt is 1000000
Used 10 threads,cnt is 1000000
Used 11 threads,cnt is 1000000
Used 12 threads,cnt is 1000000
Used 13 threads,cnt is 1000000
Used 14 threads,cnt is 1000000
Used 15 threads,cnt is 1000000
Used 16 threads,cnt is 1000000
请注意,复杂的并行处理框架不使用这种原子索引更新,而是在启动线程之前根据预期的目标并行度将范围拆分为大小相等的子范围,因此每个线程可以使用普通的局部索引变量。
您可以在使用 Arrays.parallelSetAll(arr,i -> (long)i*i);
或 Stream API 时免费获得。
你为什么要这样做:
ii = new AtomicInteger(0);
在你的主循环中?整个程序有 1 个名为 ii
的变量,但它被分配了一个轮播值,同时在两者之间启动线程,这意味着每个线程都有一个(核心 * 2)个线程中的任意一个,因此,您的应用的确切工作方式取决于月相。
只需删除该行。
另请注意,您循环了 n
次“可用处理器 *2”次,然后 在 循环内循环了 n 次。那肯定不是你的本意。如果是,那么您的变量名就是在撒谎。