问题描述
我遇到了一个要求,我希望轴突要等到针对特定Command触发的eventbus中的所有事件完成执行。我将简要介绍这种情况:
我有一个RestController,它会在下面的命令中触发以创建应用程序实体:
@RestController
class myController{
@PostMapping("/create")
@ResponseBody
public String create(
org.axonframework.commandhandling.gateway.CommandGateway.sendAndWait(new CreateApplicationCommand());
System.out.println(“in myController:: after sending CreateApplicationCommand”);
}
}
此命令正在Aggregate中处理,Aggregate类用org.axonframework.spring.stereotype.Aggregate
注释:
@Aggregate
class MyAggregate{
@CommandHandler //org.axonframework.commandhandling.CommandHandler
private MyAggregate(CreateApplicationCommand command) {
org.axonframework.modelling.command.AggregateLifecycle.apply(new AppCreatedEvent());
System.out.println(“in MyAggregate:: after firing AppCreatedEvent”);
}
@EventSourcingHandler //org.axonframework.eventsourcing.EventSourcingHandler
private void on(AppCreatedEvent appCreatedEvent) {
// Updates the state of the aggregate
this.id = appCreatedEvent.getId();
this.name = appCreatedEvent.getName();
System.out.println(“in MyAggregate:: after updating state”);
}
}
AppCreatedEvent在2个地方处理:
- 如上所示,在聚合本身中。
- 在以下投影类中:
@EventHandler //org.axonframework.eventhandling.EventHandler
void on(AppCreatedEvent appCreatedEvent){
// persists into database
System.out.println(“in Projection:: after saving into database”);
}
这里的问题是在第一个地方(即聚合内)捕获事件后,调用将返回给myController。 即这里的输出是:
in MyAggregate:: after firing AppCreatedEvent
in MyAggregate:: after updating state
in myController:: after sending CreateApplicationCommand
in Projection:: after saving into database
我想要的输出是:
in MyAggregate:: after firing AppCreatedEvent
in MyAggregate:: after updating state
in Projection:: after saving into database
in myController:: after sending CreateApplicationCommand
简单地说,我希望轴突等到针对特定命令触发的所有事件都完全执行完,然后返回触发该命令的类。
在论坛上搜索后,我知道所有sendAndWait所做的工作都是等到命令的处理和事件的发布完成为止,然后我对Reactor Extension也感到厌倦,但使用下面的方法却得到了相同的结果:{{ 1}}
有人可以帮我吗。 预先感谢。
解决方法
在您所处的情况下,@ rohit最好的办法是接受您在此处使用最终一致的解决方案的事实。因此,命令处理与事件处理完全隔离,使您创建的查询模型最终与命令模型(您的集合)保持一致。因此,您不必完全等待事件,而只需对存在查询模型的情况做出反应即可。
真正做到这一点归结为构建您的应用程序,例如“是的,我知道我的回复可能不是最新的,但可能在不久的将来”。因此,建议您在分派命令之前或之后订阅您感兴趣的结果。
例如,您可能会看到这是将WebSocket与STOMP协议一起使用,或者您可以使用Project Reactor并使用Flux
结果类型来接收结果。
但是,从您的描述中我还是感觉到您或您的企业已决定UI组件应以(老式的)同步方式做出反应。没关系,但是在使用最终固有的一致性(例如CQRS)时,它会刺痛您的* ss。但是,您可以假冒您正在前端同步的事实。
要实现此目的,我建议使用Axon订阅查询来订阅您知道将通过要发送的命令进行更新的查询模型。 在伪代码中,看起来有点像这样:
public Result mySynchronousCall(String identifier) {
// Subscribe to the updates to come
SubscriptionQueryResult<Result> result = QueryGateway.subscriptionQuery(...);
// Issue command to update
CommandGateway.send(...);
// Wait on the Flux for the first result,and then close it
return result.updates()
.next()
.map(...)
.timeout(...)
.doFinally(it -> result.close());
}
您可能会看到this在此示例WebFluxRest
类中已经完成。
请注意,实际上,您实际上是在关闭前端的门,以通过这样做来利用异步优势。它会起作用,并且允许您等待结果一出现就立即出现,但是您会失去一些灵活性。