这个 ConcurrentLinkedQueue/wait/notify 算法是否正确?

问题描述

我知道通常对于像这样的生产者/消费者对,应该使用阻塞队列。我在这里只想了解Java中更好的内存一致性,并发数据结构和锁之间的交互,以及确定ConcurrentLinkedQueue大小时的不精确性。

问题是,下面的算法是否确保所产生的任何东西都被消耗掉了,就像在普通的非线程安全队列的情况下一样?注意:我运行了几次,总是如此。

import java.util.concurrent.ConcurrentLinkedQueue;

public class Produce extends Thread {
    @Override
    public void run() {
        synchronized(Main.queue) {
            Main.queue.add(1);
            Main.queue.notifyAll();
        }
    }
}
public class Consume extends Thread {
    @Override
    public void run() {
        synchronized(Main.queue) {
            while(true) {
                while(!Main.queue.isEmpty()) {
                    Main.queue.poll();
                    System.out.println("consumed");
                }
                System.out.println("empty");
                try {
                    Main.queue.wait();
                } catch(InterruptedException e) {
                }
            }
        }
    }
}
public class Main {
    public static final ConcurrentLinkedQueue<Integer> queue =
            new ConcurrentLinkedQueue();
    public static void main(String[] args) {
        (new Consume()).start();
        (new Produce()).start();
    }
}

解决方法

您的问题的答案是肯定的。消费者将看到所有更新。

然而:

  1. 这不是一个明智的实现。看起来您正在使用带等待/通知的轮询方法,这样您就不需要忙循环来等待队列变为非空。但更好(更简单、更有效)的方法是使用 BlockingQueue 并使用阻塞 get() 方法。

    就其价值而言,您通过将队列对象用作互斥体来执行等待/通知信令,从而否定了使用 ConcurrentLinkedQueue 的任何可能可扩展性优势。 (如果您使用不同的对象作为互斥锁,这也适用。问题在于互斥!)

  2. 如果您打算这样做(无论出于何种原因),notify()notifyAll() 更可取。只有一个消费者能够使用您添加到队列中的(单个)元素。唤醒所有消费者是不必要的。

  3. 扩展 Thread 不是一个好主意。更好的方法是将您的业务逻辑放入作为 Runnable 构造函数参数传递的 Thread(或 lambda)中。阅读:"implements Runnable" vs "extends Thread" in Java


您还对以下内容感兴趣:

... 确定 ConcurrentLinkedQueue 大小时的不精确性到底是多少。

答案在 ConcurrentLinkedQueuejavadoc 中:

“请注意,与大多数集合不同,此方法不是恒定时间操作。由于这些队列的异步性质,确定当前元素数量需要 O(n) 遍历。”

“另外,如果在执行此方法期间添加或删除元素,返回的结果可能不准确。因此,此方法在并发应用程序中通常不是很有用。”

换句话说,ConcurrentLinkedQueue 计算队列元素,如果同时添加和删除元素,则不会给出准确的答案。