多线程读取导致 Cassandra 会话数据损坏

问题描述

我在使用 cassandra api 和选择查询时遇到问题。在单线程上下文中它工作得很好。但是,当多个线程使用同一个对象并调用函数时,即使只有两个线程,cassandra 也会返回带有错误数据的期货(有时似乎被来自其他选择查询的数据覆盖,有时它只是垃圾)。我有一个单例对象处理我对 cassandra 的调用,还有一个字节缓冲区结构来保存返回的数据。我已经确定,在单个线程上,一切都按预期工作,但是当我添加更多线程使用相同的对象并调用相同的函数时,错误的数据会被引入从 cassandra 返回的数据中。

值大小约为 1kb,密钥为 32 字节。

您可以看到被注释掉的互斥锁包围的 2 行。如果取消注释,这可以防止错误数据问题,但也会抵消增加线程数量带来的任何性能提升。

此外,损坏查询的百分比约为 33%。

cassandra api 应该能够处理多个线程和会话连接而不会出现任何问题(根据 http://datastax.github.io/cpp-driver/topics/),那么为什么我会返回错误数据?

我使用 Centos7、c++14 和 cassandra api 2.15 和 cassandra 3.11.4

#include <cassert> 
#include "DataObj.h"//contains the rest of my includes

//byteBuf has two members: a const uint8_t pointer,and a uint32_t variable for declaring the size
DataObj::byteBuf  DataObj::threadGet(DataObj::byteBuf key)
{
  CassError rc = CASS_OK;
  Cassstatement* statement = NULL;
  CassFuture* future = NULL;                        
  string temp = "SELECT value FROM "+ keyspace+"."+tablename+" WHERE id = ?";// variables defined elsewhere
  const char* query = (const char*)temp.c_str();
  statement = cass_statement_new(query,1);
 //I am also having issues with prepared statements not working properly,//but I believe it is not directly related to this question. This setup was working fine on 1 thread    
  cass_statement_bind_bytes(statement,key.data,key.size);
  //rw_mut.lock();   
  future = cass_session_execute(m_session,statement);// m_session is the cassandra session of the object
  cass_future_wait(future);
  //rw_mut.unlock();
//The two statements above,when gated by the mutex,do not produce errors when multithreading,// but also do not gain any performance.
// When not gated,works fine on a single thread,but corrupts the return data on 2 or more threads

  const  uint8_t* st=nullptr;
  size_t len=0;                 
  rc = cass_future_error_code(future);
  if (rc != CASS_OK)
  {
    //error handling...
  }
  else
  {
    const CassResult* result = cass_future_get_result(future);
    CassIterator* iterator = cass_iterator_from_result(result);                     
         if (cass_iterator_next(iterator))
          {                       
             const CassRow* row = cass_iterator_get_row(iterator);
             cass_value_get_bytes(cass_row_get_column_by_name(row,"value"),&st,&len);
          }                         
    cass_result_free(result);
    cass_iterator_free(iterator);
  }               
  DataObj::byteBuf  res((uint8_t *) st,len);
  //was able to use gdb here and confirm that the data is corrupt

  cass_future_free(future);      

  return res;
}  

解决方法

看起来您是在将值指针复制到 byteBuf 之前释放结果。在单线程版本中,您可能很幸运,解引用的内存仍然完好无损。多线程,你很可能会覆盖。