问题描述
你好,我有一个命令总线,一个查询总线,它基本上有一个带有命令或查询名称和处理程序的密钥对,然后我执行应该发布我的事件的命令。 但是我对如何做我的事件总线有一些疑问。 命令总线是事件总线的一部分吗? 我怎么能用处理程序做一个事件总线
指挥车:
export interface ICommand {
}
export interface ICommandHandler<
TCommand extends ICommand = any,TResult = any
> {
execute(command: TCommand): Promise<TResult>
}
export interface ICommandBus<CommandBase extends ICommand = ICommand> {
execute<T extends CommandBase>(command: T): Promise<any>
register(data:{commandHandler: ICommandHandler,command: ICommand}[]): void
}
命令总线实现:
export class CommandBus<Command extends ICommand = ICommand>
implements ICommandBus<Command> {
private handlers = new Map<string,ICommandHandler<Command>>()
public execute<T extends Command>(command: T): Promise<any> {
const commandName = this.getCommandName(command as any)
const handler = this.handlers.get(commandName)
if (!handler) throw new Error(``)
return handler.execute(command)
}
public register(
data: { commandHandler: ICommandHandler; command: ICommand }[],): void {
data.forEach(({command,commandHandler}) => {
this.bind(commandHandler,this.getCommandName(command as any))
})
}
private bind<T extends Command>(handler: ICommandHandler<T>,name: string) {
this.handlers.set(name,handler)
}
private getCommandName(command: Function): string {
const { constructor } = Object.getPrototypeOf(command)
return constructor.name as string
}
}
这里出现了另一个问题,谁应该负责在我的事件数据库中发布事件或读取我的事件数据库流是我的类事件存储?
事件存储类:
export class EventStoreClient {
[x: string]: any;
/**
* @constructor
*/
constructor(private readonly config: TCPConfig) {
this.type = 'event-store';
this.eventFactory = new EventFactory();
this.connect();
}
connect() {
this.client = new TCPClient(this.config);
return this;
}
getClient() {
return this.client;
}
newEvent(name: any,payload: any) {
return this.eventFactory.newEvent(name,payload);
}
close() {
this.client.close();
return this;
}
}
然后我怀疑如何使用我的事件处理程序和我的事件来实现我的事件总线。
如果有人可以帮助我,我会很高兴..
事件接口:
export interface IEvent {
readonly aggregrateVersion: number
readonly aggregateId: string
}
export interface IEventHandler<T extends IEvent = any> {
handle(event: T): any
}
可能的用法:
commandBus.execute(new Command())
class commandHandler {
constructor(repository: IRepository,eventBus ????){}
execute(){
//how i can publish an event with after command handler logic with event bus her
}
}
解决方法
我发现各种 Bus 和 Event Store 之间存在一些混淆。在尝试实施事件总线之前,您需要回答一个重要问题,该问题是任何事件溯源实施的基础:
- 如何将事件存储作为唯一的事实来源?
也就是说,您的 Event Store 包含域的完整状态。这也意味着事件总线的使用者(无论它最终是什么——消息队列、流平台、Redis 等)应该只获取持久化的事件。因此,目标变为:
- 仅在总线上传递持久保存到 Store 的事件(因此,如果您在写入 Store 时遇到错误,或者可能出现并发异常,请不要通过总线传递!)
- 将所有事件传递给所有感兴趣的消费者,而不会丢失任何事件
这两个目标直观地转化为“我想要在事件存储和事件总线之间进行原子提交”。当它们是同一件事时,这是最容易实现的!
因此,与其考虑如何将“事件总线”连接到命令处理程序并来回发送事件,不如考虑如何从事件存储中检索已经持久化的事件并订阅该事件.这也消除了命令处理程序和事件订阅者之间的任何依赖关系 - 它们位于事件存储的不同侧(写入器与读取器),并且可能位于不同进程、不同机器上。