如何使命令等待直到针对它触发的所有事件成功完成

问题描述

我遇到了一个要求,我希望轴突要等到针对特定Command触发的eventbus中的所有事件完成执行。我将简要介绍这种情况:

我有一个RestController,它会在下面的命令中触发以创建应用程序实体:

@RestController
class myController{
  @PostMapping("/create")
  @ResponseBody
  public String create(
    org.axonframework.commandhandling.gateway.CommandGateway.sendAndWait(new CreateApplicationCommand());
    System.out.println(“in myController:: after sending CreateApplicationCommand”);
  }
}

此命令正在Aggregate中处理,Aggregate类用org.axonframework.spring.stereotype.Aggregate注释:

@Aggregate
class MyAggregate{
   @CommandHandler //org.axonframework.commandhandling.CommandHandler
   private MyAggregate(CreateApplicationCommand command) {
      org.axonframework.modelling.command.AggregateLifecycle.apply(new AppCreatedEvent());
      System.out.println(“in MyAggregate:: after firing AppCreatedEvent”);
   }

   @EventSourcingHandler //org.axonframework.eventsourcing.EventSourcingHandler
   private void on(AppCreatedEvent appCreatedEvent) {
      // Updates the state of the aggregate
      this.id = appCreatedEvent.getId();
      this.name = appCreatedEvent.getName();
      System.out.println(“in MyAggregate:: after updating state”);
   }
}

AppCreatedEvent在2个地方处理:

  1. 如上所示,在聚合本身中。
  2. 在以下投影类中:
 @EventHandler //org.axonframework.eventhandling.EventHandler
 void on(AppCreatedEvent appCreatedEvent){
    // persists into database
    System.out.println(“in Projection:: after saving into database”);
 }

这里的问题是在第一个地方(即聚合内)捕获事件后,调用将返回给myController。 即这里的输出是:

in MyAggregate:: after firing AppCreatedEvent
in MyAggregate:: after updating state
in myController:: after sending CreateApplicationCommand
in Projection:: after saving into database

我想要的输出是:

in MyAggregate:: after firing AppCreatedEvent
in MyAggregate:: after updating state
in Projection:: after saving into database
in myController:: after sending CreateApplicationCommand

简单地说,我希望轴突等到针对特定命令触发的所有事件都完全执行完,然后返回触发该命令的类。

在论坛上搜索后,我知道所有sendAndWait所做的工作都是等到命令的处理和事件的发布完成为止,然后我对Reactor Extension也感到厌倦,但使用下面的方法却得到了相同的结果:{{ 1}}

有人可以帮我吗。 预先感谢。

解决方法

在您所处的情况下,@ rohit最好的办法是接受您在此处使用最终一致的解决方案的事实。因此,命令处理与事件处理完全隔离,使您创建的查询模型最终与命令模型(您的集合)保持一致。因此,您不必完全等待事件,而只需对存在查询模型的情况做出反应即可。

真正做到这一点归结为构建您的应用程序,例如“是的,我知道我的回复可能不是最新的,但可能在不久的将来”。因此,建议您在分派命令之前或之后订阅您感兴趣的结果。 例如,您可能会看到这是将WebSocket与STOMP协议一起使用,或者您可以使用Project Reactor并使用Flux结果类型来接收结果。

但是,从您的描述中我还是感觉到您或您的企业已决定UI组件应以(老式的)同步方式做出反应。没关系,但是在使用最终固有的一致性(例如CQRS)时,它会刺痛您的* ss。但是,您可以假冒您正在前端同步的事实。

要实现此目的,我建议使用Axon订阅查询来订阅您知道将通过要发送的命令进行更新的查询模型。 在伪代码中,看起来有点像这样:

public Result mySynchronousCall(String identifier) {
    // Subscribe to the updates to come
    SubscriptionQueryResult<Result> result = QueryGateway.subscriptionQuery(...);
    // Issue command to update
    CommandGateway.send(...);
    // Wait on the Flux for the first result,and then close it
    return result.updates()
                 .next()
                 .map(...)
                 .timeout(...)
                 .doFinally(it -> result.close());

}

您可能会看到this在此示例WebFluxRest类中已经完成。

请注意,实际上,您实际上是在关闭前端的门,以通过这样做来利用异步优势。它会起作用,并且允许您等待结果一出现就立即出现,但是您会失去一些灵活性。