要创建打包任务的优先级队列吗?

问题描述

我正在尝试创建打包任务队列,并从期货中获取结果。我正在使用Stroustrup C ++第四版第42章中介绍的条件变量队列,以及他在第1241页上使用此队列进行打包任务的想法。不幸的是,我遇到了一些编译错误。感谢是否有人对使此代码有效地演示这一想法有任何指导。

代码:

#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <queue>
#include <future>
#include <utility>
using namespace std;

template<typename T>
class SQ {
public:
    void put(const T& val);
    void put(T&& val);
    void get(T& val);  // T * const v;
private:
    mutex mtx;
    condition_variable cond;
    priority_queue<T> q;
};

template<typename T>
void SQ<T>::put(const T& val)
{
    {
        unique_lock<mutex> lck(mtx);
        while (q.size() > 3) // no q bigger than
            cond.wait(lck);
        q.push(val);
    }
    cond.notify_one();
}

template<typename T>
void SQ<T>::put(T&& val)
{
    {
        unique_lock<mutex> lck(mtx);
        while (q.size() > 3) // no q bigger than
            cond.wait(lck);
        q.push(forward<T&&>(val));
    }
    cond.notify_one();
}

template<typename T>
void SQ<T>::get(T& val)
{
    unique_lock<mutex> lck(mtx);
    while (q.empty())
        cond.wait(lck);
    val = move(q.top());
    q.pop();
    lck.unlock();
    cond.notify_all();
}

struct QP {
    packaged_task<int(int)> pt;
    int pri;
};
bool operator<(const QP& a,const QP& b)
{
    return a.pri < b.pri;
}

SQ<QP> mq;

#include <random>
#include <chrono>
using namespace std::chrono;
default_random_engine gen(random_device{}());
uniform_int_distribution<int> p{0,50};

int f(int i) { return i; }

void producer()
{
    vector<future<int>> fv;
    int i = 5;
    while (i--) {
        QP m{packaged_task<int(int)>{f}};
        fv.push_back(m.pt.get_future());
        mq.put(move(m));
        this_thread::sleep_for(milliseconds{p(gen)});
    }

    // get the results
    for (auto& p : fv)
        cout << p.get() << '\n';
}

mutex cout_lock;
void consumer()
{
    int i = 0;
    while (true) {
        QP m;
        mq.get(m);
#if 0
        {
            lock_guard<mutex> l{cout_lock};
            cout << this_thread::get_id() << ": " << '\n';
        }
#endif
        this_thread::sleep_for(milliseconds{p(gen)});
        m.pt(i++);
    }
}

int main()
{
    thread t0{consumer};
    thread t1{producer};

    t0.join(); t1.join();
    
    return 0;
}

编译:

clang++ -Wall -pedantic -std=c++11 -lpthread test264.cc && ./a.out
test264.cc:52:9: error: object of type 'QP' cannot be assigned because its copy
      assignment operator is implicitly deleted
    val = move(q.top());
        ^
test264.cc:99:12: note: in instantiation of member function 'SQ<QP>::get' requested
      here
        mq.get(m);
           ^
test264.cc:59:29: note: copy assignment operator of 'QP' is implicitly deleted
      because field 'pt' has a deleted copy assignment operator
    packaged_task<int(int)> pt;
                            ^
/usr/bin/../lib/gcc/x86_64-linux-gnu/8/../../../../include/c++/8/future:1517:22: note: 
      'operator=' has been explicitly marked deleted here
      packaged_task& operator=(const packaged_task&) = delete;
                     ^
1 error generated.

解决方法

priority_queue不能与packaged_task一起使用,因为packaged_task只能移动,不能复制。因此,get无法将元素移出priority_queue。作为解决方案,提供了基于标准queue的代码,并且可以正常工作。

#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <queue>
#include <future>
#include <utility>
using namespace std;

template<typename T>
class SQ {
public:
    void put(const T& val);
    void put(T&& val);
    void get(T& val);  // T * const v;
private:
    mutex mtx;
    condition_variable cond;
    queue<T> q;
};

template<typename T>
void SQ<T>::put(const T& val)
{
    {
        unique_lock<mutex> lck(mtx);
        while (q.size() > 3) // no q bigger than
            cond.wait(lck);
        q.push(val);
    }
    cond.notify_one();
}

template<typename T>
void SQ<T>::put(T&& val)
{
    {
        unique_lock<mutex> lck(mtx);
        while (q.size() > 3) // no q bigger than
            cond.wait(lck);
        q.push(forward<T&&>(val));
    }
    cond.notify_one();
}

template<typename T>
void SQ<T>::get(T& val)
{
    unique_lock<mutex> lck(mtx);
    while (q.empty())
        cond.wait(lck);
    val = move(q.front());
    q.pop();
    lck.unlock();
    cond.notify_all();
}

struct QP {
    QP() {}
    QP(packaged_task<int(int)> &&pt) : pt{move(pt)} {}
    packaged_task<int(int)> pt;
};

SQ<QP> mq;

#include <random>
#include <chrono>
using namespace std::chrono;
default_random_engine gen(random_device{}());
uniform_int_distribution<int> p{0,50};

int f(int i) { return i; }

void producer()
{
    vector<future<int>> fv;
    int i = 5;
    while (i--) {
        QP m{packaged_task<int(int)>{f}};
        fv.push_back(m.pt.get_future());
        mq.put(move(m));
        this_thread::sleep_for(milliseconds{p(gen)});
    }

    // get the results
    for (auto& p : fv)
        cout << p.get() << '\n';
}

mutex cout_lock;
void consumer()
{
    while (true) {
        QP m;
        mq.get(m);

        this_thread::sleep_for(milliseconds{p(gen)});
        m.pt(0);
    }
}

int main()
{
    thread t0{consumer};
    thread t1{producer};

    t0.join(); t1.join();
    
    return 0;
}

编译和结果:

clang++ -Wall -pedantic -std=c++11 -lpthread test262.cc && ./a.out
0
0
0
0
0

相关问答

依赖报错 idea导入项目后依赖报错,解决方案:https://blog....
错误1:代码生成器依赖和mybatis依赖冲突 启动项目时报错如下...
错误1:gradle项目控制台输出为乱码 # 解决方案:https://bl...
错误还原:在查询的过程中,传入的workType为0时,该条件不起...
报错如下,gcc版本太低 ^ server.c:5346:31: 错误:‘struct...