消费者生产者多线程冻结

问题描述

我正在尝试创建一个简单的消费者/生产者代码以供学习,其中生产者将数字压入堆栈,消费者线程打印数字,这是我得到的:

const int N_THREADS = 10;
const int N_TESTS = 100;
bool finished = false;
queue<int> q;

void produce()
{
    for (int i = 0; i < N_TESTS; i++)
    {
        q.push(i);
        cv.notify_all();        
    }
    finished = true;
}

void consume()
{
    while (!q.empty() || !finished)
    {   
        unique_lock<mutex> lock(m);
        cv.wait(lock,[] {return !q.empty(); });
        int i = q.front();
        cout << i << endl;
        q.pop();
    }   
}

int main()
{
    //thread that will be used for producing
    thread producer(produce);
    
    //vector of consumer threadss
    vector<thread> consumers(N_THREADS);
    for (int i = 0; i < N_THREADS; i++)
    {
        consumers[i] = thread(consume);
    }

    //joining all threads
    producer.join();
    for (int i = 0; i < N_THREADS; i++)
    {
        consumers[i].join();
    }
    
    return 0;   
}

但是,当我运行代码时,它会打印数字但它只是冻结,它永远不会结束:

Console

可以做些什么来结束它?

解决方法

我在您的代码中发现了一些错误。

首先为什么你调用 notify_all() 你只推送了一个元素,我认为调用 notify_one() 更好。

想象一下这个场景: 生产者推送一个元素然后调用 notify_all() 然后其他线程唤醒其中一个将使用互斥锁并执行工作并弹出元素然后其他线程检查条件并查看队列为空然后他们将进入睡眠状态(浪费资源) .

另一个重要的错误是当生产者完成他的工作时它会返回此时可能队列仍然有元素在这种情况下某些线程将等待通知但由于生产者退出因此生产者不会再发送通知,这会导致一些线程在 cv.wai() 处阻塞。即使它是通知条件变量,它也会阻塞,因为 !q.empty() 的条件返回 false(因为生产者完成后 q.empty() 已经为真)。

为了修复此代码,您应该修改代码的某些部分。

首先,我建议您在推入队列时也获取互斥锁,因为您可能会面临竞争条件(在这里您使用的是队列,我认为它不会在这里发生,但是如果您使用向量,您肯定会看到) .

因为代码在 cv.wait() 处阻塞(但要注意某些线程会退出,因为它们会在生产者完成之前通知并且在生产者完成后他们将检查 while 条件并且他们看到完成的变量为真,因此他们有机会退出但其他线程将在 cv.wait() 处阻塞)。为了解决这个问题,你应该在你的条件变量中加入另一个条件:

cv.wait(lock,[] {return !q.empty() || (finished && q.empty()); });
if (finished && q.empty())
        break;

并且您的代码成功退出。

我还建议您阅读以下主题:

C++11 Can I ensure a condition_variable.wait() won't miss a notification?

https://www.modernescpp.com/index.php/c-core-guidelines-be-aware-of-the-traps-of-condition-variables

,

您的代码中的 cvm 是什么?我可能会猜到它们,但 Minimal,Reproducible Example 会更好。

q.empty() 中的

finishedconsume 无需保护即可访问。在该访问之前移动 unique_lock<mutex> lock(m)。 while 循环中的条件看起来不对。不应该是while (!finished)吗?

void consume()
{
    unique_lock<mutex> lock(m);
    while (!finished)
    {   
        cv.wait(lock,[] {return !q.empty(); });
        int i = q.front();
        cout << i << endl;
        q.pop();
    }   
}

produce 可以在没有听众时通知听众。修改前不要锁定队列。

void produce()
{
    unique_lock<mutex> lock(m);
    for (int i = 0; i < N_TESTS; i++)
    {
        q.push(i);
        lock.unlock();
        cv.notify_all();        
        lock.lock();
    }
    finished = true;
}