问题描述
我对 InMemoryTestHarness 和状态机 saga 有一个疑问。看起来每条通过 Publish 发布的消息都被 saga 消费了两次;如果改用 Send 发送(而不是发布)消息,则不会出现这个问题。
这个问题是在我把关联字段从 CorrelationId 改为自定义字段 ProcessId 之后开始出现的。下面是能够重现该问题的简化示例。
Saga 定义:
/// <summary>
/// State machine saga that follows a single process from start to completion.
/// Every event is correlated to a saga instance through the string
/// <c>ProcessId</c> carried by the message, not through the message's
/// <c>CorrelationId</c>.
/// </summary>
public class MySaga : MassTransitStateMachine<MySagaState>
{
    /// <summary>State the instance is moved to once <c>StartProcess</c> has been handled.</summary>
    public State InProgress { get; private set; }

    /// <summary>Event that creates the saga instance.</summary>
    public Event<StartProcess> StartProcess { get; private set; }

    /// <summary>Event that increments the instance's <c>Stage</c> counter.</summary>
    public Event<ProcessstageFinished> StageFinished { get; private set; }

    /// <summary>Event that finalizes the instance.</summary>
    public Event<ProcessFinished> ProcessFinished { get; private set; }

    public MySaga()
    {
        // The current state is stored on the instance as a string property.
        InstanceState(x => x.CurrentState);

        // All three events use the same correlation: match on ProcessId and
        // supply a freshly generated id through SelectId.
        Event(() => StartProcess, e => e
            .CorrelateBy(i => i.ProcessId, x => x.Message.ProcessId)
            .SelectId(x => NewId.NextGuid()));
        Event(() => StageFinished, e => e
            .CorrelateBy(i => i.ProcessId, x => x.Message.ProcessId)
            .SelectId(x => NewId.NextGuid()));
        Event(() => ProcessFinished, e => e
            .CorrelateBy(i => i.ProcessId, x => x.Message.ProcessId)
            .SelectId(x => NewId.NextGuid()));

        Initially(
            When(StartProcess)
                // Remember the ProcessId so later events can be correlated to this instance.
                .Then(x => x.Instance.ProcessId = x.Data.ProcessId)
                .TransitionTo(InProgress)
        );

        During(InProgress,
            When(StageFinished)
                .Then(x => x.Instance.Stage++)
        );

        During(InProgress,
            When(ProcessFinished)
                .Finalize()
        );
    }
}
/// <summary>
/// Persisted state of a single <c>MySaga</c> instance.
/// </summary>
public class MySagaState :
    SagaStateMachineInstance,
    ISagaVersion
{
    /// <summary>Identifier of the saga instance.</summary>
    public Guid CorrelationId { get; set; }

    /// <summary>Version number of the instance (member of <c>ISagaVersion</c>).</summary>
    public int Version { get; set; }

    /// <summary>Process identifier copied from the <c>StartProcess</c> message; used to correlate later events.</summary>
    public string ProcessId { get; set; }

    /// <summary>Name of the state the state machine is currently in.</summary>
    public string CurrentState { get; set; }

    /// <summary>Counter incremented each time a stage-finished event is handled.</summary>
    public int Stage { get; set; }
}
/// <summary>
/// Message that starts a process.
/// </summary>
/// <param name="CorrelationId">Correlation id exposed through <c>CorrelatedBy&lt;Guid&gt;</c>.</param>
/// <param name="ProcessId">Process identifier the saga correlates on.</param>
public record StartProcess(Guid CorrelationId, string ProcessId) : CorrelatedBy<Guid>
{
}
/// <summary>
/// Message signalling that one stage of a process has finished.
/// </summary>
/// <param name="CorrelationId">Correlation id exposed through <c>CorrelatedBy&lt;Guid&gt;</c>.</param>
/// <param name="ProcessId">Process identifier the saga correlates on.</param>
public record ProcessstageFinished(Guid CorrelationId, string ProcessId) : CorrelatedBy<Guid>
{
}
/// <summary>
/// Message signalling that the whole process has finished.
/// </summary>
/// <param name="CorrelationId">Correlation id exposed through <c>CorrelatedBy&lt;Guid&gt;</c>.</param>
/// <param name="ProcessId">Process identifier the saga correlates on.</param>
public record ProcessFinished(Guid CorrelationId, string ProcessId) : CorrelatedBy<Guid>
{
}
还有 xUnit 测试:
/// <summary>
/// xUnit test that reproduces the problem: with this setup every published
/// message is consumed twice by the saga.
/// </summary>
public class MySagaTests
{
    InMemoryTestHarness Harness { get; }
    IStateMachineSagaTestHarness<MySagaState, MySaga> SagaHarness { get; }

    public MySagaTests()
    {
        var services = new ServiceCollection()
            .AddMassTransitInMemoryTestHarness(config =>
            {
                config.AddSagaStateMachine<MySaga, MySagaState>(sagaConfig =>
                    {
                        sagaConfig.UseConcurrentMessageLimit(1);
                        sagaConfig.UseInMemoryOutbox();
                    })
                    .InMemoryRepository();
                config.AddSagaStateMachineTestHarness<MySaga, MySagaState>();
            });

        var serviceProvider = services.BuildServiceProvider(true);
        Harness = serviceProvider.GetRequiredService<InMemoryTestHarness>();

        // NOTE: this hook configures the saga on the harness's receive endpoint
        // in addition to the registration made above, so the saga ends up on two
        // endpoints. It is kept here on purpose: it is what makes each published
        // message get consumed twice in this reproduction.
        Harness.OnConfigureInMemoryReceiveEndpoint += config =>
        {
            config.ConfigureSagas(serviceProvider.GetRequiredService<IBusRegistrationContext>());
        };

        SagaHarness = serviceProvider.GetRequiredService<IStateMachineSagaTestHarness<MySagaState, MySaga>>();
    }

    /// <summary>
    /// Publishes the start, stage-finished and finished messages in turn and
    /// checks the saga after each one.
    /// </summary>
    [Fact]
    public async Task TestMySaga()
    {
        string processId = "newProcessId";
        var correlationId = NewId.NextGuid();

        await Harness.Start();
        try
        {
            await Harness.Bus.Publish(new StartProcess(correlationId, processId));
            Assert.True(await Harness.Published.Any<StartProcess>());
            Assert.True(await Harness.Consumed.Any<StartProcess>());
            Assert.True(await SagaHarness.Consumed.Any<StartProcess>());
            Assert.Equal(1, SagaHarness.Sagas.Count()); // HERE should be only one saga created
            Assert.True(await SagaHarness.Created.Any(s => s.ProcessId == processId && s.CurrentState == "InProgress"));

            await Harness.Bus.Publish(new ProcessstageFinished(correlationId, processId));
            Assert.True(await Harness.Published.Any<ProcessstageFinished>());
            Assert.True(await Harness.Consumed.Any<ProcessstageFinished>());
            Assert.True(await SagaHarness.Consumed.Any<ProcessstageFinished>());
            var saga = SagaHarness.Sagas.Select(s => s.ProcessId == processId).FirstOrDefault()?.Saga;
            Assert.NotNull(saga);
            Assert.Equal(1, saga.Stage); // HERE stage should be 1

            await Harness.Bus.Publish(new ProcessFinished(correlationId, processId));
            Assert.True(await Harness.Published.Any<ProcessFinished>());
            Assert.True(await Harness.Consumed.Any<ProcessFinished>());
            Assert.True(await SagaHarness.Consumed.Any<ProcessFinished>());
            Assert.True(await SagaHarness.Sagas.Any(s => s.ProcessId == processId && s.CurrentState == "Final"));
        }
        finally
        {
            // Always stop the harness, even when an assertion fails.
            await Harness.Stop();
        }
    }
}
我已经在普通的内存总线配置和 RabbitMQ 上做过尝试,在这两种配置下一切正常。只有在使用 InMemoryTestHarness 时,消息才会被消费两次。
您对如何修复有什么建议吗?乍一看,这似乎是一个错误的行为。
解决方法
是的,删除此行 - 您正在两个不同的端点上配置 saga。
// Remove this handler: it configures the saga on the harness's receive endpoint
// as well, so the saga ends up configured on two different endpoints.
Harness.OnConfigureInMemoryReceiveEndpoint += config =>
{
config.ConfigureSagas(serviceProvider.GetRequiredService<IBusRegistrationContext>());
};
在 [@Chris Patterson] 回复之后,我重新整理了测试。下面是整理后的测试,也许能帮到其他人:
/// <summary>
/// Reorganized xUnit test for <c>MySaga</c>. The saga is registered only once,
/// through the container, and the test waits for the expected saga state
/// instead of asserting on individual published/consumed messages.
/// </summary>
public class MySagaTests
{
    InMemoryTestHarness Harness { get; }
    IStateMachineSagaTestHarness<MySagaState, MySaga> SagaHarness { get; }

    public MySagaTests()
    {
        var services = new ServiceCollection()
            .AddMassTransitInMemoryTestHarness(config =>
            {
                config.AddSagaStateMachine<MySaga, MySagaState>(sagaConfig =>
                    {
                        sagaConfig.UseConcurrentMessageLimit(1);
                        sagaConfig.UseInMemoryOutbox();
                    })
                    .InMemoryRepository();
                config.AddSagaStateMachineTestHarness<MySaga, MySagaState>();
            });

        var serviceProvider = services.BuildServiceProvider(true);
        Harness = serviceProvider.GetRequiredService<InMemoryTestHarness>();
        SagaHarness = serviceProvider.GetRequiredService<IStateMachineSagaTestHarness<MySagaState, MySaga>>();
    }

    /// <summary>
    /// Publishes the start, stage-finished and finished messages in turn and,
    /// after each one, checks that a matching saga instance exists in the
    /// expected state.
    /// </summary>
    [Fact]
    public async Task TestMySaga()
    {
        string processId = "newProcessId";
        var correlationId = NewId.NextGuid();

        await Harness.Start();
        try
        {
            await Harness.Bus.Publish(new StartProcess(correlationId, processId));
            var sagaExists = (await SagaHarness.Exists(s => s.ProcessId == processId, x => x.InProgress)).Any();
            Assert.True(sagaExists, "Saga not exists");

            // NOTE(review): the message record is declared as `ProcessstageFinished`
            // in the saga definition snippet; the two spellings need to be unified.
            await Harness.Bus.Publish(new ProcessStageFinished(correlationId, processId));
            // The instance stays in InProgress; only its Stage counter advances.
            sagaExists = (await SagaHarness.Exists(s => s.ProcessId == processId && s.Stage == 1, x => x.InProgress)).Any();
            Assert.True(sagaExists, "Saga not exists");

            await Harness.Bus.Publish(new ProcessFinished(correlationId, processId));
            sagaExists = (await SagaHarness.Exists(s => s.ProcessId == processId, x => x.Final)).Any();
            Assert.True(sagaExists, "Saga not exists");
        }
        finally
        {
            // Always stop the harness, even when an assertion fails.
            await Harness.Stop();
        }
    }
}