使用CQRS是否可以直接从Axon聚合的命令处理程序运行查询?

问题描述

这更多是与CQRS和Axon有关的理论问题。下面是一个不言自明的设置,该代码代码,并非旨在进行编译。

假设要从聚合“ Foo”处理命令,我们首先需要查询一个聚合“ Bar”的状态以进行验证(从另一个有界上下文中进行,因此这不是简单地查找“ Foo”的成员聚集)。

我们在这里有两个选择,如伪代码所示。选择(1),我们仅使用查询网关从“ Foo”集合中的命令处理程序中运行查询

选择(2),我们应用事件要求专用服务来处理查询,而不是“ Foo”。在查询系统的“ Bar”状态后,该服务将向“ Foo”发送命令。

第一种选择(从命令处理程序中直接直接查询)似乎违背了命令-查询分离的整个思想-毕竟,我们这样在命令处理过程中执行查询,对吧?

第二种选择似乎更符合CQRS的精神:该命令仅导致一个事件(稍后将导致另一个命令,依此类推)。但这显然是有代价的:涉及更多的步骤2a,2b,2c,2d ...

我想听听社区对此的看法。对我来说,如果我们严格限制不将命令与查询处理混合使用,而是允许通过命令处理执行查询,这似乎很奇怪。还是我错过了什么?

@Aggregate
class AggregateFoo {

    private QueryGateway queryGateway;

    @CommandHandler
    public void on(UpdateFooCommand command){

        /*
            Assume that in order to validate this command we first need
            to query the state of another aggregate,"Bar".
        */

        // 1. We can just issue the query directly from the command handler.

        queryGateway
            .query(new AskForStateOfBarQuery(command.getBarId()))
            .then(queryResponse -> {
                // Proceed with original command execution depending
                // the result of the query response.    
            });

        // 2a. Or we can issue an intermediate EVENT offloading the query handling
        //     to a dedicated service ("FooBarService",see below).

        AggregateLifecycle.apply(new FooUpdateValidationRequestedEvent(command.getBarId()));

    }

    // 2d. "Foo" aggregate will react to the validation command from the
    //     dedicated service effectively executing the original command.

    @CommandHandler
    public void on(ProceedWithFooUpdateCommand command){

        // Do other validations,issue events here. At this
        // point we kNow that UpdateFooCommand was validated.
    } 

}

@Service
class FooBarService {

    private QueryGateway queryGateway;
    private CommandGateway commandGateway;

    @EventHandler
    public void on(FooUpdateValidationRequestedEvent event){
        
        // 2b. The dedicated service will run the corresponding query,//     asking for the state of "Bar".
        
        queryGateway
            .query(new AskForStateOfBarQuery(command.getBarId()))
            .then(queryResponse -> {

                // 2c. And will issue a COMMAND to the "Foo" aggregate
                //     indicating that it shoud proceed with the original 
                //     command's (UpdateFooCommand) execution.

                commandGateway.send(new ProceedWithFooUpdateCommand(command.getFooId()));

            });

    }

}

更新:

这是讨论91stefan给出的有益答案之后的更新(见下文)。

class AggregateFoo {
    int f = 9;

    // reference to the related Bar aggregate
    UUID bar;

    on(UpdateFooCommand){
        // Assume we must execute ONLY IF f < 10 AND bar.b > 10.
        // So we apply event to Saga (with f = 9),// Saga will ask Bar: (b = 15),so the condition holds
        // and Saga issues ConfirmValidBarStateCommand
    }

    // Meanwhile,when Saga is busy validating,we process another
    // command CHANGING the state of Foo
    on(AnotherCommand) { f++; }

    // or "ConfirmValidBarStateCommand" as in 91stefan's example
    on(ProceedWithFooUpdateCommand){
        // So when we get back here (from Saga),there is no guarantee
        // that the actual state of Foo (f < 10) still holds,// and that we can proceed with the execution of the
        // original UpdateFooCommand
    }

}

class AggregateBar {
    int b = 15;
}

因此,问题似乎仍然存在:如果其验证依赖于来自另一个有界上下文的Bar状态,那么如何在Foo中一致地验证和执行命令?看起来我们可能在这里有几种选择:

  • 从Foo的命令处理程序中直接查询Bar的投影状态
  • 事件(无状态)服务,该服务查询Bar的投影状态
  • (91stefan的建议)事件发生在Saga中,该事件将验证命令发送到Bar

解决方法

您可能会遇到问题,从命令处理程序中进行查询不被认为是一种好习惯,因为由于最终的一致性,您的投影可能不是最新的。同样,来自相同聚合的命令处理程序将顺序/同步执行。您只想快速完成任务,调用外部服务将阻止并阻止其他命令的执行,直到当前命令处理程序完成执行为止。

在这种情况下,您需要的是Saga。 https://docs.axoniq.io/reference-guide/v/3.3/part-ii-domain-logic/sagas

简化流程为:

  • commandHandler(UpdateFooCommand)-> applyEvent(AskedForStateOfBarEvent)
  • AskedForStateOfBarEvent启动传奇
  • 在传奇中,将命令发送到Bar聚合-> ConfirmYourStateBar(barAggregateId)
  • commandHandler(ConfirmYourStateBar)->验证状态-> applyEvent(StateIsValidatedEvent)
  • Saga对StateIsValidatedEvent做出反应,向AggregateFoo发送命令->(ConfirmValidBarStateCommand(aggreageFooId)&saga已关闭(结束事件为StateIsValidatedEvent)
  • 在commandHandler(ConfirmValidBarStateCommand)中,因为状态BAR状态有效,所以您继续执行