使用反应式消息传递时如何传播 JTA 状态?

问题描述

我想在向反应式消息传递连接器发送消息的事务性 REST 端点之间传播 JTA 状态(= 事务)。

@Inject
@Channel("test")
Emitter<String> emitter;

@POST
@Transactional
public Response test() {
    emitter.send("test");
}

@ApplicationScoped
@Connector("test")
public class TestConnector implements OutgoingConnectorFactory {

    @Inject
    TransactionManager tm;

    @Override
    public SubscriberBuilder<? extends Message<?>,Void> getSubscriberBuilder(Config config) {
        return ReactiveStreams.<Message<?>>builder()
            .flatMapCompletionStage(message -> {
                tm.getTransaction(); // = null
                return message.ack();
            })
            .ignore();
    }
}

据我所知,上下文传播负责使事务可用(请参阅 io.smallrye.context.jta.context.propagation.JtaContextProvider#currentContext)。问题似乎是,currentContext 是在订阅时创建的,当注入点 (Emitter<String> emitter) 获取其实例时会发生这种情况。正确捕获交易还为时过早。

我错过了什么?

顺便说一下,我在使用 @Incoming / @Outgoing 而不是发射器时遇到了同样的问题。我决定给你这个例子,因为它很容易理解和重现。

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)