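Spring @Transactional with coroutines in WebFlux throws an error

Problem description

When I add the @Transactional annotation to a service suspend function that is called from a handler, I get the error below. Without the annotation the code works as expected, but nothing is rolled back when an error occurs.

either comes from arrow-kt-core.

asHandlerFunction is used as a bridge for documenting the API.

Any idea what is going on?

The entities and repositories are under io.x.a. The service is in io.x. Repository scanning covers only io.x.a.

Error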

java.lang.IllegalArgumentException: object is not an instance of declaring class
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException: 
Error has been observed at the following site(s):
    |_ checkpoint ⇢ org.springframework.boot.actuate.metrics.web.reactive.server.MetricsWebFilter [DefaultWebFilterChain]
    |_ checkpoint ⇢ HTTP POST "/api/assets/130473/one-second-resolution" [ExceptionHandlingWebHandler]
Stack trace:
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.base/java.lang.reflect.Method.invoke(Method.java:566)
        at kotlin.reflect.jvm.internal.calls.InlineClassAwareCaller.call(InlineClassAwareCaller.kt:134)
        at kotlin.reflect.jvm.internal.KCallableImpl.call(KCallableImpl.kt:108)
        at kotlin.reflect.full.KCallables.callSuspend(KCallables.kt:55)
        at org.springframework.core.CoroutinesUtils$invokeSuspendingFunction$mono$1.invokeSuspend(CoroutinesUtils.kt:64)
        at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33)
        at kotlinx.coroutines.internal.DispatchedContinuationKt.resumeCancellableWith(DispatchedContinuation.kt:377)
        at kotlinx.coroutines.intrinsics.CancellableKt.startCoroutineCancellable(Cancellable.kt:30)
        at kotlinx.coroutines.intrinsics.CancellableKt.startCoroutineCancellable$default(Cancellable.kt:25)
        at kotlinx.coroutines.CoroutineStart.invoke(CoroutineStart.kt:110)
        at kotlinx.coroutines.AbstractCoroutine.start(AbstractCoroutine.kt:126)
        at kotlinx.coroutines.reactor.MonoKt.monoInternal$lambda-2(Mono.kt:90)
        at reactor.core.publisher.MonoCreate.subscribe(MonoCreate.java:57)
        at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64)
        at reactor.core.publisher.MonoUsingWhen.subscribe(MonoUsingWhen.java:87)
        at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64)
        at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:157)
        at reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onNext(FluxSwitchIfEmpty.java:73)
        at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:127)
        at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1815)
        at reactor.core.publisher.MonoFlatMap$FlatMapInner.onNext(MonoFlatMap.java:249)
        at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1815)
        at reactor.core.publisher.MonoFlatMap$FlatMapInner.onNext(MonoFlatMap.java:249)
        at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1815)
        at reactor.core.publisher.MonoFlatMap$FlatMapInner.onNext(MonoFlatMap.java:249)
        at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onNext(FluxOnErrorResume.java:79)
        at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.complete(MonoIgnoreThen.java:284)
        at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.onNext(MonoIgnoreThen.java:187)
        at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.subscribeNext(MonoIgnoreThen.java:232)
        at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.onComplete(MonoIgnoreThen.java:203)
        at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.onComplete(MonoPeekTerminal.java:299)
        at reactor.core.publisher.MonoIgnoreElements$IgnoreElementsSubscriber.onComplete(MonoIgnoreElements.java:88)
        at reactor.core.publisher.MonoIgnoreElements$IgnoreElementsSubscriber.onComplete(MonoIgnoreElements.java:88)
        at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.onComplete(Operators.java:2057)
        at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1816)
        at reactor.core.publisher.MonoFlatMap$FlatMapInner.onNext(MonoFlatMap.java:249)
        at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onNext(FluxOnErrorResume.java:79)
        at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.complete(MonoIgnoreThen.java:284)
        at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.onNext(MonoIgnoreThen.java:187)
        at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.subscribeNext(MonoIgnoreThen.java:232)
        at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.onComplete(MonoIgnoreThen.java:203)
        at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.onComplete(MonoPeekTerminal.java:299)
        at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.onComplete(MonoIgnoreThen.java:209)
        at reactor.core.publisher.FluxPeek$PeekSubscriber.onComplete(FluxPeek.java:259)
        at reactor.core.publisher.MonoIgnoreElements$IgnoreElementsSubscriber.onComplete(MonoIgnoreElements.java:88)
        at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1816)
        at reactor.core.publisher.MonoCompletionStage.lambda$subscribe$0(MonoCompletionStage.java:82)
        at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
        at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
        at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
        at java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2073)
        at com.github.jasync.sql.db.util.FutureUtilsKt.success(FutureUtils.kt:16)
        at com.github.jasync.sql.db.mysql.MySQLConnection$succeedQueryPromise$1.accept(MySQLConnection.kt:344)
        at com.github.jasync.sql.db.mysql.MySQLConnection$succeedQueryPromise$1.accept(MySQLConnection.kt:54)
        at java.base/java.util.Optional.ifPresent(Optional.java:183)
        at com.github.jasync.sql.db.mysql.MySQLConnection.succeedQueryPromise(MySQLConnection.kt:343)
        at com.github.jasync.sql.db.mysql.MySQLConnection.onOk(MySQLConnection.kt:218)
        at com.github.jasync.sql.db.mysql.codec.MySQLConnectionHandler.channelRead0(MySQLConnectionHandler.kt:119)
        at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
        at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:324)
        at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:296)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
        at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
        at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
        at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
        at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:719)
        at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:655)
        at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:581)
        at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
        at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
        at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
        at java.base/java.lang.Thread.run(Thread.java:829)


Router:

    @Bean
    fun router(...): RouterFunction<ServerResponse> {
        return route()
            .POST(
                "...", asHandlerFunction { createNewOnesecondResolutionSession(it) })
            .build()
    }

    private fun asHandlerFunction(init: suspend (ServerRequest) -> ServerResponse) = HandlerFunction {
        mono(Dispatchers.Unconfined) {
            init(it)
        }
    }

Handler:

    private suspend fun theFun(req: ServerRequest): ServerResponse {
        val a = ...

        val b = ...

        return service.theFun(a, b).fold(
            { error ->
                internalServerErrorResponse("Client user already exists.")
            },
            { ServerResponse.ok().bodyValueAndAwait(it) }
        )
    }

Service:

    @Transactional("tm1")
    suspend fun theFun(
        a: A, b: B
    ): Either<Error, Result> = either {
        val user = userService.createNewUser(username = "test", password = "pw")
            .mapLeft {
                log
                Error
            }
            .bind()
        throw RuntimeException("xx")
    }

Persistence configuration:

    @Configuration
    @EnableR2dbcRepositories(
        basePackages = ["io.x.a"], entityOperationsRef = "operations1"
    )
    class PersistenceConfig1(
        @Value("\${spring.datasource.d1.r2dbcUrl}") private val r2dbcUrl: String
    ) {

        @Bean
        @Qualifier("d1")
        fun connectionFactory(): ConnectionFactory {
            return ConnectionFactories.get(ConnectionFactoryOptions.parse(r2dbcUrl))
        }

        @Bean
        fun r2dbcEntityOperations(@Qualifier("d1") connectionFactory: ConnectionFactory): R2dbcEntityOperations {
            val databaseClient = DatabaseClient.create(connectionFactory)

            return R2dbcEntityTemplate(databaseClient, DefaultReactiveDataAccessStrategy(MySqlDialect.INSTANCE))
        }

        @Bean("tm1")
        fun transactionManager(@Qualifier("d1") connectionFactory: ConnectionFactory): ReactiveTransactionManager {
            return R2dbcTransactionManager(connectionFactory)
        }
    }

Solution

I found a solution using TransactionalOperator.

    @Bean("transactionalOperator1")
    fun transactionalOperator1(@Qualifier("tm1") transactionManager: ReactiveTransactionManager): TransactionalOperator {
        return TransactionalOperator.create(transactionManager)
    }

    @Bean("transactionalOperator2")
    fun transactionalOperator2(@Qualifier("tm2") transactionManager: ReactiveTransactionManager): TransactionalOperator {
        return TransactionalOperator.create(transactionManager)
    }

    // Constructor parameters of the service:
    @Qualifier("transactionalOperator1") private val transactionalOperator1: TransactionalOperator,
    @Qualifier("transactionalOperator2") private val transactionalOperator2: TransactionalOperator,

    suspend fun theFun(...): Either<Error, Result> =
        transactionalOperator1.executeAndAwait { t1 ->
            transactionalOperator2.executeAndAwait { t2 ->
                ...
                    .handleErrorWith {
                        t1.setRollbackOnly()
                        t2.setRollbackOnly()
                    }
            }
        }!!

The commit happens at the end of executeAndAwait. If the rollbackOnly flag is set, it instructs the ReactiveTransactionManager to roll back the changes instead.
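
For a single transaction manager, the same idea can be wrapped in a small helper. This is only a sketch under assumptions: `inTransaction` is a hypothetical name that does not appear in the code above, and it assumes Spring's `executeAndAwait` coroutine extension and Arrow's `Either` are on the classpath. It marks the transaction rollback-only whenever the block returns a `Left`; an exception thrown inside the block also causes a rollback.

```kotlin
import arrow.core.Either
import org.springframework.transaction.reactive.TransactionalOperator
import org.springframework.transaction.reactive.executeAndAwait

// Hypothetical helper (not part of Spring or Arrow): runs `block` inside a
// reactive transaction and rolls back when the result is an Either.Left.
suspend fun <E, A> TransactionalOperator.inTransaction(
    block: suspend () -> Either<E, A>
): Either<E, A> =
    executeAndAwait { tx ->
        val result = block()
        // A Left is a value, not an exception, so the transaction has to be
        // marked for rollback explicitly.
        if (result is Either.Left) tx.setRollbackOnly()
        result
    }!! // older Spring versions declare a nullable return type; the block never returns null
```

The service function could then be written as `transactionalOperator1.inTransaction { either { ... } }`, with no @Transactional annotation on the suspend function.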
