新实体的多个条件插入会在R2DBC中产生重复的输入错误

问题描述

让我们考虑一下这个功能

@Transactional
fun conditionalInsertEntity(dbEntity: DBEntity): Mono<DBEntity> {
    return fetchObjectByPublicId(dbEntity.publicId)
        .switchIfEmpty {
            r2DatabaseClient.insert()
                .into(DBEntity::class.java)
                .using(Flux.just(dbEntity))
                .fetch()
                .one()
                .map { it["entity_id"] as Long }
                .flatMap { fetchObjectById(it) }
        }
}

使用以下驱动程序代码运行上述功能时,如果列表包含重复项,则会出现重复输入错误。理想情况下,不应给出该错误,因为上述功能已经在处理重复插入的情况!

val result = Flux.fromIterable(listof(dbEntity1,dbEntity1,dbEntity2))
    .flatMap { conditionalInsertEntity(it) }
    .collectList()
    .block()

解决方法

意识到这是使用flatMap而不是concatMap的问题。 与flatMap不同,ConcatMap顺序地从各个发布者收集结果。 (更多here

由于我使用flatMap,因此多个发布者认为该实体在数据库中尚不可用