问题描述
让我们考虑一下这个功能
@Transactional
fun conditionalInsertEntity(dbEntity: DBEntity): Mono<DBEntity> {
return fetchObjectByPublicId(dbEntity.publicId)
.switchIfEmpty {
r2DatabaseClient.insert()
.into(DBEntity::class.java)
.using(Flux.just(dbEntity))
.fetch()
.one()
.map { it["entity_id"] as Long }
.flatMap { fetchObjectById(it) }
}
}
使用以下驱动程序代码运行上述功能时,如果列表包含重复项,则会出现重复输入错误。理想情况下,不应给出该错误,因为上述功能已经在处理重复插入的情况!
val result = Flux.fromIterable(listof(dbEntity1,dbEntity1,dbEntity2))
.flatMap { conditionalInsertEntity(it) }
.collectList()
.block()
解决方法
意识到这是使用flatMap而不是concatMap的问题。 与flatMap不同,ConcatMap顺序地从各个发布者收集结果。 (更多here)
由于我使用flatMap,因此多个发布者认为该实体在数据库中尚不可用