Event Store 持久订阅是如何工作的? 我们用来监听事件的代码我们的事件存储日志持久订阅配置

问题描述

我目前正在开展一个项目,在该项目中我们使用 Event Store 作为我们事件的写入存储。附加事件效果很好,当我们想要监听这些事件时就会出现问题。

为了监听事件存储事件,我们使用官方事件存储 Node.js 客户端 npm i @eventstore/db-client

我们确实在管理界面中创建了持久订阅。要连接到此订阅,我们使用 eventStoreClient.connectToPersistentSubscription 并且它可以正确连接。

问题是我们的事件是为事件重播的。事实上,他们被困在停放的消息列表中。

事件重播是正常行为吗? 事件存储如何记住它提供给 Node.js 客户端的事件?

注意:当前正在播放事件,我们的投影以正确的方式构建,但事件将永远重播。

我们用来监听事件的代码

const stream = this.eventStoreClient.connectToPersistentSubscription<JSONEventType>('$ce-precontrol','my-group',{
            bufferSize: 10,},{

        })

        for await (const e of stream) {
            const { event,commitPosition,link } = e
            // console.log(event,link)

            if (!event) {
                this.logger.warn(`The event with commit position ${commitPosition} is undefined.`,{ event,link })
                return
            }

            // We should map the event instead of casting it.
            // Every field of an event which are instance of classes won't be correctly
            // unserialized,we need to re-instatiate those classes.
            const eventBody = event.data as Event
            console.log(`${eventBody.fpsProjectId}  -----  ${eventBody.kind}`)

            this.logger.info(`dispatching an event ${eventBody.kind} ${eventBody.fpsProjectId}`,link })

            try {
                await this.eventHandler.dispatch(eventBody)
                this.logger.info(`Event ${eventBody.kind} ${eventBody.fpsProjectId} dispatched successfully.`,link })
                await stream.ack(event.id)
                // this.logger.info(`Event ${eventBody.fpsProjectId} ${eventBody.kind} was ackNowledged successfully.`,link })
            } catch (error) {
                await stream.nack(RETRY,error.message,event.id)
                this.logger.error(`Something went wrong while dispatching an event. Error message: ${error.message}`,{ error })
            }
        }

我们的事件存储日志

{"@t":"2021-06-23T15:51:48.3692381Z","@mt":"SLOW BUS MSG [{bus}]: {message} - {elapsed}ms. Handler: {handler}.","@l":"Debug","bus":"PersistentSubscriptionsBus","message":"PersistentSubscriptionTimerTick","elapsed":1981,"handler":"PersistentSubscriptionService","SourceContext":"EventStore.Core.Bus.InMemoryBus","ProcessId":1,"ThreadId":9}
{"@t":"2021-06-23T15:51:49.3743720Z","@mt":"retrying message {subscriptionId} {stream}/{eventNumber}","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":45,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ThreadId":44}
{"@t":"2021-06-23T15:51:49.3746323Z","eventNumber":46,"ThreadId":44}
{"@t":"2021-06-23T15:51:49.3746694Z","eventNumber":47,"ThreadId":44}
{"@t":"2021-06-23T15:51:54.3993484Z","eventNumber":40,"ThreadId":47}
{"@t":"2021-06-23T15:51:55.4072471Z","eventNumber":41,"ThreadId":4}
{"@t":"2021-06-23T15:51:56.4114240Z","eventNumber":42,"ThreadId":9}
{"@t":"2021-06-23T15:51:57.4147394Z","eventNumber":43,"ThreadId":9}
{"@t":"2021-06-23T15:51:58.4250298Z","eventNumber":44,"ThreadId":9}
{"@t":"2021-06-23T15:51:59.4330048Z","ThreadId":9}
{"@t":"2021-06-23T15:51:59.4333048Z","ThreadId":9}
{"@t":"2021-06-23T15:51:59.4335573Z","ThreadId":9}
{"@t":"2021-06-23T15:52:04.4664585Z","ThreadId":47}
{"@t":"2021-06-23T15:52:05.4775447Z","ThreadId":44}
{"@t":"2021-06-23T15:52:06.4824377Z","ThreadId":9}
{"@t":"2021-06-23T15:52:07.4876752Z","ThreadId":44}
{"@t":"2021-06-23T15:52:08.4934409Z","ThreadId":9}
{"@t":"2021-06-23T15:52:09.5068553Z","ThreadId":4}
{"@t":"2021-06-23T15:52:09.5069364Z","ThreadId":4}
{"@t":"2021-06-23T15:52:09.5069701Z","ThreadId":4}
{"@t":"2021-06-23T15:52:14.5376026Z","ThreadId":4}
{"@t":"2021-06-23T15:52:15.5472390Z","ThreadId":47}
{"@t":"2021-06-23T15:52:16.5521045Z","ThreadId":9}
{"@t":"2021-06-23T15:52:17.5623183Z","ThreadId":24}
{"@t":"2021-06-23T15:52:18.5650711Z","ThreadId":47}
{"@t":"2021-06-23T15:52:19.5781601Z","ThreadId":44}
{"@t":"2021-06-23T15:52:19.5782624Z","ThreadId":44}
{"@t":"2021-06-23T15:52:19.5782980Z","ThreadId":44}
{"@t":"2021-06-23T15:52:24.6146992Z","ThreadId":9}
{"@t":"2021-06-23T15:52:25.6217820Z","ThreadId":44}
{"@t":"2021-06-23T15:52:26.6296129Z","ThreadId":24}
{"@t":"2021-06-23T15:52:27.6418730Z","ThreadId":44}
{"@t":"2021-06-23T15:52:28.6438550Z","ThreadId":44}
{"@t":"2021-06-23T15:52:29.6514819Z","ThreadId":4}
{"@t":"2021-06-23T15:52:29.6515344Z","ThreadId":4}
{"@t":"2021-06-23T15:52:29.6515586Z","ThreadId":4}
{"@t":"2021-06-23T15:52:34.6962266Z","ThreadId":49}
{"@t":"2021-06-23T15:52:35.6984264Z","ThreadId":44}
{"@t":"2021-06-23T15:52:36.7013623Z","ThreadId":4}
{"@t":"2021-06-23T15:52:37.7038263Z","ThreadId":4}
{"@t":"2021-06-23T15:52:38.7111554Z","ThreadId":47}
{"@t":"2021-06-23T15:52:39.7205755Z","ThreadId":47}
{"@t":"2021-06-23T15:52:39.7206264Z","ThreadId":47}
{"@t":"2021-06-23T15:52:39.7206490Z","ThreadId":47}
{"@t":"2021-06-23T15:52:44.7380921Z","ThreadId":49}
{"@t":"2021-06-23T15:52:45.7465243Z","ThreadId":44}
{"@t":"2021-06-23T15:52:46.7523226Z","ThreadId":44}
{"@t":"2021-06-23T15:52:47.7535593Z","ThreadId":49}
{"@t":"2021-06-23T15:52:48.7656883Z","ThreadId":24}
{"@t":"2021-06-23T15:52:49.7718183Z","ThreadId":49}
{"@t":"2021-06-23T15:52:49.7718754Z","ThreadId":49}
{"@t":"2021-06-23T15:52:49.7718986Z","ThreadId":49}
{"@t":"2021-06-23T15:52:54.8063912Z","ThreadId":47}
{"@t":"2021-06-23T15:52:55.8160008Z","ThreadId":47}
{"@t":"2021-06-23T15:52:56.8185175Z","ThreadId":47}
{"@t":"2021-06-23T15:52:57.8277609Z","ThreadId":24}
{"@t":"2021-06-23T15:52:58.8294945Z","ThreadId":49}
{"@t":"2021-06-23T15:52:59.8368339Z","ThreadId":4}
{"@t":"2021-06-23T15:52:59.8369266Z","ThreadId":4}
{"@t":"2021-06-23T15:52:59.8369451Z","ThreadId":4}
{"@t":"2021-06-23T15:53:00.4461267Z","elapsed":609,"ThreadId":4}
{"@t":"2021-06-23T15:53:05.4787174Z","ThreadId":24}
{"@t":"2021-06-23T15:53:06.4825158Z","ThreadId":4}
{"@t":"2021-06-23T15:53:07.4891830Z","ThreadId":49}
{"@t":"2021-06-23T15:53:08.4988531Z","ThreadId":49}
{"@t":"2021-06-23T15:53:09.5058824Z","ThreadId":47}
{"@t":"2021-06-23T15:53:10.5174611Z","ThreadId":44}
{"@t":"2021-06-23T15:53:10.5175070Z","ThreadId":44}
{"@t":"2021-06-23T15:53:10.5175221Z","ThreadId":44}
{"@t":"2021-06-23T15:53:15.5455199Z","ThreadId":49}
{"@t":"2021-06-23T15:53:16.5574829Z","ThreadId":47}
{"@t":"2021-06-23T15:53:17.5670153Z","ThreadId":49}
{"@t":"2021-06-23T15:53:18.5712367Z","ThreadId":4}
{"@t":"2021-06-23T15:53:19.5807126Z","ThreadId":4}
{"@t":"2021-06-23T15:53:20.5878979Z","ThreadId":44}
{"@t":"2021-06-23T15:53:20.5879796Z","ThreadId":44}
{"@t":"2021-06-23T15:53:20.5880027Z","ThreadId":44}

持久订阅配置

enter image description here

编辑:我们更改了脚本以确认链接事件而不是链接事件,因为 $ce-precontrol 是链接的流。

我们的活动现在不再重试。

const stream = this.eventStoreClient.connectToPersistentSubscription<JSONEventType>('$ce-precontrol',{

        })

for await (const e of stream) {
            const { event,link })
                await stream.ack(link?.id || '')
                this.logger.info(`Event ${eventBody.fpsProjectId} ${eventBody.kind} was ackNowledged successfully.`,link?.id || '')
                this.logger.error(`Something went wrong while dispatching an event. Error message: ${error.message}`,{ error })
            }
        }

解决方法

我的活动永远重试。我正在收听由链接到的流。 我配置了持久订阅以解析指向的链接,因此当我访问 event.id 时,我获得了链接的事件 ID。

我使用此链接的事件 ID 来确认事件,而不是使用指向事件 ID 的链接调用 ack。

因为我的事件没有确认,它们确实超时并重试,直到达到最大重试次数。


for await (const e of stream) {
            const { event,commitPosition,link } = e
            // console.log(event,link)

            if (!event) {
                this.logger.warn(`The event with commit position ${commitPosition} is undefined.`,{ event,link })
                return
            }

            // We should map the event instead of casting it.
            // Every field of an event which are instance of classes won't be correctly
            // unserialized,we need to re-instatiate those classes.
            const eventBody = event.data as Event
            console.log(`${eventBody.fpsProjectId}  -----  ${eventBody.kind}`)

            this.logger.info(`Dispatching an event ${eventBody.kind} ${eventBody.fpsProjectId}`,link })

            try {
                await this.eventHandler.dispatch(eventBody)
                this.logger.info(`Event ${eventBody.kind} ${eventBody.fpsProjectId} dispatched successfully.`,link })
                // Using the link to event id to ack
                await stream.ack(link?.id || '')
                this.logger.info(`Event ${eventBody.fpsProjectId} ${eventBody.kind} was acknowledged successfully.`,link })
            } catch (error) {
                await stream.nack(RETRY,error.message,link?.id || '')
                this.logger.error(`Something went wrong while dispatching an event. Error message: ${error.message}`,{ error })
            }
        }