问题描述
我实际上是 CQRS 和事件溯源方面的新手。
当我看到这段代码时,我对聚合中的命令处理程序感到困惑:
with open('report.txt','r') as in_file:
stripped = (line.strip() for line in in_file)
lines = (line.split(',') for line in stripped if line)
lines = in_file.read().splitlines()
last_line = lines[-3]
print (last_line)
with open('log.csv','w') as out_file:
writer = csv.writer(out_file)
writer.writerow(('date','time','val'))
writer.writerows(last_line)
那个命令处理程序只是发布一个事件。如您所见,对象的命名是 @AllArgsConstructor
@NoArgsConstructor
@Getter
@Aggregate
public class BankAccountAggregate {
@AggregateIdentifier
private UUID id;
private BigDecimal balance;
private String owner;
@CommandHandler
public BankAccountAggregate(CreateAccountCommand command){
AggregateLifecycle.apply(
new AccountCreatedEvent(
command.getAccountId(),command.getinitialBalance(),command.getowner()
)
);
}
但并不意味着 AccountCreatedEvent
对吗?
帐户创建在订阅 Account is created
的 EventHandler 中:
AccountCreatedEvent
我所知道的是:
- 事件是已经发生的事情。
- 命令是我们想要发生的事情。
那我们为什么不把 @Slf4j
@requiredArgsConstructor
@Component
public class BankAccountProjection {
private final BankAccountRepository repository;
private final QueryUpdateEmitter updateEmitter;
@EventHandler
public void on(AccountCreatedEvent event) throws Exception {
log.debug("Handling a Bank Account creation command {}",event.getId());
BankAccount bankAccount = new BankAccount(
event.getId(),event.getowner(),event.getinitialBalance()
);
this.repository.save(bankAccount);
Boolean isActive = AggregateLifecycle.isLive();
}
}
的逻辑放在 Command Handler 中呢?这样做的目的是什么?
解决方法
你混淆了一些概念。让我们澄清一下。
事件溯源
如果您实施事件溯源,则意味着您的真相来源是事件本身。您存储事件,而不是具体的“状态”(实体)。让我们看看一些伪代码:
要创建一个新帐户:
function createAccount(data) {
event = new AccountCreatedEvent(data)
eventStore.save(event)
}
从账户中取款,例如:
function withdraw(data) {
events = eventStore.getEvents(data.accountId)
account = new Account()
account.apply(events)
account.withdraw(data)
newEvents = account.newEvents
eventStore.save(newEvents)
}
(此方法将由您的命令处理程序调用)
如您所见,您从其事件生成 account
,而不是从存储库中读取它。帐户类将是这样的:
class Account {
amount = 0
newEvents = []
function apply(events) {
events.forEach(event => {
if event == AccountCreatedEvent {
this.amount = event.initialAmount
} else if (event == WithdrawalApplied) {
this.amount = this.amount - event.amount
} // more event types
})
}
function withdraw(data) {
// here is where you ensure aggregate invariants (business rules)
if this.amount == 0 {
throw Error("no money")
}
this.amount = this.amount - data.amount
newEvents.add(new WithdrawalApplied(data))
}
}
所以你的问题:
那个命令处理程序只是发布一个事件。正如你所看到的 对象的命名是 AccountCreatedEvent 但这并不意味着 帐户创建正确吗?
答案是您应该将事件存储在命令处理程序中。这恰恰意味着帐户已创建。然后,您可以根据需要发布事件。继续阅读。
CQRS
使用 CQRS,您只需将查询和命令分开,但这种技术可以在完全没有事件溯源的情况下应用。
预测
由于您的真实来源由一堆事件组成,例如,当客户想要通过 ID 查询帐户时,您需要查询所有事件并从中建立您的帐户。这可能很慢。因此,在使用 CQRS 和事件溯源时,为了实现更快的读取,您可以应用此技术。
基本上,它包括侦听事件并构建聚合的预构建投影。该投影可以存储在 MongoDB、PostgreSQL 甚至文件中。这是一个实现细节。
下图说明了三种技术(CQRS、事件溯源和投影)如何协同工作。
,我实际上是 CQRS 和事件溯源方面的新手。 当我看到这段代码时,我对聚合中的命令处理程序感到困惑
不是你的错 - 我研究 CQRS/ES 已经有一段时间了,我发现 Nebrass Lamouchi 的设计是外星人。
在阅读这篇文章时,我看到了许多其他危险信号;因此,如果您想了解与这些设计理念相关的“最佳实践”,我鼓励您查看其他来源。
Command Handler 是否只是从总线接收命令并发布事件?
差不多;一般而言,当我们谈论命令处理程序时,我们通常会描述一段应用程序代码,它使用队列中的消息,并在这样做时更新我们的持久数据存储。
所以如果你要长手写出代码,通常的模式看起来像
void handle(Command c) {
root = repo.get(c.accountId())
root.updateMyself(c)
repo.save(root)
}
如果此时您还想广播事件,那么通常也会在应用层这里发生
void handle(Command c) {
root = repo.get(c.accountId())
root.updateMyself(c)
repo.save(root)
// In "event sourced" designs,this sort of publish is
// normally "outside" of the transaction. When using
// a relational data store,it might not be. Tradeoffs
publish(root.events())
}
但我们在这里没有看到,因为拉穆奇还演示了 Axon 框架,这减少了样板代码的数量 - 实际上隐藏了管道。
这是个好主意吗?花一些时间观看 Greg Young 的 8 Lines of Code。