events – 命令处理程序,聚合,存储库和CQRS中的事件存储之间的关系

我想了解基于CQRS的系统中命令处理程序,聚合,存储库和事件存储之间的关系的一些细节。

我到目前为止所理解的:

>命令处理程序从总线接收命令。它们负责从存储库加载适当的聚合并调用聚合上的域逻辑。一旦完成,他们从总线中删除命令。
>聚合提供行为和内部状态。国家永远不会公开。改变状态的唯一方法是使用行为。模拟此行为的方法从命令的属性创建事件,并将这些事件应用到聚合,然后依次调用相应设置内部状态的事件处理程序。
>存储库只允许在给定的ID上加载聚合,并添加新的聚合。基本上,存储库将域连接到事件存储。
>事件存储,最后但并非最不重要,负责将事件存储到数据库(或使用任何存储),并将这些事件重新加载为所谓的事件流。

到现在为止还挺好。
现在有一些我还没有得到的问题:

>如果命令处理程序要调用现有聚合上的行为,一切都很容易。命令处理程序获取对存储库的引用,调用其loadById方法,并返回聚合。但是当没有聚合时,命令处理程序做什么,但是应该创建一个?从我的理解,聚合应该以后使用事件重建。这意味着聚合的创建是对fooCreated事件的回复。但是为了能够存储任何事件(包括fooCreated),我需要一个聚合。所以这看起来像一个鸡和鸡蛋的问题:我不能创建聚合没有事件,但应该创建事件的唯一组件是聚合。所以基本上它归结为:如何创建新的聚合,谁做什么?
>当聚合触发事件时,内部事件处理程序对其进行响应(通常通过应用方法调用)并更改聚合的状态。如何将此事件移交到存储库?谁发起“请将新事件发送到存储库/事件存储”操作?聚合本身?通过观察聚合存储库?订阅内部事件的其他人? …?
>最后但并非最不重要的是,我有一个问题,正确地理解事件流的概念:在我的想象中,它只是一个事件的有序列表。重要的是它是“有序的”。这是正确的吗?

以下是基于我自己的经验和我的实验与Lokad.CQRS,NCQRS等各种框架。我相信有多种方法来处理这个。我会发布对我最有意义的。

聚合创建:

每次命令处理程序需要一个聚合时,它使用一个存储库。存储库从事件存储库检索相应的事件列表,并调用重载的构造函数,注入事件

var stream = eventStore.LoadStream(id)
var User = new User(stream)

如果聚合之前不存在,则流将为空,并且新创建的对象将处于其原始状态。您可能需要确保在此状态下,只有几个命令可以使聚合生命,例如。 User.Create()。

2.新活动的存储

命令处理发生在工作单元内部。在命令执行期间,每个结果事件将被添加到聚合(User.Changes)内的列表。一旦执行完成,更改将附加到事件存储。在下面的示例中,发生在以下行:

store.AppendToStream(cmd.UserId,stream.Version,user.Changes)

3.活动顺序

想象一下,如果两个后续的CustomerMoved事件以错误的顺序重播,会发生什么。

一个例子

我将尝试用一段伪代码来说明(我故意留下命令处理程序中的存储库关注,以显示幕后会发生什么):

应用服务:

UserCommandHandler
    Handle(createuser cmd)
        stream = store.LoadStream(cmd.UserId)
        user = new User(stream.Events)
        user.Create(cmd.UserName,...)
        store.AppendToStream(cmd.UserId,user.Changes)

    Handle(BlockUser cmd)
        stream = store.LoadStream(cmd.UserId)
        user = new User(stream.Events)
        user.Block(string reason)
        store.AppendToStream(cmd.UserId,user.Changes)

骨料:

User
    created = false
    blocked = false

    Changes = new List<Event>

    ctor(eventStream)
        foreach (event in eventStream)
            this.Apply(event)

    Create(userName,...)
        if (this.created) throw "User already exists"
        this.Apply(new UserCreated(...))

    Block(reason)
        if (!this.created) throw "No such user"
        if (this.blocked) throw "User is already blocked"
        this.Apply(new UserBlocked(...))

    Apply(userCreatedEvent)
        this.created = true
        this.Changes.Add(userCreatedEvent)

    Apply(userBlockedEvent)
        this.blocked = true
        this.Changes.Add(userBlockedEvent)

更新:

作为一个附注:伊夫的回答提醒了我一个有趣的文章Udi Dahan从几年前:

> Don’t Create Aggregate Roots

相关文章

迭代器模式(Iterator)迭代器模式(Iterator)[Cursor]意图...
高性能IO模型浅析服务器端编程经常需要构造高性能的IO模型,...
策略模式(Strategy)策略模式(Strategy)[Policy]意图:定...
访问者模式(Visitor)访问者模式(Visitor)意图:表示一个...
命令模式(Command)命令模式(Command)[Action/Transactio...
生成器模式(Builder)生成器模式(Builder)意图:将一个对...