无法在生产者-消费者问题中获得第一批的完美同步

问题描述

std::vector<infer_request_t*> infer_req_vec;
std::vector<infer_request_t*> infer_req_queue;

std::mutex produce_mutex;
std::condition_variable pcv;
std::condition_variable icv;

void produce_blobs(){

    bool enable_inf_stats = true;
    while(!stop_test && (batchCount < appargs.queryCount || appargs.isInfinite))
    {   
        std::unique_lock<std::mutex> produce_lock(produce_mutex);
        pcv.wait_for(produce_lock,std::chrono::seconds(2),[]{return (((appargs.samplesPerQuery - infer_req_vec.size())>0));});
        if(stop_test || (batchCount >= appargs.queryCount && appargs.queryCount != -1)) break;
        // CODE FOR GENErating IPBLOB AND OPBLOB FOR QUERY
        infer_request_t *infReq = NULL;
        infer_req_vec.push_back(infReq);
        int infer_req_vec_size = infer_req_vec.size();
        status = API(session,hEndPt,model,ipblob,opblob,enable_inf_stats,&infer_req_vec[infer_req_vec_size-1]);
        icv.notify_one();
        produce_lock.unlock();
        
    }
}

void infer_wait(){

    while(!stop_test && (batchCount < appargs.queryCount || appargs.isInfinite ))
    {
        infer_request_t* current_infer_req_wait;
        {
            std::unique_lock<std::mutex> infer_lock(produce_mutex);
            icv.wait(infer_lock,[]{return (((infer_req_vec.size()>0) && (infer_req_vec[0] != NULL)));});
            if(stop_test || (batchCount >= appargs.queryCount && appargs.queryCount != -1)) break;
            infer_req_queue.push_back(infer_req_vec[0]);
            infer_req_vec.erase(infer_req_vec.begin()+0);
            pcv.notify_one();
            infer_lock.unlock();
        }
        int queue_size = infer_req_queue.size();
        for(int i=0; i<queue_size; i++)
        {
            current_infer_req_wait = infer_req_queue[0];
            infer_req_queue.erase(infer_req_queue.begin()+0);
            //FURTHER PROCESSING ON `current_infer_req_wait`
        }
    }
}

int main(int argc,char* argv[]){

    std::thread t1(produce_blobs);
    std::thread t2(infer_wait);

    t1.join();
    t2.join();
}

以上是我写的生产者-消费者问题。

API - 接受一批输入,批号取决于 appargs.samplesPerQuery

我面临的问题是,对于第一批,除非它推送所有 (appargs.samplesPerQuery) 输入和输出 blob,否则 infer_wait 线程不会被执行,即使我正在调用 { {1}} 立即。

这只发生在第一批,从下一次开始,它以完美的同步发生,即 icv.notify_one() 消耗来自 infer_wait() 向量的一个请求,{{ 1}} 再推一个,一直持续到执行结束。

我也想为第一批获得这种完美的同步。

如果我遗漏了什么或做错了什么,请指出。

谢谢

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)