问题描述
我正在尝试为生产者/消费者编写解决方案,但有限制。
生产者(可能有多个)应该在生产总共 5个值后终止,而消费者(有可能不止一个消费者)应该在所有五个值都被消费后停止。
为此我尝试使用 AtomicInteger
:
public class InternalQueue {
private static final int MAX_QUEUE_SIZE = 10;
private Queue<Integer> queue = new LinkedList<>();
public AtomicInteger itemsProduced = new AtomicInteger(0);
public AtomicInteger itemsConsumed = new AtomicInteger(0);
public synchronized boolean put(int n) {
if(itemsProduced.get() < MAX_QUEUE_SIZE) {
queue.add(n);
itemsProduced.addAndGet(1);
return true;
}
return false;
}
}
但问题是如何让消费者线程知道何时停止?
我想避免例外。
也可能有多个生产者和多个消费者。
有没有更好的方法来做到这一点?
解决方法
也许类似于 BlockingQueue
上的这个包装器?
public class MyQueue<T> {
private final BlockingQueue<T> queue;
private final AtomicInteger freeSpace;
public MyQueue(int size) {
this.freeSpace = new AtomicInteger(size);
this.queue = new LinkedBlockingQueue<>(size);
}
public boolean produce(T t) {
if(isDone()) {
return false;
}
freeSpace.decrementAndGet();
return queue.offer(t);
}
public boolean isDone() {
return freeSpace.get() <= 0;
}
public Optional<T> consume() {
return Optional.ofNullable(queue.poll());
}
}
消费者可以使用 isDone
方法,也可以在 Optional
返回的 consume
为空时停止。请注意,不接受 null
值
您需要队列类来跟踪发布到队列中的项目数量,以及队列何时为空或已满。您还需要对生产者的请求进行排队,以便一次只有一个生产者可以访问该队列。 以下 Ada 示例演示了这一点。 Ada 中的解决方案很简单,因为 Ada 将调用对受保护的对象条目进行排队,确保在任何给定时间只有一个生产者或消费者可以访问。
with Ada.Text_IO; use Ada.Text_IO;
procedure Main is
protected Limited_Buffer is
entry Put(Item : in Integer);
entry Get(Item : out Integer);
function Is_New return Boolean;
function At_Limit return Boolean;
private
Count : Natural := 0;
Value : Integer;
New_Value : Boolean := False;
end Limited_Buffer;
protected body Limited_Buffer is
entry Put(Item : in Integer) when not New_Value and then not At_Limit is
begin
Value := Item;
New_Value := True;
Count := Count + 1;
end Put;
entry Get(Item : out Integer) when New_Value is
begin
Item := Value;
New_Value := False;
end Get;
function Is_New return boolean is
begin
return New_Value;
end Is_New;
function At_Limit return Boolean is
begin
return Count >= 5;
end At_Limit;
end Limited_Buffer;
task P1;
task body P1 is
Num : Integer := 1;
begin
loop
select
Limited_Buffer.Put(Num);
Num := Num + 1;
or
delay 0.001;
end select;
exit when Limited_Buffer.At_Limit;
end loop;
end P1;
task P2;
task body P2 is
Num : Integer := 10;
begin
loop
select
Limited_Buffer.Put(Num);
Num := Num + 1;
or
delay 0.001;
end select;
exit when Limited_Buffer.At_Limit;
end loop;
end P2;
task type C1;
task body C1 is
Num : Integer;
begin
loop
select
Limited_Buffer.Get(Num);
Put_Line(Num'Image);
or
delay 0.001;
end select;
exit when Limited_Buffer.At_Limit;
end loop;
end C1;
Con1,Con2 : C1;
begin
null;
end Main;
Ada 保护对象,例如上例中的 Limited_Buffer,允许定义三种方法:过程、条目和函数。过程在受保护对象上提供排他性无条件读写锁。条目为受保护对象提供有条件的读写锁。函数是对受保护对象提供共享读锁的只读方法。 此示例使用名为 Put 和 Get 的两个条目以及名为 Is_New 和 At_Limit 的两个函数。受保护对象声明的私有部分定义了受保护对象使用的私有数据。在此示例中,受保护对象中有三个数据项。 Count 是 Natural 的一个实例,它是 Integer 的预定义子类型,最小值为 0。本例中 Count 初始化为 0。值是 Integer 的一个实例。值是存储由 Put 和 Get 条目写入和读取受保护对象的值的数据元素。 New_Value 是初始化为 False 的 Boolean 实例。 New_Value 用于确保写入受保护对象的每个值都只消耗一次。
受保护体 Limited_Buffer 实现了两个条目和两个函数的逻辑。 Ada 总是将受保护对象和任务的规范和实现分开。 每个条目的第一行模仿受保护对象规范,但还添加了边界条件。仅当边界条件评估为 TRUE 时才评估该条目。任务对条目的每次调用都是队列。默认排队策略是 FIFO,确保按调用顺序处理调用。当调用任务在入口队列中时,调用任务被挂起。每个条目都有自己的队列。
该示例继续创建两个名为 P1 和 P2 的生产者任务。这两个任务之间的唯一区别是它们写入队列的数字范围。 我们不希望生产者在 Queue 处理完值限制后永远挂在 Put 入口队列中。 Ada 提供了有条件地调用入口队列的语法。此语法从“select”保留字开始。入口调用完成后,局部变量 Num 会增加。调用的条件性质从“或”保留字开始。入口调用与定时器相关联。如果在进入调用完成之前时间到期,则取消进入调用并重复循环。当 Limited_Buffer.At_Limit 返回 TRUE 时,循环终止。
两个消费者是相同的。因此,我为消费者创建了一个任务类型。稍后在源代码中创建了任务类型的两个实例。消费者反复调用 Limited_Buffer.Get 并打印从缓冲区中检索到的值。此活动也是有条件地完成,因为我们不希望在从缓冲区读取所有值后暂停消费者。
最后创建了消费者任务类型的两个实例。当主过程的执行到达主过程中的 begin 语句时,所有四个任务都开始执行。主过程什么都不做,这就是为什么它唯一的可执行语句是空保留字。
,这是一个经典的“有界缓冲区”,其中一个固定大小的数组保存由生产者插入并由消费者提取的元素。容量一旦创建,就无法更改。尝试将元素放入已满队列将导致操作阻塞;尝试从空队列中获取元素同样会阻塞。