问题描述
我正在尝试创建打包任务队列,并从期货中获取结果。我正在使用Stroustrup C ++第四版第42章中介绍的条件变量队列,以及他在第1241页上使用此队列进行打包任务的想法。不幸的是,我遇到了一些编译错误。感谢是否有人对使此代码有效地演示这一想法有任何指导。
代码:
#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <queue>
#include <future>
#include <utility>
using namespace std;
template<typename T>
class SQ {
public:
void put(const T& val);
void put(T&& val);
void get(T& val); // T * const v;
private:
mutex mtx;
condition_variable cond;
priority_queue<T> q;
};
template<typename T>
void SQ<T>::put(const T& val)
{
{
unique_lock<mutex> lck(mtx);
while (q.size() > 3) // no q bigger than
cond.wait(lck);
q.push(val);
}
cond.notify_one();
}
template<typename T>
void SQ<T>::put(T&& val)
{
{
unique_lock<mutex> lck(mtx);
while (q.size() > 3) // no q bigger than
cond.wait(lck);
q.push(forward<T&&>(val));
}
cond.notify_one();
}
template<typename T>
void SQ<T>::get(T& val)
{
unique_lock<mutex> lck(mtx);
while (q.empty())
cond.wait(lck);
val = move(q.top());
q.pop();
lck.unlock();
cond.notify_all();
}
struct QP {
packaged_task<int(int)> pt;
int pri;
};
bool operator<(const QP& a,const QP& b)
{
return a.pri < b.pri;
}
SQ<QP> mq;
#include <random>
#include <chrono>
using namespace std::chrono;
default_random_engine gen(random_device{}());
uniform_int_distribution<int> p{0,50};
int f(int i) { return i; }
void producer()
{
vector<future<int>> fv;
int i = 5;
while (i--) {
QP m{packaged_task<int(int)>{f}};
fv.push_back(m.pt.get_future());
mq.put(move(m));
this_thread::sleep_for(milliseconds{p(gen)});
}
// get the results
for (auto& p : fv)
cout << p.get() << '\n';
}
mutex cout_lock;
void consumer()
{
int i = 0;
while (true) {
QP m;
mq.get(m);
#if 0
{
lock_guard<mutex> l{cout_lock};
cout << this_thread::get_id() << ": " << '\n';
}
#endif
this_thread::sleep_for(milliseconds{p(gen)});
m.pt(i++);
}
}
int main()
{
thread t0{consumer};
thread t1{producer};
t0.join(); t1.join();
return 0;
}
编译:
clang++ -Wall -pedantic -std=c++11 -lpthread test264.cc && ./a.out
test264.cc:52:9: error: object of type 'QP' cannot be assigned because its copy
assignment operator is implicitly deleted
val = move(q.top());
^
test264.cc:99:12: note: in instantiation of member function 'SQ<QP>::get' requested
here
mq.get(m);
^
test264.cc:59:29: note: copy assignment operator of 'QP' is implicitly deleted
because field 'pt' has a deleted copy assignment operator
packaged_task<int(int)> pt;
^
/usr/bin/../lib/gcc/x86_64-linux-gnu/8/../../../../include/c++/8/future:1517:22: note:
'operator=' has been explicitly marked deleted here
packaged_task& operator=(const packaged_task&) = delete;
^
1 error generated.
解决方法
priority_queue
不能与packaged_task
一起使用,因为packaged_task
只能移动,不能复制。因此,get
无法将元素移出priority_queue。作为解决方案,提供了基于标准queue
的代码,并且可以正常工作。
#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <queue>
#include <future>
#include <utility>
using namespace std;
template<typename T>
class SQ {
public:
void put(const T& val);
void put(T&& val);
void get(T& val); // T * const v;
private:
mutex mtx;
condition_variable cond;
queue<T> q;
};
template<typename T>
void SQ<T>::put(const T& val)
{
{
unique_lock<mutex> lck(mtx);
while (q.size() > 3) // no q bigger than
cond.wait(lck);
q.push(val);
}
cond.notify_one();
}
template<typename T>
void SQ<T>::put(T&& val)
{
{
unique_lock<mutex> lck(mtx);
while (q.size() > 3) // no q bigger than
cond.wait(lck);
q.push(forward<T&&>(val));
}
cond.notify_one();
}
template<typename T>
void SQ<T>::get(T& val)
{
unique_lock<mutex> lck(mtx);
while (q.empty())
cond.wait(lck);
val = move(q.front());
q.pop();
lck.unlock();
cond.notify_all();
}
struct QP {
QP() {}
QP(packaged_task<int(int)> &&pt) : pt{move(pt)} {}
packaged_task<int(int)> pt;
};
SQ<QP> mq;
#include <random>
#include <chrono>
using namespace std::chrono;
default_random_engine gen(random_device{}());
uniform_int_distribution<int> p{0,50};
int f(int i) { return i; }
void producer()
{
vector<future<int>> fv;
int i = 5;
while (i--) {
QP m{packaged_task<int(int)>{f}};
fv.push_back(m.pt.get_future());
mq.put(move(m));
this_thread::sleep_for(milliseconds{p(gen)});
}
// get the results
for (auto& p : fv)
cout << p.get() << '\n';
}
mutex cout_lock;
void consumer()
{
while (true) {
QP m;
mq.get(m);
this_thread::sleep_for(milliseconds{p(gen)});
m.pt(0);
}
}
int main()
{
thread t0{consumer};
thread t1{producer};
t0.join(); t1.join();
return 0;
}
编译和结果:
clang++ -Wall -pedantic -std=c++11 -lpthread test262.cc && ./a.out
0
0
0
0
0