了解 Axon 事件溯源

问题描述

我一直在学习轴突和事件溯源,我想我终于理解了其中的一部分并且在我的脑海中是有道理的,但我想确保我对它的理解是正确的,并且我没有做任何错误代码有效,我也可以在 DOMAIN_EVENT_ENTRY 表中看到事件。

我将在下面发布我的代码(文档中的简单礼品卡示例)并解释我的思考过程。如果我没有正确理解它,请您帮助我以正确的方式理解该部分。

我没有包含命令/事件,因为它们非常简单,包含 id、amount 字段

首先是我的代码

TestRunner.java

package com.example.demoaxon;

import java.util.UUID;

import org.axonframework.commandhandling.gateway.CommandGateway;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;

import lombok.extern.slf4j.Slf4j;

@Component
@Slf4j
public class TestRunner implements CommandLineRunner {
    
    private final CommandGateway commandGateway;
    
    @Autowired
    public TestRunner(CommandGateway commandGateway) {
        this.commandGateway = commandGateway;
    }
    
    @Override
    public void run(String... args) throws Exception {
        log.info("send command");
        String id = UUID.randomUUID().toString();
        commandGateway.sendAndWait(new IssueCardCommand(id,100));
        commandGateway.sendAndWait(new RedeemCardCommand(id,90));
        
    }
}

礼品卡.java

package com.example.demoaxon;

import org.axonframework.commandhandling.CommandHandler;
import org.axonframework.eventsourcing.EventSourcingHandler;
import org.axonframework.modelling.command.AggregateIdentifier;
import static org.axonframework.modelling.command.AggregateLifecycle.apply;
import org.axonframework.spring.stereotype.Aggregate;

import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;

@NoArgsConstructor
@Aggregate
@Slf4j
public class GiftCard {
    
    @AggregateIdentifier
    private String giftCardId;
    private Integer amount;
    
    @CommandHandler
    public GiftCard(IssueCardCommand cmd) {
        log.info("handling {}",cmd);
        apply(new CardissuedEvent(cmd.getCardId(),cmd.getAmount()));
    }
    
    @EventSourcingHandler
    public void onCardissuedEvent(CardissuedEvent evt) {
        log.info("applying {}",evt);
        this.giftCardId = evt.getCardId();
        this.amount = evt.getAmount();
    }
    
    @CommandHandler
    public void redeemCardCommandHandler(RedeemCardCommand cmd) {
        log.info("handling {}",cmd);
        this.amount -= cmd.getAmount();
        apply(new CardRedeemedEvent(cmd.getCardId(),cmd.getTransactionId(),this.amount));
    }
    
    @EventSourcingHandler
    public void onCardRedeemedEvent(CardRedeemedEvent evt) {
        log.info("applying {}",evt);
        this.amount = evt.getAmount();
    }   
}

据我所知:

  1. 在我的 TestRunner 类中,命令网关使用命令总线将 issueCardCommand 分派给它的 CommandHandler,然后创建 {{1} } 总计的。在这GiftCard 中我们可以执行任何逻辑,然后我们使用这个 CommandHandler 方法

  2. apply 方法用于在 apply(event) 聚合范围内将 CardissuedEvent 发布为 EventMessage,它还调用 {{1} } 对于该特定事件,因此在本例中为 GiftCard。它将 EventMessage 发布到 EventBus 并发送到 EventHandlers。

  3. EventSourcingHandler 中,我们可以对 onCardissuedEvent 聚合进行任何状态更改,并且我们还使用 spring Jpa 将事件持久化到 @EventSourcingHandler onCardissuedEvent 表。

  4. 一旦这个 GiftCard 执行完毕,聚合对象就不再存在了。

  5. 现在再次在我的 DOMAIN_EVENT_ENTRY 类中,命令网关将 CommandHandler 分派到其 TestRunner 并且由于第一个命令不再存在,使用空的无参数构造函数创建对象。 axon 框架从这个 RedeemCardCommand 表中检索所有事件,并为 CommandHandler 聚合实例重放所有事件 (EventSourcingHandlers) 以获得它的当前状态(这就是 DOMAIN_EVENT_ENTRY重要)。

  6. 后执行 GiftCard 方法,它执行任何逻辑并应用在聚合中发布的事件并调用它的 @AggregateIdentifier。此 RedeemCardCommandHandler 然后更新 EventSourcingHandler 聚合的状态/持久保存到 EventSourcingHandler 表。

我对事件溯源如何工作的理解正确吗?

解决方法

让我试着引导你完成这段旅程!

你的理解几乎完全正确。为了更好地理解,我只想将其拆分为 Event Sourcing 和 Axon。

  • 事件溯源

简而言之,事件溯源是一种通过过去发生的事件历史来存储应用程序状态的方式。请记住,您还有其他模式,例如状态存储聚合。 在您的示例中,您使用的是事件溯源,因此 @EventSourcingHandler 已就位。现在进入下一个主题。

  • 轴突

我建议您阅读我们一位同事撰写的awesome blog,特别是其中包含的幻灯片,您将看到 Axon 框架内的消息旅程!

现在说到你的观点,我想事先澄清一些事情:

  1. 正确,您调度一个 Command 并且由于注释的方法是一个构造函数,它将为您创建一个聚合。 CommandHanlder 是业务逻辑和验证的正确位置。
  2. 此处 apply 将在内部发布消息(向此 Aggregate 以及它们的 Entities/AggregateMembers)然后再向 EventBus .来自 javadoc:

该事件应立即应用于聚合并安排发布到其他事件处理程序。

  1. 由于我们在谈论事件溯源,因此所有 EventSourcingHandler 都将被调用,并且 Aggregate 的状态将被修改/更新。这很重要,因为如前所述,这是您在需要时重建聚合状态的方式。但是事件的持久化在这里没有发生,它已经被安排在这个过程完成时发生。

  2. 正确。

  3. 也对。这通常与事件溯源以及它如何重建您的 Aggregate 相关。

  4. 也更正第 3 点的观察,关于事件何时发布/保存/持久化。


关于您的代码的另一个小提示:您在 @CommandHandler

上执行此操作
@CommandHandler
public void redeemCardCommandHandler(RedeemCardCommand cmd) {
    log.info("handling {}",cmd);
    this.amount -= cmd.getAmount(); // you should not do this here
    apply(new CardRedeemedEvent(cmd.getCardId(),cmd.getTransactionId(),this.amount));
}

此状态更改应在 @EventSourcingHandler 中进行。在 @CommandHandler 上,您应该只在此聚合有足够的“钱”可以兑换时进行验证:)

,

是的,看起来您正确使用了命令网关,并且聚合具有使用正确内容注释的正确方法!您使用的是分布式事务的 Saga 设计模式,它比 2PC(两阶段提交)好得多,因此调用下一步的每一步都是正确的做法。

只要开始saga方法用@StartSaga注解,结束方法用@EndSaga注解,那么步骤会按顺序运行,回滚应该也能正常运行

saveAndWait 函数实际上返回一个 CompletableFuture,因此,如果您愿意,可以在调试模式下单步执行线程并暂停线程直到整个 saga 完成(如果这就是您想要的)喜欢做。

我唯一关心的是 - 您使用 Axon 服务器作为事件源,还是 Axon 支持的其他事件总线或源?标准 Axon 服务器的唯一问题是它是 Free Tier 产品,并且他们提供更好、更受支持的版本,例如 Axon Server Enterprise(提供更多功能 - 更适合基础设施) - 所以如果您为一家公司这样做,那么他们可能愿意为更高级别的额外支持付费...

Axon 框架确实支持 Spring AMQP,这很有用,因为这样,您可以更好地控制自己的基础设施,而不是被捆绑到付费的 Axon Server - 我实际上并不好与否。

如果您可以让它与自定义事件总线/源程序一起工作,那么您的系统将非常好 - 结合在 Axon 上开发的简易性。

只要聚合上有正确的注释,并且您命令网关正确自动装配,那么一切都应该正常工作。我唯一担心的是对 Axon Framework 的支持还不够...