问题描述
一段时间后,我一直在寻找解决方案。我一直在一个最小的例子中工作,我有一个生产者和多个消费者,它似乎在大多数情况下都能正常工作(生产者和消费者没有死锁,所有的值都被消费一次,等等。 .)
但是,我不确定如何在生产者结束时向消费者发出信号以完成消耗所有剩余的生产数据,然后优雅地结束。这是我准备的最小工作示例。
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <semaphore.h>
#include <pthread.h>
#include <stdbool.h>
#define bufferSize 6
#define consumerAmount 3
int buffer[bufferSize];
int producerCell = 0,consumerCell = 0;
sem_t mutexFill,mutexEmpty;
sem_t bufferHasspace,bufferHasData;
bool producerAlive = true;
void *producer_routine()
{
int producedCell;
for (int i = 0; i < 20; i++)
{
sem_wait(&bufferHasspace);
sem_wait(&mutexFill);
producedCell = producerCell;
producerCell = (producerCell + 1) % bufferSize;
sem_post(&mutexFill);
memcpy(&buffer[producedCell],&i,sizeof(i));
printf("Producer generated %i\n",i);
sem_post(&bufferHasData);
}
printf("Producer ended\n");
producerAlive = false;
}
void *consumer_routine(void *id)
{
int consumedCell,semaphoreValue;
do
{
sem_wait(&bufferHasData);
sem_wait(&mutexEmpty);
consumedCell = consumerCell;
consumerCell = (consumerCell + 1) % bufferSize;
sem_getvalue(&bufferHasData,&semaphoreValue);
sem_post(&mutexEmpty);
printf("Consumer %i processed %i\n",*(int *)id,buffer[consumedCell]);
sem_post(&bufferHasspace);
}while (producerAlive || semaphoreValue > 0);
printf("Consumer %i ended\n",*(int *)id);
}
int main()
{
sem_init(&mutexFill,1,1);
sem_init(&mutexEmpty,1);
sem_init(&bufferHasspace,bufferSize);
sem_init(&bufferHasData,0);
pthread_t consumers[consumerAmount];
int consumerIDs[consumerAmount];
for (int i = 0; i < consumerAmount; i++)
{
consumerIDs[i] = i;
pthread_create(&consumers[i],NULL,&consumer_routine,&consumerIDs[i]);
}
pthread_t producer;
pthread_create(&producer,&producer_routine,NULL);
for (int i = 0; i < consumerAmount; i++)
{
pthread_join(consumers[i],NULL);
}
return 0;
}
Producer generated 0
Producer generated 1
Consumer 0 processed 0
Consumer 1 processed 1
Producer generated 2
Producer generated 3
Consumer 2 processed 2
Consumer 2 processed 3
Producer generated 4
Producer generated 5
Producer generated 6
Consumer 1 processed 4
Consumer 1 processed 6
Producer generated 7
Consumer 0 processed 5
Producer generated 8
Consumer 1 processed 7
Producer generated 9
Producer generated 10
Consumer 0 processed 9
Consumer 0 processed 10
Consumer 2 processed 8
Producer generated 11
Producer generated 12
Producer generated 13
Producer generated 14
Producer generated 15
Producer generated 16
Consumer 2 processed 11
Consumer 2 processed 12
Consumer 2 processed 13
Consumer 2 processed 14
Consumer 2 processed 16
Consumer 0 processed 15
Producer generated 17
Producer generated 18
Producer generated 19
Consumer 1 processed 17
Consumer 2 processed 18
Producer ended
Consumer 0 processed 19
Consumer 0 ended
(It gets stuck here indefinitely until the process is killed)
我尝试了以下尝试让消费者在生产者结束后从缓冲区读取剩余数据然后退出,但似乎没有一个给我预期的结果。
- 不使用布尔标志并依赖于 bufferHasData 信号量:每个消费者消耗一次。之后,由于缓冲区为空,它退出。
- 将布尔标志与 bufferHasData 信号量结合使用(上面的示例实现了这一点):现在它可以正常工作,但只有第一个使用者正常退出。其余的消费者陷入等待更多数据的信号量中,这不会发生,因为生产者已终止。
- 仅使用布尔标志:一旦生产者结束,消费者将终止而不处理缓冲区的其余部分。
- 不使用任何东西,让消费者无限运行:它消耗了所有数据,但现在我无法告诉消费者停止消费并优雅退出。
那么,告诉消费者生产者已经完成的正确方法是什么,以及如何在不让多个消费者被信号量锁定的情况下消耗缓冲区的剩余部分?
解决方法
我使用 Ada 编写生产者-消费者程序,这大大简化了逻辑。也许我使用的逻辑可能适用于您的 C 实现。
Ada 有一个实体称为任务,类似于线程。 Ada 也有一个实体称为受保护对象,用于创建任务共享的缓冲区。受保护对象实现了一个高级监视器结构,具有自动处理共享数据的锁定和解锁功能。
我已经使用 Ada 包实现了这个示例来封装生产者-消费者代码。 Ada 包有两部分。包规范定义了一种类似于 C 头文件的 API。包体定义了规范中公开的功能、过程、任务或受保护对象的实现。包体还可以包含类型定义、过程、函数、任务和程序使用但不暴露给 API 的受保护对象。
我的包装规格是
package Stopping_Consumers is
task type Consumer;
task Producer;
end Stopping_Consumers;
如您所见,规范中只公开了两件事。称为 Consumer 的任务类型允许程序员根据需要创建尽可能多的 Consumer 实例。任务生产者不是任务类型。此 API 中只有一个可能的 Producer 实例。
包主体包含此答案的所有有趣内容。
with Ada.Text_IO; use Ada.Text_IO;
package body Stopping_Consumers is
-------------------
-- Shared_Buffer --
-------------------
type Buf_Index is mod 10;
type Buf_Array is array (Buf_Index) of Integer;
protected Shared_Buffer is
entry Write (Value : Integer);
function Producer_Is_Done return Boolean;
procedure Signal_Done;
entry Read (Value : out Integer);
function Is_Empty return Boolean;
procedure Get_Id (Id : out Natural);
private
Buf : Buf_Array;
Write_Index : Buf_Index := 0;
Read_Index : Buf_Index := 0;
Count : Natural := 0;
Is_Done : Boolean := False;
Consumer_Num : Natural := 0;
end Shared_Buffer;
protected body Shared_Buffer is
entry Write (Value : Integer) when Count < Buf_Index'Modulus is
begin
Buf (Write_Index) := Value;
Write_Index := Write_Index + 1;
Count := Count + 1;
end Write;
function Producer_Is_Done return Boolean is
begin
return Is_Done;
end Producer_Is_Done;
procedure Signal_Done is
begin
Is_Done := True;
end Signal_Done;
entry Read (Value : out Integer) when Count > 0 is
begin
Value := Buf (Read_Index);
Read_Index := Read_Index + 1;
Count := Count - 1;
end Read;
function Is_Empty return Boolean is
begin
return Count = 0;
end Is_Empty;
procedure Get_Id (Id : out Natural) is
begin
Id := Consumer_Num;
Consumer_Num := Consumer_Num + 1;
end Get_Id;
end Shared_Buffer;
--------------
-- Consumer --
--------------
task body Consumer is
Id : Natural;
Value : Integer;
begin
Shared_Buffer.Get_Id (Id);
loop
exit when Shared_Buffer.Producer_Is_Done
and then Shared_Buffer.Is_Empty;
select
Shared_Buffer.Read (Value);
Put_Line ("Consumer" & Id'Image & " read" & Value'Image);
or
delay 0.001;
end select;
end loop;
end Consumer;
--------------
-- Producer --
--------------
task body Producer is
begin
for I in 1 .. 60 loop
Shared_Buffer.Write (I);
Put_Line ("Producer wrote" & I'Image);
delay 0.01;
end loop;
Shared_Buffer.Signal_Done;
end Producer;
end Stopping_Consumers;
包体包含消费者任务类型和生产者任务的实现。它还包含名为 Shared_Buffer 的受保护对象的规范和实现。这个例子真正有趣的部分在于 Shared_Buffer 的定义。
Shared_Buffer 包含几个列为过程、条目和函数的操作。它还包含几个数据项。 Shared_Buffer 首先由其 API 定义,然后定义实现。 Shared_Buffer API 是
protected Shared_Buffer is
entry Write (Value : Integer);
function Producer_Is_Done return Boolean;
procedure Signal_Done;
entry Read (Value : out Integer);
function Is_Empty return Boolean;
procedure Get_Id (Id : out Natural);
private
Buf : Buf_Array;
Write_Index : Buf_Index := 0;
Read_Index : Buf_Index := 0;
Count : Natural := 0;
Is_Done : Boolean := False;
Consumer_Num : Natural := 0;
end Shared_Buffer;
名为 Buf 的数据元素被定义为 Buf_Array 类型的实例。 Buf_Array 是由名为 Buf_Index 的模块化类型索引的整数值数组。 Ada 模数类型是表现模数运算的无符号整数类型。在这个例子中,Buf_Index 被定义为
type Buf_Index is mod 10;
此类型的值范围是 0 到 9。所有算术都不是模类型本身就是模的。例如,将 Buf_Index 的值递增,初始值为 9,结果为 0。这种相加的所有值都等同于 C 代码
num = (num + 1) % 10;
使用模块化类型作为数组的索引类型会创建一个循环数组,这正是生产者-消费者缓冲区中所需要的。
生产者必须先写入数据才能读取。因此,生产者使用 Write_Index 数据成员进行写入,而消费者使用 Read_Index 进行读取。 Count 数据成员用于确定缓冲区何时已满和何时为空。
Is_Done 成员是由生产者设置的标志,指示生产者何时完成对缓冲区的写入。
Consumer_Num 在注册从缓冲区读取数据时为每个消费者分配一个 ID 号。
Ada 保护对象可以有三种操作;过程、条目和函数。过程无条件地修改受保护对象中的数据,自动操纵受保护对象上的独占读/写锁。条目有条件地修改受保护对象中的数据,自动操纵受保护对象上的独占读/写锁。函数是只读的并且操作受保护对象上的共享读锁。编译器为程序员编写锁操作代码。
写入条目的实现表达了与条目执行相关的边界条件。
entry Write (Value : Integer) when Count < Buf_Index'Modulus is
begin
Buf (Write_Index) := Value;
Write_Index := Write_Index + 1;
Count := Count + 1;
end Write;
控制条件表示为“当Count
Read 条目只允许消费者在缓冲区不为空时从缓冲区读取数据。
entry Read (Value : out Integer) when Count > 0 is
begin
Value := Buf (Read_Index);
Read_Index := Read_Index + 1;
Count := Count - 1;
end Read;
生产者只需将一系列数字写入受保护的对象,然后调用 Signal_Done 过程来指示生产者已完成。
task body Producer is
begin
for I in 1 .. 60 loop
Shared_Buffer.Write (I);
Put_Line ("Producer wrote" & I'Image);
end loop;
Shared_Buffer.Signal_Done;
end Producer;
生产者在 Shared_Buffer 满时自动挂起,当 Shared_Buffer 不再满时自动取消挂起。
消费者读取 Shared_Buffer 直到 Producer 完成并且 Shared_Buffer 为空。消费者将继续从共享缓冲区读取,直到两个条件都为真。
task body Consumer is
Id : Natural;
Value : Integer;
begin
Shared_Buffer.Get_Id (Id);
loop
exit when Shared_Buffer.Producer_Is_Done
and then Shared_Buffer.Is_Empty;
select
Shared_Buffer.Read (Value);
Put_Line ("Consumer" & Id'Image & " read" & Value'Image);
or
delay 0.001;
end select;
end loop;
end Consumer;
Consumer 循环中的“select”子句允许 Consumer 轮询缓冲区,以便在 Producer 终止时,Consumer 不会永久挂起在 Read entry 队列中。语句“延迟 0.001;”导致消费者在退出读取条目队列并重试之前等待 1 毫秒。
这种方法允许任意数量的消费者任务读取数据,避免重复读取并消耗所有产生的数据,而无需生产者和消费者之间的直接通信。
编辑: 我的经验告诉我,优雅且协调的线程终止是一个持久且困难的问题。
我修改了上面的例子,以更贴近作者的例子。
包装规格:
package Stopping_Consumers is
task type Consumer;
task Producer;
end Stopping_Consumers;
包正文:
with Ada.Text_IO; use Ada.Text_IO;
package body Stopping_Consumers is
-------------------
-- Shared_Buffer --
-------------------
type Buf_Index is mod 6;
type Buf_Array is array (Buf_Index) of Integer;
protected Shared_Buffer is
entry Write (Value : Integer);
function Producer_Is_Done return Boolean;
procedure Signal_Done;
entry Read (Value : out Integer);
function Is_Empty return Boolean;
procedure Get_Id (Id : out Natural);
private
Buf : Buf_Array;
Write_Index : Buf_Index := 0;
Read_Index : Buf_Index := 0;
Count : Natural := 0;
Is_Done : Boolean := False;
Consumer_Num : Natural := 0;
end Shared_Buffer;
protected body Shared_Buffer is
entry Write (Value : Integer) when Count < Buf_Index'Modulus is
begin
Buf (Write_Index) := Value;
Write_Index := Write_Index + 1;
Count := Count + 1;
end Write;
function Producer_Is_Done return Boolean is
begin
return Is_Done;
end Producer_Is_Done;
procedure Signal_Done is
begin
Is_Done := True;
end Signal_Done;
entry Read (Value : out Integer) when Count > 0 is
begin
Value := Buf (Read_Index);
Read_Index := Read_Index + 1;
Count := Count - 1;
end Read;
function Is_Empty return Boolean is
begin
return Count = 0;
end Is_Empty;
procedure Get_Id (Id : out Natural) is
begin
Id := Consumer_Num;
Consumer_Num := Consumer_Num + 1;
end Get_Id;
end Shared_Buffer;
--------------
-- Consumer --
--------------
task body Consumer is
Id : Natural;
Value : Integer;
begin
Shared_Buffer.Get_Id (Id);
loop
exit when Shared_Buffer.Producer_Is_Done
and then Shared_Buffer.Is_Empty;
select
Shared_Buffer.Read (Value);
Put_Line ("Consumer" & Id'Image & " read" & Value'Image);
or
delay 0.009;
end select;
end loop;
Put_line("Consumer" & Id'Image & " ended.");
end Consumer;
--------------
-- Producer --
--------------
task body Producer is
begin
delay 0.001;
for I in 1 .. 20 loop
Shared_Buffer.Write (I);
Put_Line ("Producer wrote" & I'Image);
end loop;
Shared_Buffer.Signal_Done;
Put_Line("Producer ended.");
end Producer;
end Stopping_Consumers;
程序主要程序:
with Stopping_Consumers; use Stopping_Consumers;
procedure Main is
Consumers : array(0..2) of Consumer;
begin
null;
end Main;
样本输出:
Producer wrote 1
Consumer 0 read 1
Consumer 1 read 2
Producer wrote 2
Producer wrote 3
Consumer 2 read 3
Producer wrote 4
Consumer 0 read 4
Consumer 1 read 5
Producer wrote 5
Producer wrote 6
Consumer 2 read 6
Consumer 0 read 7
Producer wrote 7
Consumer 1 read 8
Producer wrote 8
Producer wrote 9
Consumer 1 read 9
Producer wrote 10
Consumer 0 read 11
Consumer 2 read 10
Producer wrote 11
Producer wrote 12
Consumer 1 read 12
Consumer 2 read 13
Producer wrote 13
Producer wrote 14
Consumer 0 read 14
Consumer 1 read 15
Producer wrote 15
Producer wrote 16
Consumer 2 read 16
Consumer 0 read 17
Producer wrote 17
Producer wrote 18
Consumer 1 read 18
Consumer 0 read 19
Producer wrote 19
Producer wrote 20
Producer ended.
Consumer 2 read 20
Consumer 2 ended.
Consumer 0 ended.
Consumer 1 ended.
,
如何在所有生产者结束后正确终止多个消费者?
简单的解决方案是让生产者产生一个特殊的“终止自己”值(每个消费者一个);消费者识别此值并自行终止,而不是像往常一样处理它。
例如(使用 INT_MAX
是“终止自己”值):
void *producer_routine()
{
int producedCell;
int terminator = INT_MAX;
for (int i = 0; i < 20; i++)
{
sem_wait(&bufferHasSpace);
sem_wait(&mutexFill);
producedCell = producerCell;
producerCell = (producerCell + 1) % bufferSize;
sem_post(&mutexFill);
memcpy(&buffer[producedCell],&i,sizeof(i));
printf("Producer generated %i\n",i);
sem_post(&bufferHasData);
}
printf("Producer ending\n");
for (int i = 0; i < consumerAmount; i++)
{
sem_wait(&bufferHasSpace);
sem_wait(&mutexFill);
producedCell = producerCell;
producerCell = (producerCell + 1) % bufferSize;
sem_post(&mutexFill);
memcpy(&buffer[producedCell],&terminator,sizeof(terminator));
sem_post(&bufferHasData);
}
printf("Producer ended\n");
}
void *consumer_routine(void *id)
{
int consumedCell,semaphoreValue,value;
do
{
sem_wait(&bufferHasData);
sem_wait(&mutexEmpty);
consumedCell = consumerCell;
consumerCell = (consumerCell + 1) % bufferSize;
sem_getvalue(&bufferHasData,&semaphoreValue);
sem_post(&mutexEmpty);
value = buffer[consumedCell];
sem_post(&bufferHasSpace);
if(value != INT_MAX) {
printf("Consumer %i processed %i\n",*(int *)id,buffer[consumedCell]);
} else {
printf("Consumer %i told to terminate\n",*(int *)id);
break;
}
}while (semaphoreValue > 0);
printf("Consumer %i ended\n",*(int *)id);
}