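Problem description
When I add the @Transactional annotation to the service suspend function called by the handler, I get the error below. Without the annotation the code works as expected, but nothing is rolled back if an error occurs.
either comes from arrow-kt-core.
asHandlerFunction is used as a bridge to the documented API.
Any idea what is going on?
The entities and repositories are under io.x.a. The service is in io.x. Only io.x.a is scanned for repositories.
Error: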
java.lang.IllegalArgumentException: object is not an instance of declaring class
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException:
Error has been observed at the following site(s):
|_ checkpoint ⇢ org.springframework.boot.actuate.metrics.web.reactive.server.MetricsWebFilter [DefaultWebFilterChain]
|_ checkpoint ⇢ HTTP POST "/api/assets/130473/one-second-resolution" [ExceptionHandlingWebHandler]
Stack trace:
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at kotlin.reflect.jvm.internal.calls.InlineClassAwareCaller.call(InlineClassAwareCaller.kt:134)
at kotlin.reflect.jvm.internal.KCallableImpl.call(KCallableImpl.kt:108)
at kotlin.reflect.full.KCallables.callSuspend(KCallables.kt:55)
at org.springframework.core.CoroutinesUtils$invokeSuspendingFunction$mono$1.invokeSuspend(CoroutinesUtils.kt:64)
at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33)
at kotlinx.coroutines.internal.DispatchedContinuationKt.resumeCancellableWith(DispatchedContinuation.kt:377)
at kotlinx.coroutines.intrinsics.CancellableKt.startCoroutineCancellable(Cancellable.kt:30)
at kotlinx.coroutines.intrinsics.CancellableKt.startCoroutineCancellable$default(Cancellable.kt:25)
at kotlinx.coroutines.CoroutineStart.invoke(CoroutineStart.kt:110)
at kotlinx.coroutines.AbstractCoroutine.start(AbstractCoroutine.kt:126)
at kotlinx.coroutines.reactor.MonoKt.monoInternal$lambda-2(Mono.kt:90)
at reactor.core.publisher.MonoCreate.subscribe(MonoCreate.java:57)
at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64)
at reactor.core.publisher.MonoUsingWhen.subscribe(MonoUsingWhen.java:87)
at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64)
at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:157)
at reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onNext(FluxSwitchIfEmpty.java:73)
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:127)
at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1815)
at reactor.core.publisher.MonoFlatMap$FlatMapInner.onNext(MonoFlatMap.java:249)
at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1815)
at reactor.core.publisher.MonoFlatMap$FlatMapInner.onNext(MonoFlatMap.java:249)
at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1815)
at reactor.core.publisher.MonoFlatMap$FlatMapInner.onNext(MonoFlatMap.java:249)
at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onNext(FluxOnErrorResume.java:79)
at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.complete(MonoIgnoreThen.java:284)
at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.onNext(MonoIgnoreThen.java:187)
at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.subscribeNext(MonoIgnoreThen.java:232)
at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.onComplete(MonoIgnoreThen.java:203)
at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.onComplete(MonoPeekTerminal.java:299)
at reactor.core.publisher.MonoIgnoreElements$IgnoreElementsSubscriber.onComplete(MonoIgnoreElements.java:88)
at reactor.core.publisher.MonoIgnoreElements$IgnoreElementsSubscriber.onComplete(MonoIgnoreElements.java:88)
at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.onComplete(Operators.java:2057)
at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1816)
at reactor.core.publisher.MonoFlatMap$FlatMapInner.onNext(MonoFlatMap.java:249)
at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onNext(FluxOnErrorResume.java:79)
at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.complete(MonoIgnoreThen.java:284)
at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.onNext(MonoIgnoreThen.java:187)
at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.subscribeNext(MonoIgnoreThen.java:232)
at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.onComplete(MonoIgnoreThen.java:203)
at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.onComplete(MonoPeekTerminal.java:299)
at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.onComplete(MonoIgnoreThen.java:209)
at reactor.core.publisher.FluxPeek$PeekSubscriber.onComplete(FluxPeek.java:259)
at reactor.core.publisher.MonoIgnoreElements$IgnoreElementsSubscriber.onComplete(MonoIgnoreElements.java:88)
at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1816)
at reactor.core.publisher.MonoCompletionStage.lambda$subscribe$0(MonoCompletionStage.java:82)
at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
at java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2073)
at com.github.jasync.sql.db.util.FutureUtilsKt.success(FutureUtils.kt:16)
at com.github.jasync.sql.db.mysql.MySQLConnection$succeedQueryPromise$1.accept(MySQLConnection.kt:344)
at com.github.jasync.sql.db.mysql.MySQLConnection$succeedQueryPromise$1.accept(MySQLConnection.kt:54)
at java.base/java.util.Optional.ifPresent(Optional.java:183)
at com.github.jasync.sql.db.mysql.MySQLConnection.succeedQueryPromise(MySQLConnection.kt:343)
at com.github.jasync.sql.db.mysql.MySQLConnection.onOk(MySQLConnection.kt:218)
at com.github.jasync.sql.db.mysql.codec.MySQLConnectionHandler.channelRead0(MySQLConnectionHandler.kt:119)
at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:324)
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:296)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:719)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:655)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:581)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at java.base/java.lang.Thread.run(Thread.java:829)
Router:
@Bean
fun router(...): RouterFunction<ServerResponse> {
    return route()
        .POST("...", asHandlerFunction { createNewOnesecondResolutionSession(it) })
        .build()
}

// Bridges a suspend handler to the HandlerFunction API by wrapping it in a Mono.
private fun asHandlerFunction(init: suspend (ServerRequest) -> ServerResponse) = HandlerFunction {
    mono(Dispatchers.Unconfined) {
        init(it)
    }
}
Handler:
private suspend fun theFun(req: ServerRequest): ServerResponse {
    val a = ...
    val b = ...
    return service.theFun(a, b).fold(
        { error -> internalServerErrorResponse("Client user already exists.") },
        { ServerResponse.ok().bodyValueAndAwait(it) }
    )
}
Service:
@Transactional("tm1")
suspend fun theFun(
    a: A,
    b: B
): Either<Error, Result> = either {
    val user = userService.createNewUser(username = "test", password = "pw")
        .mapLeft {
            log
            Error
        }
        .bind()
    throw RuntimeException("xx")
}
Persistence configuration:
@Configuration
@EnableR2dbcRepositories(
    basePackages = ["io.x.a"],
    entityOperationsRef = "operations1"
)
class PersistenceConfig1(
    @Value("\${spring.datasource.d1.r2dbcUrl}") private val r2dbcUrl: String
) {
    @Bean
    @Qualifier("d1")
    fun connectionFactory(): ConnectionFactory {
        return ConnectionFactories.get(ConnectionFactoryOptions.parse(r2dbcUrl))
    }

    @Bean
    fun r2dbcEntityOperations(@Qualifier("d1") connectionFactory: ConnectionFactory): R2dbcEntityOperations {
        val databaseClient = DatabaseClient.create(connectionFactory)
        return R2dbcEntityTemplate(databaseClient, DefaultReactiveDataAccessStrategy(MySqlDialect.INSTANCE))
    }

    @Bean("tm1")
    fun transactionManager(@Qualifier("d1") connectionFactory: ConnectionFactory): ReactiveTransactionManager {
        return R2dbcTransactionManager(connectionFactory)
    }
}
Solution
I found a solution using TransactionalOperator.
@Bean("transactionalOperator1")
fun transactionalOperator1(@Qualifier("tm1") transactionManager: ReactiveTransactionManager): TransactionalOperator {
    return TransactionalOperator.create(transactionManager)
}

@Bean("transactionalOperator2")
fun transactionalOperator2(@Qualifier("tm2") transactionManager: ReactiveTransactionManager): TransactionalOperator {
    return TransactionalOperator.create(transactionManager)
}
// Constructor parameters of the service:
@Qualifier("transactionalOperator1") private val transactionalOperator1: TransactionalOperator,
@Qualifier("transactionalOperator2") private val transactionalOperator2: TransactionalOperator,

// Service function:
suspend fun theFun(...): Either<Error, Result> =
    transactionalOperator1.executeAndAwait { t1 ->
        transactionalOperator2.executeAndAwait { t2 ->
            ...
                .handleErrorWith {
                    t1.setRollbackOnly()
                    t2.setRollbackOnly()
                }
        }
    }!!
The changes are committed at the end of executeAndAwait. If the rollbackOnly flag has been set, it instead instructs the ReactiveTransactionManager to roll back the changes.
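For the single-database case, the same idea can be sketched as a small extension function. This is only an illustration under assumptions: the name transactionalEither is hypothetical (it is not part of Spring or Arrow), and it assumes spring-tx with its Kotlin executeAndAwait extension and arrow-core are on the classpath.

```kotlin
import arrow.core.Either
import org.springframework.transaction.reactive.TransactionalOperator
import org.springframework.transaction.reactive.executeAndAwait

// Hypothetical helper, not a Spring or Arrow API.
// Runs `block` inside a reactive transaction and marks the transaction
// as rollback-only when the block returns a Left.
suspend fun <E, A> TransactionalOperator.transactionalEither(
    block: suspend () -> Either<E, A>
): Either<E, A> =
    executeAndAwait { tx ->
        val result = block()
        // A Left does not throw, so the rollback has to be requested explicitly.
        if (result.isLeft()) tx.setRollbackOnly()
        result
    }!! // the lambda always returns a value, so the result is non-null
```

With such a helper, a service function could be written as transactionalOperator1.transactionalEither { either { ... } }. An exception thrown inside the block also causes a rollback, because TransactionalOperator rolls back when the wrapped publisher signals an error.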