问题描述
这更多是与CQRS和Axon有关的理论问题。下面是一个不言自明的设置,该代码是伪代码,并非旨在进行编译。
假设要从聚合“ Foo”处理命令,我们首先需要查询另一个聚合“ Bar”的状态以进行验证(从另一个有界上下文中进行,因此这不是简单地查找“ Foo”的成员聚集)。
我们在这里有两个选择,如伪代码所示。选择(1),我们仅使用查询网关从“ Foo”集合中的命令处理程序中运行查询。
选择(2),我们应用事件要求专用服务来处理查询,而不是“ Foo”。在查询系统的“ Bar”状态后,该服务将向“ Foo”发送命令。
第一种选择(从命令处理程序中直接直接查询)似乎违背了命令-查询分离的整个思想-毕竟,我们这样是在命令处理过程中执行查询,对吧?
第二种选择似乎更符合CQRS的精神:该命令仅导致一个事件(稍后将导致另一个命令,依此类推)。但这显然是有代价的:涉及更多的步骤2a,2b,2c,2d ...
我想听听社区对此的看法。对我来说,如果我们严格限制不将命令与查询处理混合使用,而是允许通过命令处理执行查询,这似乎很奇怪。还是我错过了什么?
@Aggregate
class AggregateFoo {
private QueryGateway queryGateway;
@CommandHandler
public void on(UpdateFooCommand command){
/*
Assume that in order to validate this command we first need
to query the state of another aggregate,"Bar".
*/
// 1. We can just issue the query directly from the command handler.
queryGateway
.query(new AskForStateOfBarQuery(command.getBarId()))
.then(queryResponse -> {
// Proceed with original command execution depending
// the result of the query response.
});
// 2a. Or we can issue an intermediate EVENT offloading the query handling
// to a dedicated service ("FooBarService",see below).
AggregateLifecycle.apply(new FooUpdateValidationRequestedEvent(command.getBarId()));
}
// 2d. "Foo" aggregate will react to the validation command from the
// dedicated service effectively executing the original command.
@CommandHandler
public void on(ProceedWithFooUpdateCommand command){
// Do other validations,issue events here. At this
// point we kNow that UpdateFooCommand was validated.
}
}
@Service
class FooBarService {
private QueryGateway queryGateway;
private CommandGateway commandGateway;
@EventHandler
public void on(FooUpdateValidationRequestedEvent event){
// 2b. The dedicated service will run the corresponding query,// asking for the state of "Bar".
queryGateway
.query(new AskForStateOfBarQuery(command.getBarId()))
.then(queryResponse -> {
// 2c. And will issue a COMMAND to the "Foo" aggregate
// indicating that it shoud proceed with the original
// command's (UpdateFooCommand) execution.
commandGateway.send(new ProceedWithFooUpdateCommand(command.getFooId()));
});
}
}
更新:
这是讨论91stefan给出的有益答案之后的更新(见下文)。
class AggregateFoo {
int f = 9;
// reference to the related Bar aggregate
UUID bar;
on(UpdateFooCommand){
// Assume we must execute ONLY IF f < 10 AND bar.b > 10.
// So we apply event to Saga (with f = 9),// Saga will ask Bar: (b = 15),so the condition holds
// and Saga issues ConfirmValidBarStateCommand
}
// Meanwhile,when Saga is busy validating,we process another
// command CHANGING the state of Foo
on(AnotherCommand) { f++; }
// or "ConfirmValidBarStateCommand" as in 91stefan's example
on(ProceedWithFooUpdateCommand){
// So when we get back here (from Saga),there is no guarantee
// that the actual state of Foo (f < 10) still holds,// and that we can proceed with the execution of the
// original UpdateFooCommand
}
}
class AggregateBar {
int b = 15;
}
因此,问题似乎仍然存在:如果其验证依赖于来自另一个有界上下文的Bar状态,那么如何在Foo中一致地验证和执行命令?看起来我们可能在这里有几种选择:
解决方法
您可能会遇到问题,从命令处理程序中进行查询不被认为是一种好习惯,因为由于最终的一致性,您的投影可能不是最新的。同样,来自相同聚合的命令处理程序将顺序/同步执行。您只想快速完成任务,调用外部服务将阻止并阻止其他命令的执行,直到当前命令处理程序完成执行为止。
在这种情况下,您需要的是Saga。 https://docs.axoniq.io/reference-guide/v/3.3/part-ii-domain-logic/sagas
简化流程为:
- commandHandler(UpdateFooCommand)-> applyEvent(AskedForStateOfBarEvent)
- AskedForStateOfBarEvent启动传奇
- 在传奇中,将命令发送到Bar聚合-> ConfirmYourStateBar(barAggregateId)
- commandHandler(ConfirmYourStateBar)->验证状态-> applyEvent(StateIsValidatedEvent)
- Saga对StateIsValidatedEvent做出反应,向AggregateFoo发送命令->(ConfirmValidBarStateCommand(aggreageFooId)&saga已关闭(结束事件为StateIsValidatedEvent)
- 在commandHandler(ConfirmValidBarStateCommand)中,因为状态BAR状态有效,所以您继续执行