问题描述
在ScalarDB
中,向Cassandra
添加了ACID功能的库中,出现以下错误
2020-09-24 18:51:33,607 [WARN] from com.scalar.db.transaction.consensuscommit.CommitHandler in ScalaTest-run-running-AllRepositorySpecs - preparing records Failed
com.scalar.db.exception.storage.NoMutationException: no mutation was applied.
我正在运行一个测试案例,在该案例中,我得到一条记录以检查它是否不存在,然后添加该记录,然后获取它以查看是否成功添加了它,然后更新了它,然后得到了它再次查看该值是否已更新。
"update an answer if the answer exists" in {
beforeEach()
embeddedCassandraManager.executeStatements(cqlStartupStatements)
val cassandraConnectionService = CassandraConnectionManagementService()
val (cassandraSession,cluster) = cassandraConnectionService.connectWithCassandra("cassandra://localhost:9042/codingjedi","codingJediCluster")
//TodoM - pick the database and keyspace names from config file.
cassandraConnectionService.initKeySpace(cassandraSession.get,"codingjedi")
val transactionService = cassandraConnectionService.connectWithCassandraWithTransactionSupport("localhost","9042","codingJediCluster" /*,dbUsername,dbPassword*/)
val repository = new AnswersTransactionRepository("codingjedi","answer_by_user_id_and_question_id")
val answerKey = AnswerKeys(repoTestEnv.answerTestEnv.answerOfAPracticeQuestion.answer_id.get,repoTestEnv.answerTestEnv.answerOfAPracticeQuestion.question_id,Some(repoTestEnv.answerTestEnv.answerOfAPracticeQuestion.answer_id.get))
logger.trace(s"checking if answer already exists")
val distributedTransactionBefore = transactionService.get.start()
val resultBefore = repository.get(distributedTransactionBefore,answerKey) //answer should not exist
distributedTransactionBefore.commit()
resultBefore.isLeft mustBe true
resultBefore.left.get.isinstanceOf[AnswerNotFoundException] mustBe true
logger.trace(s"no answer found. adding answer")
val distributedTransactionDuring = transactionService.get.start()
repository.add(distributedTransactionDuring,repoTestEnv.answerTestEnv.answerOfAPracticeQuestion)//add answer
distributedTransactionDuring.commit()
logger.trace(s"answer added")
val distributedTransactionAfter = transactionService.get.start()
val result = repository.get(distributedTransactionAfter,answerKey) //Now answer should exist
distributedTransactionAfter.commit()
result mustBe (Right(repoTestEnv.answerTestEnv.answerOfAPracticeQuestion))
logger.trace(s"got answer from repo ${result}")
val updatednotes = if(repoTestEnv.answerTestEnv.answerOfAPracticeQuestion.notes.isDefined)
Some(repoTestEnv.answerTestEnv.answerOfAPracticeQuestion.notes.get+"updated") else Some("updated notes")
val updatedAnswer = repoTestEnv.answerTestEnv.answerOfAPracticeQuestion.copy(notes=updatednotes) //updated answer
logger.trace(s"old notes ${repoTestEnv.answerTestEnv.answerOfAPracticeQuestion.notes} vs new notes ${updatednotes}")
logger.trace(s"updated answer ${updatedAnswer}")
val distributedTransactionForUpdate = transactionService.get.start()
val resultOfupdate = repository.update(distributedTransactionForUpdate,updatedAnswer) //update answer
distributedTransactionForUpdate.commit() //fails here
logger.trace(s"update done. getting answer again")
val distributedTransactionAfterUpdate = transactionService.get.start()
val resultAfterUpdate = repository.get(distributedTransactionAfterUpdate,answerKey)
distributedTransactionForUpdate.commit()
resultAfterUpdate mustBe (Right(updatedAnswer))
logger.trace(s"got result after update ${resultAfterUpdate}")
afterEach()
}
def update(transaction:distributedTransaction,answer:AnswerOfAPracticeQuestion) = {
logger.trace(s"updating answer value ${answer}")
//checktest-update an answer if the answer exists
add(transaction,answer,new PutIfExists)
}
def add(transaction:distributedTransaction,answer:AnswerOfAPracticeQuestion,mutationCondition:MutationCondition = new PutIfNotExists()) = {
logger.trace(s"adding answer ${answer} with mutation state ${mutationCondition}")
val pAnswerKey = new Key(new TextValue("answered_by_user",answer.answeredBy.get.answerer_id.toString),new TextValue("question_id",answer.question_id.toString))
//to check duplication,both partition and clustering keys need to be present
//val cAnswerKey = new Key(new TextValue("answer_id",answer.answer_id.toString))
//logger.trace(s"created keys. ${pAnswerKey},${cAnswerKey}")
val imageData = answer.image.map(imageList=>imageList).getorElse(List())
logger.trace(s"will check in ${keyspaceName},${tablename}")
val putAnswer: Put = new Put(pAnswerKey/*,cAnswerKey*/)
.forNamespace(keyspaceName)
.forTable(tablename)
.withCondition(mutationCondition)
.withValue(new TextValue("answer_id",answer.answer_id.get.toString))
.withValue(new TextValue("image",convertimageToString(imageData)))
.withValue(new TextValue("answer",convertAnswersFromModelToString(answer.answer)))
.withValue(new BigIntValue("creation_year",answer.creationYear.getorElse(0)))
.withValue(new BigIntValue("creation_month",answer.creationMonth.getorElse(0)))
.withValue(new TextValue("notes",answer.notes.getorElse("")))
logger.trace(s"putting answer ${putAnswer}")
//checktest-add answer to respository
//checktest-not add answer to respository if duplicate
transaction.put(putAnswer)
}
即使在现有notes
和更新后的answer
之间更改了answer
字段,我为什么也会收到错误消息
错误跟踪为(请注意,它显示为IF NOT EXISTS
!)。不应该是IF EXISTS
吗?还存在一条跟踪 there was a hit in the statement cache for [INSERT INTO codingjedi.answer_by_user_id_and_question_id (answered_by_user,question_id,tx_id,tx_state,tx_prepared_at,answer_id,image,creation_year,creation_month,notes,tx_version) VALUES (?,?,?) IF NOT EXISTS;].
,这是否意味着先前的put
仍在缓存中,并且会导致冲突?
2020-09-24 18:51:33,593 [DEBUG] from com.scalar.db.storage.cassandra.StatementHandler in ScalaTest-run-running-AllRepositorySpecs - query to prepare : [INSERT INTO codingjedi.answer_by_user_id_and_question_id (answered_by_user,?) IF NOT EXISTS;].
2020-09-24 18:51:33,593 [DEBUG] from com.scalar.db.storage.cassandra.StatementHandler in ScalaTest-run-running-AllRepositorySpecs - there was a hit in the statement cache for [INSERT INTO codingjedi.answer_by_user_id_and_question_id (answered_by_user,593 [DEBUG] from com.scalar.db.storage.cassandra.ValueBinder in ScalaTest-run-running-AllRepositorySpecs - Optional[11111111-1111-1111-1111-111111111111] is bound to 0
2020-09-24 18:51:33,593 [DEBUG] from com.scalar.db.storage.cassandra.ValueBinder in ScalaTest-run-running-AllRepositorySpecs - Optional[11111111-1111-1111-1111-111111111111] is bound to 1
2020-09-24 18:51:33,593 [DEBUG] from com.scalar.db.storage.cassandra.ValueBinder in ScalaTest-run-running-AllRepositorySpecs - Optional[468492df-0960-4160-8391-27fe7fa626c5] is bound to 2
2020-09-24 18:51:33,593 [DEBUG] from com.scalar.db.storage.cassandra.ValueBinder in ScalaTest-run-running-AllRepositorySpecs - 1 is bound to 3
2020-09-24 18:51:33,593 [DEBUG] from com.scalar.db.storage.cassandra.ValueBinder in ScalaTest-run-running-AllRepositorySpecs - 1600969893592 is bound to 4
2020-09-24 18:51:33,593 [DEBUG] from com.scalar.db.storage.cassandra.ValueBinder in ScalaTest-run-running-AllRepositorySpecs - Optional[11111111-1111-1111-1111-111111111111] is bound to 5
2020-09-24 18:51:33,593 [DEBUG] from com.scalar.db.storage.cassandra.ValueBinder in ScalaTest-run-running-AllRepositorySpecs - Optional[{"image":["image1binarydata","image2binarydata"]}] is bound to 6
2020-09-24 18:51:33,593 [DEBUG] from com.scalar.db.storage.cassandra.ValueBinder in ScalaTest-run-running-AllRepositorySpecs - Optional[{"answer":[{"filename":"c.js","answer":"some answer"}]}] is bound to 7
2020-09-24 18:51:33,593 [DEBUG] from com.scalar.db.storage.cassandra.ValueBinder in ScalaTest-run-running-AllRepositorySpecs - 2019 is bound to 8
2020-09-24 18:51:33,593 [DEBUG] from com.scalar.db.storage.cassandra.ValueBinder in ScalaTest-run-running-AllRepositorySpecs - 12 is bound to 9
2020-09-24 18:51:33,593 [DEBUG] from com.scalar.db.storage.cassandra.ValueBinder in ScalaTest-run-running-AllRepositorySpecs - Optional[some notesupdated] is bound to 10
2020-09-24 18:51:33,593 [DEBUG] from com.scalar.db.storage.cassandra.ValueBinder in ScalaTest-run-running-AllRepositorySpecs - 1 is bound to 11
2020-09-24 18:51:33,607 [WARN] from com.scalar.db.transaction.consensuscommit.CommitHandler in ScalaTest-run-running-AllRepositorySpecs - preparing records Failed
com.scalar.db.exception.storage.NoMutationException: no mutation was applied.
更新 痕迹
对于第一个put
,putAnswer
是
putting answer Put{namespace=Optional[codingjedi],table=Optional[answer_by_user_id_and_question_id],partitionKey=Key{TextValue{name=answered_by_user,value=Optional[11111111-1111-1111-1111-111111111111]},TextValue{name=question_id,value=Optional[11111111-1111-1111-1111-111111111111]}},clusteringKey=Optional.empty,values={answer_id=TextValue{na
me=answer_id,image=TextValue{name=image,value=Optional[{"image":["image1binarydata","image2binarydata"]}]},answer=TextValue{name=answer,value=Optional[{"answer":[{"filename":"c.j
s","answer":"some answer"}]}]},creation_year=BigIntValue{name=creation_year,value=2019},creation_month=BigIntValue{name=creation_month,value=12},notes=TextValue{name=notes,value=Optional[some notes]}},consistency=SEQUENTIAL,condi
tion=Optional[com.scalar.db.api.PutIfNotExists@21bf308]}
对于第二个put
,putAnswer
是
putting answer Put{namespace=Optional[codingjedi],values={answer_id=TextValue{name=answer_id,value=Optional[{"answer":[{"filename":"c.js",value=Optional[some notesupdated]}},condition=Optional[com.scalar.db.api.PutIfExists@2e057637]}
notes
字段已从notes=TextValue{name=notes,
更改为notes=TextValue{name=notes,value=Optional[some notesupdated]}}
执行第二个put
时,我看到使用的mutation
条件为IfNotExists
2020-09-25 12:35:34,188 [DEBUG] from com.scalar.db.storage.cassandra.Cassandra in ScalaTest-run-running-AllRepositorySpecs - executing put operation with Put{namespace=Optional[codingjedi],values={tx_id=TextValue{name=tx_id,value=Optional[c6bc39e9-656a-440c-8f68-af6005f37f7c]},tx_state=IntValue{name=tx_state,value=1},tx_prepared_at=BigIntValue{name=tx_prepared_at,value=1601033734188},answer_id=TextValue{name=answer_id,value=Optional[**some notesupdated**]},tx_version=IntValue{name=tx_version,value=1}},consistency=LINEARIZABLE,condition=Optional[com.scalar.db.api.**PutIfNotExists**@21bf308]}
解决方法
标量数据库不允许对现有记录进行盲写。看来更新之前没有进展。
我认为此过程应检查当前值并更新事务中的值。在这段代码中,不能保证get和update之间的原子性。
,在Yuji在下面发表评论之后,我更改了update
方法,在get
之前添加了put
,现在操作成功了。
def update(transaction:DistributedTransaction,answer:AnswerOfAPracticeQuestion) = {
logger.trace(s"updating answer value ${answer}")
val key = AnswerKeys(answer.answeredBy.get.answerer_id,answer.question_id,answer.answer_id)
val result = get(transaction,key)
if(result.isLeft) throw result.left.get else add(transaction,answer,new PutIfExists())
}
但是我不明白为什么在get
中的update
或delete
之前需要Scalardb
由于该协议基于快照隔离,因此需要先在快照中获取一条记录以更新记录。