问题描述
我最近学习了 Axon、CQRS 和事件溯源。
CMIIW 事件存储就像一个应用程序状态,所以在 Axon 中我们有 org.axonframework.eventhandling.GlobalSequenceTrackingToken
这意味着我们可以从这个记录中知道我们应用程序在某个时间的当前状态,对吗?
processor_name | segment owner| timestamp | token | token_type
myapps.projection| 0 far | 2021-02-03T02:22:46.6003318Z | {"globalIndex":32}| org.axonframework.eventhandling.GlobalSequenceTrackingToken
所以一切正常,直到我忘记进行一些验证,然后我的应用程序卡在 1 个事件 ({"globalIndex":32}
) 中,尝试发布此事件直到成功。
这是因为我的 @Eventhandler
抛出错误。
说明:
我有 2 个事件,具有相同的 userName
,而 @EventHandler
无法解决此问题并自 userName must be unique
起引发错误。
token:31,payload: {"id":"d6554811-ee5a-46d5-973d-1d61ee71c7fe","fullName":"xasxsax","userName":"xasxsaxsa","email":"xasxsa@g.com","password":"adminadmin"}
token:32,payload:{"id":"b41efb71-14ee-4a5f-bdef-93b36e2e6a84","password":"adminadmin"}
我的问题是,是否有任何最佳做法来处理错误,以便其他事件能够正常运行而无需等待修复此单个事件?或者有什么解决方案可以解决这个问题?
解决方法
假设您使用 Axon 4 和默认的 TrackingEventProcessor。 事件形成一个流,所以你必须在处理下一个之前处理这个事件;从@EventHandler 抛出异常告诉 Axon 您当前无法处理该事件。 在这种特定情况下,您可能应该简单地记录错误,而不是抛出错误,因为第二个事件根本没有意义 - 它要求创建已经存在的东西。
对于其他类型的失败事件,再次仅记录错误并允许流继续,然后在您修复失败原因后重播事件可能会很有趣。要为单个聚合重放特定事件,您可以执行以下操作:
@Autowired
EventStore eventStore;
@Test
public void testReplay() {
String aggregateIdentifier = "1234";
long startFrom = 0;
String eventType = "org.sample.model.events.MyEvent";
Class projectorClass = MyProjector.class;
AnnotationEventHandlerAdapter eventHandlerAdapter = new AnnotationEventHandlerAdapter(projectorClass);
DomainEventStream eventStream = eventStore.readEvents(aggregateIdentifier,startFrom);
eventStream.asStream()
.filter(event -> {
// add any type of filtering based on the event here
return event.getType() == eventType;
})
.forEach(event1 -> {
try {
eventHandlerAdapter.handle(event1);
} catch (Exception e) {
e.printStackTrace();
}
});
}
您可以在操作界面中公开此方法,以便您的操作团队在分析后重放特定事件。
,简而言之,失败事件的死信队列将是一个不错的解决方案。就目前而言,Axon Framework 不提供死信队列。它为您提供的是您可以配置的两个级别的错误处理。
首先是 ListenerInvocationErrorHandler
。对于从 @EventHandler
注释函数中抛出的任何异常,都会调用此错误处理程序。 ListenerInvocationErrorHandler
默认配置为 LogginErrorHandler
,正如您可能已经猜到的那样,它只是记录异常并继续。
第二级就是所谓的ErrorHandler
。此接口是为事件处理器内发生的事务性异常调用的。通常应该完全停止您的处理器工作的异常。此错误处理程序默认为 PropagatingErrorHandler
,它只是重新抛出您的异常。在(默认)TrackingEventProcessor
内这样做意味着事件处理器进入重试模式。
根据您的描述,我假设您要么有事务异常,要么已将 ListenerInvocationErrorHandler
配置为 PropagatingErrorHandler
。如果您不想在发生异常时停止处理器,您可以将其配置为简单地记录并继续执行 LoggingErrorHandler
。
这样做是否明智,完全是另一回事。正如 Bart 在另一篇文章中提到的,这些事件在您的商店中,无论您的验证过程决定该事件发生的原因是什么。正如活动商店所规定的那样,它就在那里。这意味着你将不得不应对它。解决此类故障事件的解决方案是在应用程序内部处理补偿操作。可以找到描述这种必要性的技术性较低的方法here。
无论如何,如果事件有问题,您的设置将不得不处理它。例如,添加一个 try-catch 块来处理已知的可能异常。或者使事件处理对于您预期的某些问题完全具有弹性。或者,继续努力,建立一个死信队列。