问题描述
// Optional handle to the database; its initializer is elided in this snippet.
private var db: Option[DatabaseDef] = ....
// Publishes the records one at a time by calling publishToDatabase on each element.
typedRecords.foreach(publishToDatabase(_))
/** Publishes a single record and blocks until the database call has finished.
  *
  * Waits on the Future returned by runDatabaseAction for at most
  * getQueryTimeOut(settings); Await.result throws a TimeoutException if the
  * Future has not completed by then. The Int result is discarded.
  *
  * @param record the record to publish
  */
def publishToDatabase(record: T): Unit = {
Await.result(runDatabaseAction(record),getQueryTimeOut(settings))
}
/** Submits the action for one record to the database and returns its Future.
  *
  * The action produced by executeStreamingAction is wrapped with .transactionally
  * and given a statement initializer that sets the JDBC query timeout to
  * getQueryTimeOut(settings), converted to whole seconds.
  *
  * NOTE(review): db is an Option, so db.get throws NoSuchElementException when it is None.
  *
  * @param record the record whose action should be run
  * @return the Future[Int] produced by running the action
  */
private def runDatabaseAction(record: T): Future[Int] = {
db.get.run(
executeStreamingAction(record,settings)
.transactionally
.withStatementParameters(statementinit = _.setQueryTimeout(getQueryTimeOut(settings).toSeconds.toInt)))
}
/** Builds the database action that inserts one record.
  *
  * The SQL text is elided in this snippet. The sqlu action is returned already
  * wrapped with .transactionally.
  *
  * NOTE(review): runDatabaseAction applies .transactionally to this result again.
  * NOTE(review): record and settings are not visibly used here; presumably they are
  * interpolated into the elided SQL — confirm against the full source.
  *
  * @param record   the message to insert
  * @param settings connector settings
  * @return a non-streaming, transactional action yielding an Int
  */
override def executeStreamingAction(record: TFFactDataIngestionMessage,settings: GSConnectorSettings):
DBIOAction[Int,NoStream,Effect with Effect.Transactional] = {
val sqlUAction =
sqlu"""
INSERT INTO ...
"""
sqlUAction.transactionally
}
db.get.run 方法一次只接受一个 DBIOAction,不接受由多个操作组成的数组或列表。
我想把 typedRecords 中的所有记录一次性(批量)发布到数据库,但不知道如何把这些逐条的操作转换成 run 方法可以接受的单个操作。
解决方法
您必须使用单子(monadic)组合子(例如 flatMap)把多个 DBIO 操作串联成一个操作:
// Chain the actions with flatMap so that they form one combined DBIO, then run that
// single action with .transactionally applied to the whole chain.
// NOTE: in Slick, flatMap on a DBIOAction needs an implicit ExecutionContext in scope.
db.run(
operation1
.flatMap(_ => operation2)
.flatMap(_ => operation3)
...
.transactionally
)
如果您更喜欢使用 for 推导式(for-comprehension):
// The results do not have to be discarded with `_`; bind them to names instead if
// later steps need them. The combined action below yields Unit because of `yield ()`.
db.run(
(
for {
_ <- operation1
_ <- operation2
...
} yield ()
).transactionally
)
或者,如果您想把 List[DBIO[A]] 转换成 DBIO[List[A]],可以使用 DBIO.sequence:
// DBIO.sequence combines a collection of actions into one action that yields all of
// their results; .transactionally is applied to the combined action as a whole.
// For the question's code, `operations` would presumably be something like
// typedRecords.map(executeStreamingAction(_, settings)) — verify the element types.
db.run(
DBIO.sequence(operations).transactionally
)