可以通过编排相同的消息来发起“大众消费者传奇”吗?

问题描述

7.0中对Event Hub Riders的新支持以及对Sagas的现有InMemoryRepository支持似乎可以为创建汇总states based on a stream of correlated messages,e.g. across all sensors in a Building)提供一种直接的方法在这种情况下,建筑物的标识符将用作消息,佐贺消息的CorrelationId,并用作发送到事件中心的EventData消息的PartitionKey,以确保相同的使用服务实例在给定时间接收该设备的所有消息。 。给定Event Hub's rebalancing works的方式,可以假定在此服务运行时的某个时刻,管理分区的消息的服务实例将转移到新的主机,该主机将开始读取建筑物中传感器发送的消息。那时:

  1. 新主机对旧主机的处理一无所知。它只知道它现在正在接收事件中心分区的消息,其中包括该建筑物的消息。
  2. 发送消息的设备不了解状态聚合责任“下游”的转变,它们仍然像往常一样愉快地报告新的测量值。

这带来的挑战是:在新的服务实例上,我们需要创建一个新的Saga来接管以前的Saga,但是唯一不知道Saga对于给定实体而言不存在的东西是Masstransit:在该实体上没有任何东西新实例知道,从建筑物A读取的传感器是建筑物A的第一个传感器,因为此服务实例接管了跟踪建筑物A的汇总状态。我们认为可以通过使用DataCollectedInitiatedBy标记相同的消息(orchestrates)来解决

public class BuildingAggregator:
            ISaga,InitiatedBy<DataCollected>,//init saga on first DataCollected with a given CorrelationId seen
            orchestrates<DataCollected> //then keep handling those in that saga
{
     //saga Consume methods
}

但是,当BuildingAggregator收到带有给定Guid的第二条DataCollected消息时,将引发以下异常:

Saga exception on receipt of MasstransitFW_POC.Program+DataCollected: The message cannot be accepted by an existing saga
   at Masstransit.Saga.Policies.NewSagaPolicy`2.Masstransit.Saga.ISagaPolicy<TSaga,TMessage>.Existing(SagaConsumeContext`2 context,IPipe`1 next)
   at Masstransit.Saga.SendSagaPipe`2.<Send>d__5.MoveNext()
--- End of stack trace from prevIoUs location where exception was thrown ---
   at System.Runtime.ExceptionServices.ExceptiondispatchInfo.Throw()
   at Masstransit.Saga.SendSagaPipe`2.<Send>d__5.MoveNext()
--- End of stack trace from prevIoUs location where exception was thrown ---
   at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
   at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
   at Masstransit.Saga.InMemoryRepository.InMemorySagaRepositoryContextFactory`1.<Send>d__4`1.MoveNext()
--- End of stack trace from prevIoUs location where exception was thrown ---

还有另一种实现此逻辑的方法吗?这是应用Sagas的“错误方法”吗?

解决方法

根据对上述问题的Chris Patterson's评论,这可以通过状态机语法实现:

Initially(
    When(DataCollected)
        .Then(f => _logger.LogInformation("Initiating Network Manager for Network: {NetworkId}",f.Data.NetworkId))
        .TransitionTo(Running));
During(Running,When(DataCollected)
        .Then(f => { // activities and state transitions }),When(SimulationComplete)
        .Then(f => _logger.LogInformation("Network {NetworkId} shutting down.",f.Instance.CorrelationId))
        .TransitionTo(Final));

请注意在初始状态转换和由初始条件设置的状态转换中,如何同时处理DataCollected事件。