问题描述
我在使用 cassandra api 和选择查询时遇到问题。在单线程上下文中它工作得很好。但是,当多个线程使用同一个对象并调用该函数时,即使只有两个线程,cassandra 也会返回带有错误数据的期货(有时似乎被来自其他选择查询的数据覆盖,有时它只是垃圾)。我有一个单例对象处理我对 cassandra 的调用,还有一个字节缓冲区结构来保存返回的数据。我已经确定,在单个线程上,一切都按预期工作,但是当我添加更多线程使用相同的对象并调用相同的函数时,错误的数据会被引入从 cassandra 返回的数据中。
值大小约为 1kb,密钥为 32 字节。
您可以看到被注释掉的互斥锁包围的 2 行。如果取消注释,这可以防止错误数据问题,但也会抵消增加线程数量带来的任何性能提升。
此外,损坏查询的百分比约为 33%。
cassandra api 应该能够处理多个线程和会话连接而不会出现任何问题(根据 http://datastax.github.io/cpp-driver/topics/),那么为什么我会返回错误数据?
我使用 Centos7、c++14 和 cassandra api 2.15 和 cassandra 3.11.4
#include <cassert>
#include "DataObj.h"//contains the rest of my includes
//byteBuf has two members: a const uint8_t pointer,and a uint32_t variable for declaring the size
DataObj::byteBuf DataObj::threadGet(DataObj::byteBuf key)
{
CassError rc = CASS_OK;
Cassstatement* statement = NULL;
CassFuture* future = NULL;
string temp = "SELECT value FROM "+ keyspace+"."+tablename+" WHERE id = ?";// variables defined elsewhere
const char* query = (const char*)temp.c_str();
statement = cass_statement_new(query,1);
//I am also having issues with prepared statements not working properly,//but I believe it is not directly related to this question. This setup was working fine on 1 thread
cass_statement_bind_bytes(statement,key.data,key.size);
//rw_mut.lock();
future = cass_session_execute(m_session,statement);// m_session is the cassandra session of the object
cass_future_wait(future);
//rw_mut.unlock();
//The two statements above,when gated by the mutex,do not produce errors when multithreading,// but also do not gain any performance.
// When not gated,works fine on a single thread,but corrupts the return data on 2 or more threads
const uint8_t* st=nullptr;
size_t len=0;
rc = cass_future_error_code(future);
if (rc != CASS_OK)
{
//error handling...
}
else
{
const CassResult* result = cass_future_get_result(future);
CassIterator* iterator = cass_iterator_from_result(result);
if (cass_iterator_next(iterator))
{
const CassRow* row = cass_iterator_get_row(iterator);
cass_value_get_bytes(cass_row_get_column_by_name(row,"value"),&st,&len);
}
cass_result_free(result);
cass_iterator_free(iterator);
}
DataObj::byteBuf res((uint8_t *) st,len);
//was able to use gdb here and confirm that the data is corrupt
cass_future_free(future);
return res;
}
解决方法
看起来您是在将值指针复制到 byteBuf
之前释放结果。在单线程版本中,您可能很幸运,解引用的内存仍然完好无损。多线程,你很可能会覆盖。