Java 多线程 - 有限制的生产者消费者

问题描述

我正在尝试为生产者/消费者编写解决方案,但有限制。

生产者(可能有多个)应该在生产总共 5个值后终止,而消费者(有可能不止一个消费者)应该在所有五个值都被消费后停止。

为此我尝试使用 AtomicInteger

public class InternalQueue {
    
    private static final int MAX_QUEUE_SIZE = 10;
    
    private Queue<Integer> queue = new LinkedList<>();
    
    public AtomicInteger itemsProduced = new AtomicInteger(0);
    
    public AtomicInteger itemsConsumed = new AtomicInteger(0);
        
    public synchronized boolean put(int n) {
        if(itemsProduced.get() < MAX_QUEUE_SIZE) {
            queue.add(n);
            itemsProduced.addAndGet(1);
            return true;
        }
        return false;
    }
    
    
}

但问题是如何让消费者线程知道何时停止?

我想避免例外。

也可能有多个生产者和多个消费者。

有没有更好的方法来做到这一点?

解决方法

也许类似于 BlockingQueue 上的这个包装器?

public class MyQueue<T> {

    private final BlockingQueue<T> queue;
    private final AtomicInteger freeSpace;


    public MyQueue(int size) {
        this.freeSpace = new AtomicInteger(size);
        this.queue = new LinkedBlockingQueue<>(size);
    }

    public boolean produce(T t) {
        if(isDone()) {
            return false;
        }
        
        freeSpace.decrementAndGet();
        return queue.offer(t);
    }

    public boolean isDone() {
        return freeSpace.get() <= 0;
    }

    public Optional<T> consume() {
        return Optional.ofNullable(queue.poll());
    }
}

消费者可以使用 isDone 方法,也可以在 Optional 返回的 consume 为空时停止。请注意,不接受 null

,

您需要队列类来跟踪发布到队列中的项目数量,以及队列何时为空或已满。您还需要对生产者的请求进行排队,以便一次只有一个生产者可以访问该队列。 以下 Ada 示例演示了这一点。 Ada 中的解决方案很简单,因为 Ada 将调用对受保护的对象条目进行排队,确保在任何给定时间只有一个生产者或消费者可以访问。

with Ada.Text_IO; use Ada.Text_IO;

procedure Main is
   protected Limited_Buffer is
      entry Put(Item : in Integer);
      entry Get(Item : out Integer);
      function Is_New return Boolean;
      function At_Limit return Boolean;
   private
      Count : Natural := 0;
      Value : Integer;
      New_Value : Boolean := False;
   end Limited_Buffer;
   
   protected body Limited_Buffer is
      entry Put(Item : in Integer) when not New_Value and then not At_Limit is
      begin
         Value := Item;
         New_Value := True;
         Count := Count + 1;
      end Put;
      entry Get(Item : out Integer) when New_Value is
      begin
         Item := Value;
         New_Value := False;
      end Get;
      function Is_New return boolean is
      begin
         return New_Value;
      end Is_New;
      
      function At_Limit return Boolean is
      begin
         return Count >= 5;
      end At_Limit;
      
   end Limited_Buffer;
   
   task P1;
   
   task body P1 is
      Num : Integer := 1;
   begin
      loop
         select
            Limited_Buffer.Put(Num);
            Num := Num + 1;
         or
            delay 0.001;
         end select;
         exit when Limited_Buffer.At_Limit;
      end loop;
   end P1;
   
   task P2;
   
   task body P2 is
      Num : Integer := 10;
   begin
      loop
         select
            Limited_Buffer.Put(Num);
            Num := Num + 1;
         or
            delay 0.001;
         end select;
         exit when Limited_Buffer.At_Limit;
      end loop;
   end P2;
   
   task type C1;
   
   task body C1 is
      Num : Integer;
   begin
      loop
         select
         Limited_Buffer.Get(Num);
         Put_Line(Num'Image);
         or
            delay 0.001;
         end select;
         exit when Limited_Buffer.At_Limit;
      end loop;
   end C1;
   
   Con1,Con2 : C1;
             
begin
   null;
end Main;

Ada 保护对象,例如上例中的 Limited_Buffer,允许定义三种方法:过程、条目和函数。过程在受保护对象上提供排他性无条件读写锁。条目为受保护对象提供有条件的读写锁。函数是对受保护对象提供共享读锁的只读方法。 此示例使用名为 Put 和 Get 的两个条目以及名为 Is_New 和 At_Limit 的两个函数。受保护对象声明的私有部分定义了受保护对象使用的私有数据。在此示例中,受保护对象中有三个数据项。 Count 是 Natural 的一个实例,它是 Integer 的预定义子类型,最小值为 0。本例中 Count 初始化为 0。值是 Integer 的一个实例。值是存储由 Put 和 Get 条目写入和读取受保护对象的值的数据元素。 New_Value 是初始化为 False 的 Boolean 实例。 New_Value 用于确保写入受保护对象的每个值都只消耗一次。

受保护体 Limited_Buffer 实现了两个条目和两个函数的逻辑。 Ada 总是将受保护对象和任务的规范和实现分开。 每个条目的第一行模仿受保护对象规范,但还添加了边界条件。仅当边界条件评估为 TRUE 时才评估该条目。任务对条目的每次调用都是队列。默认排队策略是 FIFO,确保按调用顺序处理调用。当调用任务在入口队列中时,调用任务被挂起。每个条目都有自己的队列。

该示例继续创建两个名为 P1 和 P2 的生产者任务。这两个任务之间的唯一区别是它们写入队列的数字范围。 我们不希望生产者在 Queue 处理完值限制后永远挂在 Put 入口队列中。 Ada 提供了有条件地调用入口队列的语法。此语法从“select”保留字开始。入口调用完成后,局部变量 Num 会增加。调用的条件性质从“或”保留字开始。入口调用与定时器相关联。如果在进入调用完成之前时间到期,则取消进入调用并重复循环。当 Limited_Buffer.At_Limit 返回 TRUE 时,循环终止。

两个消费者是相同的。因此,我为消费者创建了一个任务类型。稍后在源代码中创建了任务类型的两个实例。消费者反复调用 Limited_Buffer.Get 并打印从缓冲区中检索到的值。此活动也是有条件地完成,因为我们不希望在从缓冲区读取所有值后暂停消费者。

最后创建了消费者任务类型的两个实例。当主过程的执行到达主过程中的 begin 语句时,所有四个任务都开始执行。主过程什么都不做,这就是为什么它唯一的可执行语句是空保留字。

,

看看ArrayBlockingQueue

这是一个经典的“有界缓冲区”,其中一个固定大小的数组保存由生产者插入并由消费者提取的元素。容量一旦创建,就无法更改。尝试将元素放入已满队列将导致操作阻塞;尝试从空队列中获取元素同样会阻塞。