问题描述
7.0中对Event Hub Riders的新支持以及对Sagas的现有InMemoryRepository
支持似乎可以为创建汇总states based on a stream of correlated messages,e.g. across all sensors in a Building)提供一种直接的方法。在这种情况下,建筑物的标识符将用作消息,佐贺消息的CorrelationId,并用作发送到事件中心的EventData消息的PartitionKey,以确保相同的使用服务实例在给定时间接收该设备的所有消息。 。给定Event Hub's rebalancing works的方式,可以假定在此服务运行时的某个时刻,管理分区的消息的服务实例将转移到新的主机,该主机将开始读取建筑物中传感器发送的消息。那时:
- 新主机对旧主机的处理一无所知。它只知道它现在正在接收事件中心分区的消息,其中包括该建筑物的消息。
- 发送消息的设备不了解状态聚合责任“下游”的转变,它们仍然像往常一样愉快地报告新的测量值。
这带来的挑战是:在新的服务实例上,我们需要创建一个新的Saga来接管以前的Saga,但是唯一不知道Saga对于给定实体而言不存在的东西是Masstransit:在该实体上没有任何东西新实例知道,从建筑物A读取的传感器是建筑物A的第一个传感器,因为此服务实例接管了跟踪建筑物A的汇总状态。我们认为可以通过使用DataCollected
和InitiatedBy
标记相同的消息(orchestrates
)来解决:
public class BuildingAggregator:
ISaga,InitiatedBy<DataCollected>,//init saga on first DataCollected with a given CorrelationId seen
orchestrates<DataCollected> //then keep handling those in that saga
{
//saga Consume methods
}
但是,当BuildingAggregator
收到带有给定Guid的第二条DataCollected
消息时,将引发以下异常:
Saga exception on receipt of MasstransitFW_POC.Program+DataCollected: The message cannot be accepted by an existing saga
at Masstransit.Saga.Policies.NewSagaPolicy`2.Masstransit.Saga.ISagaPolicy<TSaga,TMessage>.Existing(SagaConsumeContext`2 context,IPipe`1 next)
at Masstransit.Saga.SendSagaPipe`2.<Send>d__5.MoveNext()
--- End of stack trace from prevIoUs location where exception was thrown ---
at System.Runtime.ExceptionServices.ExceptiondispatchInfo.Throw()
at Masstransit.Saga.SendSagaPipe`2.<Send>d__5.MoveNext()
--- End of stack trace from prevIoUs location where exception was thrown ---
at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
at Masstransit.Saga.InMemoryRepository.InMemorySagaRepositoryContextFactory`1.<Send>d__4`1.MoveNext()
--- End of stack trace from prevIoUs location where exception was thrown ---
还有另一种实现此逻辑的方法吗?这是应用Sagas的“错误方法”吗?
解决方法
根据对上述问题的Chris Patterson's评论,这可以通过状态机语法实现:
Initially(
When(DataCollected)
.Then(f => _logger.LogInformation("Initiating Network Manager for Network: {NetworkId}",f.Data.NetworkId))
.TransitionTo(Running));
During(Running,When(DataCollected)
.Then(f => { // activities and state transitions }),When(SimulationComplete)
.Then(f => _logger.LogInformation("Network {NetworkId} shutting down.",f.Instance.CorrelationId))
.TransitionTo(Final));
请注意在初始状态转换和由初始条件设置的状态转换中,如何同时处理DataCollected事件。