Axon Saga订阅模式并发问题

问题描述

使用Axon 4.3.5在我的Saga中从跟踪模式切换到订阅模式时,我看到了意外的行为

订阅模式下,当两个线程同时到达两个@StarSaga方法时,似乎为相同的关联键值创建了两个sagas。 我想念什么吗?

我有这个可以复制它:

@Saga
@ProcessingGroup("Saga")
public class RaceSaga {

    @Inject
    protected transient CommandGateway commandGateway;

    @StartSaga
    @SagaEventHandler(associationProperty = "executionId")
    public void on(Exec exec) {
        commandGateway.sendAndWait(new CreateExecCommand(exec.getExecutionId(),exec.getDescription()));
    }

    @StartSaga
    @SagaEventHandler(associationProperty = "executionId")
    public void on(Risk risk) {
        commandGateway.sendAndWait(new CreateRiskCommand(risk.getExecutionId(),risk.getResult()));
    }
}

@IntegrationTest
class RaceConditionTest extends BaseIntegrationTest {

    @Autowired
    private EventGateway eventGateway;
    @Autowired
    private SagaStore sagaStore;

    @Test
    void sagarace() {
        var execId = UUID.randomUUID();

        CompletableFuture.runAsync(() -> eventGateway.publish(new Exec(execId.toString(),"desc")));
        CompletableFuture.runAsync(() -> eventGateway.publish(new Risk(execId.toString(),"OK")));

        var association = new AssociationValue("executionId",execId.toString());
        await().during(5,SECONDS)
                .untilAsserted(() -> assertthat(sagaStore.findSagas(RaceSaga.class,association))
                        .hasSize(1));
    }
}

使用跟踪模式时,测试通过,但订阅失败。 (yml配置

解决方法

说实话,这是给定测试设置的预期行为,但需要一些解释。

请注意,以下是订阅(SEP)和跟踪事件处理器(TEP)之间的主要区别:

  • SubscribingEventProcessor-在EventBus上发布事件的线程中调用,类似于推送机制。
  • TrackingEventProcessor-在单独的线程中调用,从EventStore中检索事件,类似于拉动机制。

这可以确保无论事件并发发布的方式如何,TEP都会确保案件的事件处理顺序。

在谈到SEP时,情况略有不同,为此,我们需要稍微深入研究一下实现。您可以假设两个或多个事件的发布并不奇怪。给定域内的正确要求,大量的聚合实现可以做到这一点。该框架具有一种将多个事件的这些事务分组在一起的方式。为此,它使用UnitOfWork。例如,如果要输入聚合的命令处理功能,则可以确保UnitOfWork处于活动状态以协调生命周期。这些任务之一是将事件配对以进行发布。

在您的测试案例中,您将直接使用EventGateway。本来就很完美,但是测试是在没有开始UnitOfWork的情况下进行的,以便将这两个事件按顺序进行协调。仔细研究代码以了解如何发布到SEP,您将在此阶段进入AbstractEventProcessor。进行验证以检查调用UnitOfWorkEventProcessor#publish(List<EventMessage>)是否处于活动状态。如果是这样,则将事件添加到UnitOfWork的右侧。

尽管没有激活UnitOfWork(UoW)时,处理程序将立即被调用。

因此,在使用TrackingEventProcessor时,正是该框架将有意识地启动了一个UoW,以对事件进行批处理以按顺序进行处理。使用SubscribingEventProcessor时,这项工作留给用户,并假设用户通常会通过[命令处理->事件发布->事件处理]的常规流程,这将确保UoW处于活动状态。由于在集成测试中不是这种情况,因此两个发布操作都会立即调用RaceSaga的{​​{1}},由于并发性而创建两个实例。

请注意,建议对此类过程使用TEP。为Saga使用SEP可能意味着您将在应用程序(错误)关闭期间失去一些事件。由于SEP是一种推送机制,因此无法从这些“丢失的”事件(从事件处理器的角度)恢复。 TEP将自行解决事件并跟踪流程,因此TEP将解决此问题。

相信这一点,@ matpiera会为您澄清一切。