问题描述
我目前正在开展一个项目,在该项目中我们使用 Event Store 作为我们事件的写入存储。附加事件效果很好,当我们想要监听这些事件时就会出现问题。
为了监听事件存储事件,我们使用官方事件存储 Node.js 客户端 npm i @eventstore/db-client
。
我们确实在管理界面中创建了持久订阅。要连接到此订阅,我们使用 eventStoreClient.connectToPersistentSubscription
并且它可以正确连接。
问题是我们的事件是为事件重播的。事实上,他们被困在停放的消息列表中。
事件重播是正常行为吗? 事件存储如何记住它提供给 Node.js 客户端的事件?
注意:当前正在播放事件,我们的投影以正确的方式构建,但事件将永远重播。
我们用来监听事件的代码
const stream = this.eventStoreClient.connectToPersistentSubscription<JSONEventType>('$ce-precontrol','my-group',{
bufferSize: 10,},{
})
for await (const e of stream) {
const { event,commitPosition,link } = e
// console.log(event,link)
if (!event) {
this.logger.warn(`The event with commit position ${commitPosition} is undefined.`,{ event,link })
return
}
// We should map the event instead of casting it.
// Every field of an event which are instance of classes won't be correctly
// unserialized,we need to re-instatiate those classes.
const eventBody = event.data as Event
console.log(`${eventBody.fpsProjectId} ----- ${eventBody.kind}`)
this.logger.info(`dispatching an event ${eventBody.kind} ${eventBody.fpsProjectId}`,link })
try {
await this.eventHandler.dispatch(eventBody)
this.logger.info(`Event ${eventBody.kind} ${eventBody.fpsProjectId} dispatched successfully.`,link })
await stream.ack(event.id)
// this.logger.info(`Event ${eventBody.fpsProjectId} ${eventBody.kind} was ackNowledged successfully.`,link })
} catch (error) {
await stream.nack(RETRY,error.message,event.id)
this.logger.error(`Something went wrong while dispatching an event. Error message: ${error.message}`,{ error })
}
}
我们的事件存储日志
{"@t":"2021-06-23T15:51:48.3692381Z","@mt":"SLOW BUS MSG [{bus}]: {message} - {elapsed}ms. Handler: {handler}.","@l":"Debug","bus":"PersistentSubscriptionsBus","message":"PersistentSubscriptionTimerTick","elapsed":1981,"handler":"PersistentSubscriptionService","SourceContext":"EventStore.Core.Bus.InMemoryBus","ProcessId":1,"ThreadId":9}
{"@t":"2021-06-23T15:51:49.3743720Z","@mt":"retrying message {subscriptionId} {stream}/{eventNumber}","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":45,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ThreadId":44}
{"@t":"2021-06-23T15:51:49.3746323Z","eventNumber":46,"ThreadId":44}
{"@t":"2021-06-23T15:51:49.3746694Z","eventNumber":47,"ThreadId":44}
{"@t":"2021-06-23T15:51:54.3993484Z","eventNumber":40,"ThreadId":47}
{"@t":"2021-06-23T15:51:55.4072471Z","eventNumber":41,"ThreadId":4}
{"@t":"2021-06-23T15:51:56.4114240Z","eventNumber":42,"ThreadId":9}
{"@t":"2021-06-23T15:51:57.4147394Z","eventNumber":43,"ThreadId":9}
{"@t":"2021-06-23T15:51:58.4250298Z","eventNumber":44,"ThreadId":9}
{"@t":"2021-06-23T15:51:59.4330048Z","ThreadId":9}
{"@t":"2021-06-23T15:51:59.4333048Z","ThreadId":9}
{"@t":"2021-06-23T15:51:59.4335573Z","ThreadId":9}
{"@t":"2021-06-23T15:52:04.4664585Z","ThreadId":47}
{"@t":"2021-06-23T15:52:05.4775447Z","ThreadId":44}
{"@t":"2021-06-23T15:52:06.4824377Z","ThreadId":9}
{"@t":"2021-06-23T15:52:07.4876752Z","ThreadId":44}
{"@t":"2021-06-23T15:52:08.4934409Z","ThreadId":9}
{"@t":"2021-06-23T15:52:09.5068553Z","ThreadId":4}
{"@t":"2021-06-23T15:52:09.5069364Z","ThreadId":4}
{"@t":"2021-06-23T15:52:09.5069701Z","ThreadId":4}
{"@t":"2021-06-23T15:52:14.5376026Z","ThreadId":4}
{"@t":"2021-06-23T15:52:15.5472390Z","ThreadId":47}
{"@t":"2021-06-23T15:52:16.5521045Z","ThreadId":9}
{"@t":"2021-06-23T15:52:17.5623183Z","ThreadId":24}
{"@t":"2021-06-23T15:52:18.5650711Z","ThreadId":47}
{"@t":"2021-06-23T15:52:19.5781601Z","ThreadId":44}
{"@t":"2021-06-23T15:52:19.5782624Z","ThreadId":44}
{"@t":"2021-06-23T15:52:19.5782980Z","ThreadId":44}
{"@t":"2021-06-23T15:52:24.6146992Z","ThreadId":9}
{"@t":"2021-06-23T15:52:25.6217820Z","ThreadId":44}
{"@t":"2021-06-23T15:52:26.6296129Z","ThreadId":24}
{"@t":"2021-06-23T15:52:27.6418730Z","ThreadId":44}
{"@t":"2021-06-23T15:52:28.6438550Z","ThreadId":44}
{"@t":"2021-06-23T15:52:29.6514819Z","ThreadId":4}
{"@t":"2021-06-23T15:52:29.6515344Z","ThreadId":4}
{"@t":"2021-06-23T15:52:29.6515586Z","ThreadId":4}
{"@t":"2021-06-23T15:52:34.6962266Z","ThreadId":49}
{"@t":"2021-06-23T15:52:35.6984264Z","ThreadId":44}
{"@t":"2021-06-23T15:52:36.7013623Z","ThreadId":4}
{"@t":"2021-06-23T15:52:37.7038263Z","ThreadId":4}
{"@t":"2021-06-23T15:52:38.7111554Z","ThreadId":47}
{"@t":"2021-06-23T15:52:39.7205755Z","ThreadId":47}
{"@t":"2021-06-23T15:52:39.7206264Z","ThreadId":47}
{"@t":"2021-06-23T15:52:39.7206490Z","ThreadId":47}
{"@t":"2021-06-23T15:52:44.7380921Z","ThreadId":49}
{"@t":"2021-06-23T15:52:45.7465243Z","ThreadId":44}
{"@t":"2021-06-23T15:52:46.7523226Z","ThreadId":44}
{"@t":"2021-06-23T15:52:47.7535593Z","ThreadId":49}
{"@t":"2021-06-23T15:52:48.7656883Z","ThreadId":24}
{"@t":"2021-06-23T15:52:49.7718183Z","ThreadId":49}
{"@t":"2021-06-23T15:52:49.7718754Z","ThreadId":49}
{"@t":"2021-06-23T15:52:49.7718986Z","ThreadId":49}
{"@t":"2021-06-23T15:52:54.8063912Z","ThreadId":47}
{"@t":"2021-06-23T15:52:55.8160008Z","ThreadId":47}
{"@t":"2021-06-23T15:52:56.8185175Z","ThreadId":47}
{"@t":"2021-06-23T15:52:57.8277609Z","ThreadId":24}
{"@t":"2021-06-23T15:52:58.8294945Z","ThreadId":49}
{"@t":"2021-06-23T15:52:59.8368339Z","ThreadId":4}
{"@t":"2021-06-23T15:52:59.8369266Z","ThreadId":4}
{"@t":"2021-06-23T15:52:59.8369451Z","ThreadId":4}
{"@t":"2021-06-23T15:53:00.4461267Z","elapsed":609,"ThreadId":4}
{"@t":"2021-06-23T15:53:05.4787174Z","ThreadId":24}
{"@t":"2021-06-23T15:53:06.4825158Z","ThreadId":4}
{"@t":"2021-06-23T15:53:07.4891830Z","ThreadId":49}
{"@t":"2021-06-23T15:53:08.4988531Z","ThreadId":49}
{"@t":"2021-06-23T15:53:09.5058824Z","ThreadId":47}
{"@t":"2021-06-23T15:53:10.5174611Z","ThreadId":44}
{"@t":"2021-06-23T15:53:10.5175070Z","ThreadId":44}
{"@t":"2021-06-23T15:53:10.5175221Z","ThreadId":44}
{"@t":"2021-06-23T15:53:15.5455199Z","ThreadId":49}
{"@t":"2021-06-23T15:53:16.5574829Z","ThreadId":47}
{"@t":"2021-06-23T15:53:17.5670153Z","ThreadId":49}
{"@t":"2021-06-23T15:53:18.5712367Z","ThreadId":4}
{"@t":"2021-06-23T15:53:19.5807126Z","ThreadId":4}
{"@t":"2021-06-23T15:53:20.5878979Z","ThreadId":44}
{"@t":"2021-06-23T15:53:20.5879796Z","ThreadId":44}
{"@t":"2021-06-23T15:53:20.5880027Z","ThreadId":44}
持久订阅配置
编辑:我们更改了脚本以确认链接事件而不是链接事件,因为 $ce-precontrol 是链接的流。
我们的活动现在不再重试。
const stream = this.eventStoreClient.connectToPersistentSubscription<JSONEventType>('$ce-precontrol',{
})
for await (const e of stream) {
const { event,link })
await stream.ack(link?.id || '')
this.logger.info(`Event ${eventBody.fpsProjectId} ${eventBody.kind} was ackNowledged successfully.`,link?.id || '')
this.logger.error(`Something went wrong while dispatching an event. Error message: ${error.message}`,{ error })
}
}
解决方法
我的活动永远重试。我正在收听由链接到的流。
我配置了持久订阅以解析指向的链接,因此当我访问 event.id
时,我获得了链接的事件 ID。
我使用此链接的事件 ID 来确认事件,而不是使用指向事件 ID 的链接调用 ack。
因为我的事件没有确认,它们确实超时并重试,直到达到最大重试次数。
for await (const e of stream) {
const { event,commitPosition,link } = e
// console.log(event,link)
if (!event) {
this.logger.warn(`The event with commit position ${commitPosition} is undefined.`,{ event,link })
return
}
// We should map the event instead of casting it.
// Every field of an event which are instance of classes won't be correctly
// unserialized,we need to re-instatiate those classes.
const eventBody = event.data as Event
console.log(`${eventBody.fpsProjectId} ----- ${eventBody.kind}`)
this.logger.info(`Dispatching an event ${eventBody.kind} ${eventBody.fpsProjectId}`,link })
try {
await this.eventHandler.dispatch(eventBody)
this.logger.info(`Event ${eventBody.kind} ${eventBody.fpsProjectId} dispatched successfully.`,link })
// Using the link to event id to ack
await stream.ack(link?.id || '')
this.logger.info(`Event ${eventBody.fpsProjectId} ${eventBody.kind} was acknowledged successfully.`,link })
} catch (error) {
await stream.nack(RETRY,error.message,link?.id || '')
this.logger.error(`Something went wrong while dispatching an event. Error message: ${error.message}`,{ error })
}
}