问题描述
使用Axon 4.3.5在我的Saga中从跟踪模式切换到订阅模式时,我看到了意外的行为
在订阅模式下,当两个线程同时到达两个@StarSaga方法时,似乎为相同的关联键值创建了两个sagas。 我想念什么吗?
我有这个可以复制它:
@Saga
@ProcessingGroup("Saga")
public class RaceSaga {
@Inject
protected transient CommandGateway commandGateway;
@StartSaga
@SagaEventHandler(associationProperty = "executionId")
public void on(Exec exec) {
commandGateway.sendAndWait(new CreateExecCommand(exec.getExecutionId(),exec.getDescription()));
}
@StartSaga
@SagaEventHandler(associationProperty = "executionId")
public void on(Risk risk) {
commandGateway.sendAndWait(new CreateRiskCommand(risk.getExecutionId(),risk.getResult()));
}
}
@IntegrationTest
class RaceConditionTest extends BaseIntegrationTest {
@Autowired
private EventGateway eventGateway;
@Autowired
private SagaStore sagaStore;
@Test
void sagarace() {
var execId = UUID.randomUUID();
CompletableFuture.runAsync(() -> eventGateway.publish(new Exec(execId.toString(),"desc")));
CompletableFuture.runAsync(() -> eventGateway.publish(new Risk(execId.toString(),"OK")));
var association = new AssociationValue("executionId",execId.toString());
await().during(5,SECONDS)
.untilAsserted(() -> assertthat(sagaStore.findSagas(RaceSaga.class,association))
.hasSize(1));
}
}
解决方法
说实话,这是给定测试设置的预期行为,但需要一些解释。
请注意,以下是订阅(SEP)和跟踪事件处理器(TEP)之间的主要区别:
-
SubscribingEventProcessor
-在EventBus
上发布事件的线程中调用,类似于推送机制。 -
TrackingEventProcessor
-在单独的线程中调用,从EventStore
中检索事件,类似于拉动机制。
这可以确保无论事件并发发布的方式如何,TEP都会确保案件的事件处理顺序。
在谈到SEP时,情况略有不同,为此,我们需要稍微深入研究一下实现。您可以假设两个或多个事件的发布并不奇怪。给定域内的正确要求,大量的聚合实现可以做到这一点。该框架具有一种将多个事件的这些事务分组在一起的方式。为此,它使用UnitOfWork
。例如,如果要输入聚合的命令处理功能,则可以确保UnitOfWork
处于活动状态以协调生命周期。这些任务之一是将事件配对以进行发布。
在您的测试案例中,您将直接使用EventGateway
。本来就很完美,但是测试是在没有开始UnitOfWork
的情况下进行的,以便将这两个事件按顺序进行协调。仔细研究代码以了解如何发布到SEP,您将在此阶段进入AbstractEventProcessor
。进行验证以检查调用UnitOfWork
时EventProcessor#publish(List<EventMessage>)
是否处于活动状态。如果是这样,则将事件添加到UnitOfWork
的右侧。
尽管没有激活UnitOfWork
(UoW)时,处理程序将立即被调用。
因此,在使用TrackingEventProcessor
时,正是该框架将有意识地启动了一个UoW,以对事件进行批处理以按顺序进行处理。使用SubscribingEventProcessor
时,这项工作留给用户,并假设用户通常会通过[命令处理->事件发布->事件处理]的常规流程,这将确保UoW处于活动状态。由于在集成测试中不是这种情况,因此两个发布操作都会立即调用RaceSaga
的{{1}},由于并发性而创建两个实例。
请注意,建议对此类过程使用TEP。为Saga使用SEP可能意味着您将在应用程序(错误)关闭期间失去一些事件。由于SEP是一种推送机制,因此无法从这些“丢失的”事件(从事件处理器的角度)恢复。 TEP将自行解决事件并跟踪流程,因此TEP将解决此问题。
相信这一点,@ matpiera会为您澄清一切。