从订阅事件处理器迁移到跟踪事件处理器

问题描述

所以我们正在慢慢尝试迁移到跟踪处理器。作为其中的一部分,我们为写入 Elasticsearch 的新事件创建了一个新处理组,并将其设置为使用跟踪处理器(认为订阅)。

从现在开始一切正常。但是我希望新的处理组事件处理程序处理过去生成的一大堆事件。

我查看了 token_entry 表,但不太确定如何重置它。

例如,这似乎是作为字符串的令牌(blob)

<org.axonframework.eventhandling.GlobalSequenceTrackingToken><globalIndex>1794</globalIndex></org.axonframework.eventhandling.GlobalSequenceTrackingToken>,<org.axonframework.eventhandling.GlobalSequenceTrackingToken><globalIndex>22966</globalIndex></org.axonframework.eventhandling.GlobalSequenceTrackingToken>,<org.axonframework.eventhandling.GlobalSequenceTrackingToken><globalIndex>22966</globalIndex></org.axonframework.eventhandling.GlobalSequenceTrackingToken>

最好手动重置。

解决方法

要重置 StreamingEventProcessor,例如 TrackingEventProcessor (TEP),您可以在正确的实例上调用 StreamingEventProcessor#resetTokens 方法。

要找到正确的实例,您可以使用 EventProcessingConfiguration#eventProcessor(String,Class) 方法检索包含 OptionalEventProcessor。如果您为“Class”参数提供 TrackingEventProcessor.class,框架会自动将其转换为 TEP 实例。

虽然您也可以直接调整存储的 TrackingToken,但不建议这样做。如果您在数据库中更改它,则框架不知道您正在执行重播。因此,您将失去 Axon 重播 API 的优势。顺便说一下,您可以查看关于 here 主题的参考指南。