使用 Spring WebFlux、Reactor、R2DBC 在专用事务中处理每个通量项

问题描述

我正在使用 WebFlux、Spring 数据 R2DBC 和 Postgresql 开发 Spring Boot 2.4 网络应用程序。实际上,我无法在专用事务中成功处理 Flux 中的每个项目。这个用例在我看来很平常:

  • 从远程 API 检索项目,
  • 配置返回的 Flux 实例,以便在专用事务中处理每个项目。
  • 如果一个项目的事务失败,它不能回滚其他项目的事务。提交也是如此。

实际上,我使用了下面的代码,但事情并没有像我预期的那样工作,可能是因为我误解了响应式上下文的使用方式:

public ParallelFlux<Item> processAllItems() {
    LOGGER.debug("Starting item processing");
    return client.findItems()
        .parallel()
        .runOn(Schedulers.boundedElastic())
        .doOnNext(item -> LOGGER.debug("Item: {}",item.getId()))
        .flatMap(this::processItem);
}

@Transactional
public Mono<Item> processItem(Item item) {
    // Process the item asynchronously,potentially updating a Postgresql
}

实际上,当通量发出 2 个项目时,我可以在日志中看到 R2dbcTransactionManager 在不同的线程中为每个项目创建一个事务。但在那之后,对一个事务(提交/回滚)的任何操作也会在另一个事务上执行,两者都在同一线程中。例如,在下面的日志中,一项处理失败并回滚了两个事务:

DEBUG [        scheduling-1] b.w.d.u.c.m.itemprocessor       : Starting item processing
DEBUG [    boundedElastic-3] b.w.d.u.c.m.itemprocessor       : Item: 35ZRNT9RVOQK
DEBUG [    boundedElastic-2] b.w.d.u.c.m.itemprocessor       : Item: 3XDSWAMB38KB
DEBUG [    boundedElastic-3] o.s.r.c.R2dbcTransactionManager : Creating new transaction with name [b.w.d.u.c.m.itemprocessor.processItem]: PROPAGATION_required,ISOLATION_DEFAULT
DEBUG [    boundedElastic-2] o.s.r.c.R2dbcTransactionManager : Creating new transaction with name [b.w.d.u.c.m.itemprocessor.processItem]: PROPAGATION_required,ISOLATION_DEFAULT
DEBUG [    boundedElastic-3] o.s.r.c.R2dbcTransactionManager : Acquired Connection [Monoretry] for R2DBC transaction
DEBUG [    boundedElastic-3] o.s.r.c.R2dbcTransactionManager : Switching R2DBC Connection [PooledConnection[PostgresqlConnection{client=io.r2dbc.postgresql.client.ReactorNettyClient@70a7492a,codecs=io.r2dbc.postgresql.codec.DefaultCodecs@7d2961f9}]] to manual commit
DEBUG [    boundedElastic-3] o.s.r.c.R2dbcTransactionManager : Acquired Connection [Monoretry] for R2DBC transaction
DEBUG [    boundedElastic-3] o.s.r.c.R2dbcTransactionManager : Switching R2DBC Connection [PooledConnection[PostgresqlConnection{client=io.r2dbc.postgresql.client.ReactorNettyClient@264df4f5,codecs=io.r2dbc.postgresql.codec.DefaultCodecs@5074cdea}]] to manual commit
ERROR [   reactor-tcp-nio-1] b.w.d.u.c.m.itemprocessor       : UnkNown item: 3XDSWAMB38KB
DEBUG [   reactor-tcp-nio-1] o.s.r.c.R2dbcTransactionManager : Initiating transaction rollback
DEBUG [   reactor-tcp-nio-1] o.s.r.c.R2dbcTransactionManager : Rolling back R2DBC transaction on Connection [PooledConnection[PostgresqlConnection{client=io.r2dbc.postgresql.client.ReactorNettyClient@264df4f5,codecs=io.r2dbc.postgresql.codec.DefaultCodecs@5074cdea}]]
DEBUG [   reactor-tcp-nio-1] o.s.r.c.R2dbcTransactionManager : Releasing R2DBC Connection [PooledConnection[PostgresqlConnection{client=io.r2dbc.postgresql.client.ReactorNettyClient@264df4f5,codecs=io.r2dbc.postgresql.codec.DefaultCodecs@5074cdea}]] after transaction
DEBUG [   reactor-tcp-nio-1] o.s.r.c.R2dbcTransactionManager : Initiating transaction rollback
DEBUG [   reactor-tcp-nio-1] o.s.r.c.R2dbcTransactionManager : Rolling back R2DBC transaction on Connection [PooledConnection[PostgresqlConnection{client=io.r2dbc.postgresql.client.ReactorNettyClient@70a7492a,codecs=io.r2dbc.postgresql.codec.DefaultCodecs@7d2961f9}]]

我非常怀疑在单独的线程上处理项目不足以为每个项目启动一个事务上下文。

在自己的事务中处理通量的每个项目,并让一些项目事务提交而其他项目事务回滚的正确方法是什么?

在此先感谢您的帮助! 问候

编辑

我想我找到了为什么这个实现不起作用。

实际上当一个item处理失败时,事务按预期回滚,错误信号传播到源流,源流依次失败。这当然不是预期的行为,必须捕获错误信号以确保通量中的其他项目独立处理。无论每个项目发生什么,通量都必须成功完成。

下面的代码允许这样做:

public ParallelFlux<Item> processAllItems() {
    LOGGER.debug("Starting item processing");
    return client.findItems()
        .parallel()
        .runOn(Schedulers.boundedElastic())
        .doOnNext(item -> LOGGER.debug("Item: {}",item.getId()))
        .flatMap(item -> processItem(item)
            .onErrorResume(t -> Mono.<Item>empty().doOnSuccess(data -> LOGGER.error("Item skipped after error",t)
        );
}

唯一我仍然不明白的是每个项目的事务上下文是如何在每个发布者及其反应上下文中处理的:当 onErrorResume 没有捕获到错误时,错误信号会回滚其他事务.在 Spring Data R2DBC 中,我没有找到实现说明,尤其是关于 ReactiveTransactionManagerR2dbcTransactionManager 实现的说明。

请随时提供有关这一点的详细信息,特别是! 提前致谢! 问候

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)