PutIfExists fails if the record was added recently

Problem description

In ScalarDB, a library that adds ACID capabilities to Cassandra, I am getting the following error:

2020-09-24 18:51:33,607 [WARN] from com.scalar.db.transaction.consensuscommit.CommitHandler in ScalaTest-run-running-AllRepositorySpecs - preparing records Failed
com.scalar.db.exception.storage.NoMutationException: no mutation was applied.

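I am running a test case in which I get a record to check that it does not exist, add the record, get it to verify that it was added, update it, and then get it again to check that the value was updated.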

"update an answer if the answer exists" in {
      beforeEach()
      embeddedCassandraManager.executeStatements(cqlStartupStatements)

      val cassandraConnectionService = CassandraConnectionManagementService()
      val (cassandraSession,cluster) = cassandraConnectionService.connectWithCassandra("cassandra://localhost:9042/codingjedi","codingJediCluster")
      //TodoM - pick the database and keyspace names from config file.
      cassandraConnectionService.initKeySpace(cassandraSession.get,"codingjedi")
      val transactionService = cassandraConnectionService.connectWithCassandraWithTransactionSupport("localhost","9042","codingJediCluster" /*,dbUsername,dbPassword*/)

      val repository = new AnswersTransactionRepository("codingjedi","answer_by_user_id_and_question_id")

      val answerKey = AnswerKeys(repoTestEnv.answerTestEnv.answerOfAPracticeQuestion.answer_id.get,repoTestEnv.answerTestEnv.answerOfAPracticeQuestion.question_id,Some(repoTestEnv.answerTestEnv.answerOfAPracticeQuestion.answer_id.get))

      logger.trace(s"checking if answer already exists")
      val distributedTransactionBefore = transactionService.get.start()

      val resultBefore = repository.get(distributedTransactionBefore,answerKey) //answer should not exist
      distributedTransactionBefore.commit()

      resultBefore.isLeft mustBe true
      resultBefore.left.get.isInstanceOf[AnswerNotFoundException] mustBe true

      logger.trace(s"no answer found. adding answer")
      val distributedTransactionDuring = transactionService.get.start()

      repository.add(distributedTransactionDuring,repoTestEnv.answerTestEnv.answerOfAPracticeQuestion)//add answer
      distributedTransactionDuring.commit()
      logger.trace(s"answer added")

      val distributedTransactionAfter = transactionService.get.start()

      val result = repository.get(distributedTransactionAfter,answerKey) //Now answer should exist
      distributedTransactionAfter.commit()

      result mustBe (Right(repoTestEnv.answerTestEnv.answerOfAPracticeQuestion))
      logger.trace(s"got answer from repo ${result}")

      val updatednotes = if(repoTestEnv.answerTestEnv.answerOfAPracticeQuestion.notes.isDefined)
        Some(repoTestEnv.answerTestEnv.answerOfAPracticeQuestion.notes.get+"updated") else Some("updated notes")
      val updatedAnswer = repoTestEnv.answerTestEnv.answerOfAPracticeQuestion.copy(notes=updatednotes) //updated answer
      logger.trace(s"old notes ${repoTestEnv.answerTestEnv.answerOfAPracticeQuestion.notes} vs new notes ${updatednotes}")
      logger.trace(s"updated answer ${updatedAnswer}")

      val distributedTransactionForUpdate = transactionService.get.start()
      val resultOfupdate = repository.update(distributedTransactionForUpdate,updatedAnswer) //update answer
      distributedTransactionForUpdate.commit() //fails here

      logger.trace(s"update done. getting answer again")

      val distributedTransactionAfterUpdate = transactionService.get.start()

      val resultAfterUpdate = repository.get(distributedTransactionAfterUpdate,answerKey)
      distributedTransactionAfterUpdate.commit()

      resultAfterUpdate mustBe (Right(updatedAnswer))
      logger.trace(s"got result after update ${resultAfterUpdate}")

      afterEach()
    }

The update method calls add with the PutIfExists condition:

 def update(transaction:DistributedTransaction,answer:AnswerOfAPracticeQuestion) = {
    logger.trace(s"updating answer value ${answer}")
    //checktest-update an answer if the answer exists
    add(transaction,answer,new PutIfExists)
  }


def add(transaction:DistributedTransaction,answer:AnswerOfAPracticeQuestion,mutationCondition:MutationCondition = new PutIfNotExists()) = {
    logger.trace(s"adding answer ${answer} with mutation state ${mutationCondition}")
    val pAnswerKey = new Key(new TextValue("answered_by_user",answer.answeredBy.get.answerer_id.toString),new TextValue("question_id",answer.question_id.toString))

    //to check duplication,both partition and clustering keys need to be present
    //val cAnswerKey = new Key(new TextValue("answer_id",answer.answer_id.toString))

    //logger.trace(s"created keys. ${pAnswerKey},${cAnswerKey}")
    val imageData = answer.image.map(imageList=>imageList).getOrElse(List())
    logger.trace(s"will check in ${keyspaceName},${tablename}")
    val putAnswer: Put = new Put(pAnswerKey/*,cAnswerKey*/)
      .forNamespace(keyspaceName)
      .forTable(tablename)
      .withCondition(mutationCondition)
      .withValue(new TextValue("answer_id",answer.answer_id.get.toString))
      .withValue(new TextValue("image",convertimageToString(imageData)))
      .withValue(new TextValue("answer",convertAnswersFromModelToString(answer.answer)))
      .withValue(new BigIntValue("creation_year",answer.creationYear.getOrElse(0)))
      .withValue(new BigIntValue("creation_month",answer.creationMonth.getOrElse(0)))
      .withValue(new TextValue("notes",answer.notes.getOrElse("")))

    logger.trace(s"putting answer ${putAnswer}")
    //checktest-add answer to respository
    //checktest-not add answer to respository if duplicate
    transaction.put(putAnswer)
  }

Why do I get the error even though the notes field differs between the existing answer and the updated answer?

The error trace is below (note that it shows IF NOT EXISTS!). Shouldn't it be IF EXISTS? There is also a trace saying there was a hit in the statement cache for [INSERT INTO codingjedi.answer_by_user_id_and_question_id (answered_by_user,question_id,tx_id,tx_state,tx_prepared_at,answer_id,image,creation_year,creation_month,notes,tx_version) VALUES (?,?,?) IF NOT EXISTS;]. Does this mean that the earlier put is still in the cache and is causing a conflict?

2020-09-24 18:51:33,593 [DEBUG] from com.scalar.db.storage.cassandra.StatementHandler in ScalaTest-run-running-AllRepositorySpecs - query to prepare : [INSERT INTO codingjedi.answer_by_user_id_and_question_id (answered_by_user,?) IF NOT EXISTS;].
2020-09-24 18:51:33,593 [DEBUG] from com.scalar.db.storage.cassandra.StatementHandler in ScalaTest-run-running-AllRepositorySpecs - there was a hit in the statement cache for [INSERT INTO codingjedi.answer_by_user_id_and_question_id (answered_by_user,593 [DEBUG] from com.scalar.db.storage.cassandra.ValueBinder in ScalaTest-run-running-AllRepositorySpecs - Optional[11111111-1111-1111-1111-111111111111] is bound to 0
2020-09-24 18:51:33,593 [DEBUG] from com.scalar.db.storage.cassandra.ValueBinder in ScalaTest-run-running-AllRepositorySpecs - Optional[11111111-1111-1111-1111-111111111111] is bound to 1
2020-09-24 18:51:33,593 [DEBUG] from com.scalar.db.storage.cassandra.ValueBinder in ScalaTest-run-running-AllRepositorySpecs - Optional[468492df-0960-4160-8391-27fe7fa626c5] is bound to 2
2020-09-24 18:51:33,593 [DEBUG] from com.scalar.db.storage.cassandra.ValueBinder in ScalaTest-run-running-AllRepositorySpecs - 1 is bound to 3
2020-09-24 18:51:33,593 [DEBUG] from com.scalar.db.storage.cassandra.ValueBinder in ScalaTest-run-running-AllRepositorySpecs - 1600969893592 is bound to 4
2020-09-24 18:51:33,593 [DEBUG] from com.scalar.db.storage.cassandra.ValueBinder in ScalaTest-run-running-AllRepositorySpecs - Optional[11111111-1111-1111-1111-111111111111] is bound to 5
2020-09-24 18:51:33,593 [DEBUG] from com.scalar.db.storage.cassandra.ValueBinder in ScalaTest-run-running-AllRepositorySpecs - Optional[{"image":["image1binarydata","image2binarydata"]}] is bound to 6
2020-09-24 18:51:33,593 [DEBUG] from com.scalar.db.storage.cassandra.ValueBinder in ScalaTest-run-running-AllRepositorySpecs - Optional[{"answer":[{"filename":"c.js","answer":"some answer"}]}] is bound to 7
2020-09-24 18:51:33,593 [DEBUG] from com.scalar.db.storage.cassandra.ValueBinder in ScalaTest-run-running-AllRepositorySpecs - 2019 is bound to 8
2020-09-24 18:51:33,593 [DEBUG] from com.scalar.db.storage.cassandra.ValueBinder in ScalaTest-run-running-AllRepositorySpecs - 12 is bound to 9
2020-09-24 18:51:33,593 [DEBUG] from com.scalar.db.storage.cassandra.ValueBinder in ScalaTest-run-running-AllRepositorySpecs - Optional[some notesupdated] is bound to 10
2020-09-24 18:51:33,593 [DEBUG] from com.scalar.db.storage.cassandra.ValueBinder in ScalaTest-run-running-AllRepositorySpecs - 1 is bound to 11
2020-09-24 18:51:33,607 [WARN] from com.scalar.db.transaction.consensuscommit.CommitHandler in ScalaTest-run-running-AllRepositorySpecs - preparing records Failed
com.scalar.db.exception.storage.NoMutationException: no mutation was applied.

Update: traces

For the first put (putAnswer):

putting answer Put{namespace=Optional[codingjedi],table=Optional[answer_by_user_id_and_question_id],partitionKey=Key{TextValue{name=answered_by_user,value=Optional[11111111-1111-1111-1111-111111111111]},TextValue{name=question_id,value=Optional[11111111-1111-1111-1111-111111111111]}},clusteringKey=Optional.empty,values={answer_id=TextValue{name=answer_id,image=TextValue{name=image,value=Optional[{"image":["image1binarydata","image2binarydata"]}]},answer=TextValue{name=answer,value=Optional[{"answer":[{"filename":"c.js","answer":"some answer"}]}]},creation_year=BigIntValue{name=creation_year,value=2019},creation_month=BigIntValue{name=creation_month,value=12},notes=TextValue{name=notes,value=Optional[some notes]}},consistency=SEQUENTIAL,condition=Optional[com.scalar.db.api.PutIfNotExists@21bf308]}

For the second put (putAnswer):

putting answer Put{namespace=Optional[codingjedi],values={answer_id=TextValue{name=answer_id,value=Optional[{"answer":[{"filename":"c.js",value=Optional[some notesupdated]}},condition=Optional[com.scalar.db.api.PutIfExists@2e057637]}

The notes field changed from notes=TextValue{name=notes, to notes=TextValue{name=notes,value=Optional[some notesupdated]}}

When the second put is executed, I see that the mutation condition actually used is IfNotExists:

2020-09-25 12:35:34,188 [DEBUG] from com.scalar.db.storage.cassandra.Cassandra in ScalaTest-run-running-AllRepositorySpecs - executing put operation with Put{namespace=Optional[codingjedi],values={tx_id=TextValue{name=tx_id,value=Optional[c6bc39e9-656a-440c-8f68-af6005f37f7c]},tx_state=IntValue{name=tx_state,value=1},tx_prepared_at=BigIntValue{name=tx_prepared_at,value=1601033734188},answer_id=TextValue{name=answer_id,value=Optional[**some notesupdated**]},tx_version=IntValue{name=tx_version,value=1}},consistency=LINEARIZABLE,condition=Optional[com.scalar.db.api.**PutIfNotExists**@21bf308]} 

Solution

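Scalar DB does not allow a blind write to an existing record. It looks like there is no get before the update.

I think this procedure should check the current value and update it within a single transaction. In this code, atomicity between the get and the update cannot be guaranteed.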
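
To illustrate why the read and the write belong in the same unit of work, here is a minimal toy sketch in plain Scala. It is not ScalarDB's API or implementation: ToyStore, Versioned, writeIf and AtomicityDemo are hypothetical names invented for this example. The assumption is simply that a write is conditioned on the version the writer last read, so a writer holding a stale read is rejected instead of silently overwriting a newer value.

```scala
import scala.collection.mutable

// Toy versioned store. All names are hypothetical; this is not ScalarDB code.
final case class Versioned(value: String, version: Int)

final class ToyStore {
  private val rows = mutable.Map.empty[String, Versioned]

  def read(key: String): Option[Versioned] = rows.get(key)

  // Compare-and-set: apply the write only if the stored version still equals
  // `expected` (None means "the row must not exist yet").
  def writeIf(key: String, value: String, expected: Option[Int]): Boolean =
    if (rows.get(key).map(_.version) != expected) false
    else {
      rows.update(key, Versioned(value, expected.fold(1)(_ + 1)))
      true
    }
}

object AtomicityDemo {
  // Returns (did B's update apply, did A's update apply).
  def run(): (Boolean, Boolean) = {
    val store = new ToyStore
    store.writeIf("answer-1", "some notes", None) // initial insert

    // Writers A and B both read the row, then B updates it before A does.
    val seenByA = store.read("answer-1").map(_.version)
    val seenByB = store.read("answer-1").map(_.version)
    val bApplied = store.writeIf("answer-1", "notes from B", seenByB)
    val aApplied = store.writeIf("answer-1", "notes from A", seenByA) // stale read
    (bApplied, aApplied)
  }
}
```

In this toy model, A's write is rejected because its read is out of date. When the existence check and the update run in separate, independently committed transactions, the update has nothing tying it to what the check observed.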


After Yuji's comment below, I changed the update method to perform a get before the put, and the operation now succeeds.

def update(transaction:DistributedTransaction,answer:AnswerOfAPracticeQuestion) = {
    logger.trace(s"updating answer value ${answer}")
    val key = AnswerKeys(answer.answeredBy.get.answerer_id,answer.question_id,answer.answer_id)
    val result = get(transaction,key)
    if(result.isLeft) throw result.left.get else add(transaction,answer,new PutIfExists())
  }

However, I don't understand why ScalarDB requires a get before an update or a delete.


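Because the protocol is based on snapshot isolation, a record must first be read into the transaction's snapshot before it can be updated.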
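
The following is a toy model of that idea in plain Scala, not ScalarDB's actual code. ToyStorage, ToyTransaction and SnapshotDemo are hypothetical names. The sketch assumes that, at commit time, the condition for each write is derived from the transaction's snapshot: a key that was never read is treated as a new record (insert only if it does not exist), while a key that was read is written only if it is unchanged. That assumption is consistent with the IF NOT EXISTS seen in the question's trace, but treat it as an illustration rather than a description of the library's internals.

```scala
import scala.collection.mutable

// Toy model only. All names are hypothetical; this is not ScalarDB's API.
final case class Row(value: String, version: Int)

final class ToyStorage {
  val rows = mutable.Map.empty[String, Row]
}

final class ToyTransaction(storage: ToyStorage) {
  // What this transaction has read: key -> row as seen (None = read but absent).
  private val snapshot = mutable.Map.empty[String, Option[Row]]
  private val writes = mutable.LinkedHashMap.empty[String, String]

  def get(key: String): Option[Row] = {
    val row = storage.rows.get(key)
    snapshot.update(key, row)
    row
  }

  def put(key: String, value: String): Unit = writes.update(key, value)

  // Right(()) on success, Left(reason) if any write's condition does not hold.
  def commit(): Either[String, Unit] = {
    val failed = writes.keys.find { key =>
      snapshot.get(key).flatten match {
        // Never read, or read as absent: behaves like "IF NOT EXISTS".
        case None       => storage.rows.contains(key)
        // Read earlier: the stored row must still be the one that was seen.
        case Some(seen) => !storage.rows.get(key).contains(seen)
      }
    }
    failed match {
      case Some(_) => Left("no mutation was applied")
      case None =>
        writes.foreach { case (key, value) =>
          val nextVersion = storage.rows.get(key).fold(1)(_.version + 1)
          storage.rows.update(key, Row(value, nextVersion))
        }
        Right(())
    }
  }
}

object SnapshotDemo {
  // Returns (result of the blind update, result of the get-then-put update).
  def run(): (Either[String, Unit], Either[String, Unit]) = {
    val storage = new ToyStorage
    val insert = new ToyTransaction(storage)
    insert.put("answer-1", "some notes")
    insert.commit()

    val blind = new ToyTransaction(storage) // put without a prior get
    blind.put("answer-1", "some notesupdated")
    val blindResult = blind.commit()

    val readFirst = new ToyTransaction(storage) // get, then put
    readFirst.get("answer-1")
    readFirst.put("answer-1", "some notesupdated")
    (blindResult, readFirst.commit())
  }
}
```

In this model the blind update fails in the same way as the test in the question, and the get-then-put variant succeeds, which mirrors the behavior of the revised update method.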