如何在 MassTransit 和 Automatonymous 中配置 EF Core 持久性?

问题描述

我正在尝试使用 EF Core 作为持久性配置 Automatonymous worker 实现。我通过 api 发布事件并使用 RabbitMq 作为传输在托管服务中处理它。不幸的是,数据库不存储机器状态。我应用了迁移并看到表 OrderState,但在我发布 OrderSubmmited 事件后其中没有数据。它得到了正确处理,因为我在控制台中看到了一个日志,但数据库表仍然是空的。我错过了什么?

这是我的代码

事件:

public interface OrderSubmitted : correlatedBy<Guid>
    {
    }

消费者:

public class OrderSubmittedConsumer : IConsumer<OrderSubmitted>
    {
        public Task Consume(ConsumeContext<OrderSubmitted> context)
        {
            Console.Out.WriteLine($"Order with id {context.Message.CorrelationId} has been submitted.");
            
            return Task.CompletedTask;
        }
    }

我的传奇实例:

public class OrderState : SagaStateMachineInstance
    {
        public Guid CorrelationId { get; set; }
        public string CurrentState { get; set; }
    }

状态机:

public class OrderStateMachine : MasstransitStateMachine<OrderState>
    {
        public State Submitted { get; private set; }

        public Event<OrderSubmitted> OrderSubmitted { get; set; }

        public OrderStateMachine()
        {
            Event(() => OrderSubmitted);
            
            InstanceState(x => x.CurrentState);
            
            Initially(
                When(OrderSubmitted)
                    .TransitionTo(Submitted));
            
            DuringAny(
                When(OrderSubmitted)
                    .TransitionTo(Submitted));
        }
    }
public class OrderStateMachineDeFinition : SagaDeFinition<OrderState>
    {
        public OrderStateMachineDeFinition()
        {
            ConcurrentMessageLimit = 15;
        }
    }

数据库上下文:

public class OrderStateDbContext : SagaDbContext
    {
        public OrderStateDbContext(DbContextOptions options) : base(options)
        {
        }

        protected override IEnumerable<ISagaClassMap> Configurations
        {
            get
            {
                yield return new OrderStateMap();
            }
        }
    }
public class OrderStateMap : SagaClassMap<OrderState>
    {
        protected override void Configure(EntityTypeBuilder<OrderState> entity,ModelBuilder model)
        {
            entity.Property(x => x.CurrentState).HasMaxLength(64);
        }
    }

程序类:

public class Program
    {
        public static void Main(string[] args)
        {
            CreateHostBuilder(args).Build().Run();
        }

        public static IHostBuilder CreateHostBuilder(string[] args) =>
            Host.CreateDefaultBuilder(args)
                .ConfigureServices((hostContext,services) =>
                {
                    services.AddDbContext<OrderStateDbContext>(builder =>
                         builder.UsesqlServer("Server=(localdb)\\mssqllocaldb;Database=OrderState;Trusted_Connection=True;",m =>
                         {
                             m.MigrationsAssembly(Assembly.GetExecutingAssembly().GetName().Name);
                             m.MigrationsHistoryTable($"__{nameof(OrderStateDbContext)}");
                         }));
                    
                    services.AddMasstransit(config =>
                    {
                        config.AddSagaRepository<OrderState>()
                            .EntityFrameworkRepository(r =>
                            {
                                r.ExistingDbContext<OrderStateDbContext>();
                                r.LockStatementProvider = new sqlServerLockStatementProvider();
                            });
                        
                        config.AddConsumer<OrderSubmittedConsumer>();

                        config.UsingRabbitMq((ctx,cfg) => {
                            cfg.Host("amqp://guest:guest@localhost:5672");
                            cfg.ReceiveEndpoint("order-queue",c => {
                                c.ConfigureConsumer<OrderSubmittedConsumer>(ctx);
                                // I'm assuming this is the place where something like c.StateMachinesaga() is missing,but I don't kNow how should this look like with EF Core
                            });
                        });
                    });
                    
                    services.AddHostedService<Worker>();
                });
    }

工人类:

public class Worker : IHostedService
    {
        private readonly IBusControl _bus;

        public Worker(IBusControl bus)
        {
            _bus = bus;
        }

        public async Task StartAsync(CancellationToken cancellationToken)
        {
            await _bus.StartAsync(cancellationToken).ConfigureAwait(false);
        }

        public Task StopAsync(CancellationToken cancellationToken)
        {
            return _bus.StopAsync(cancellationToken);
        }
    }

解决方法

在您的代码示例中,您没有添加 saga 或在接收端点上配置 saga。消费者是一个独立的东西,与传奇完全无关。你应该打电话给:

AddSagaStateMachine<OrderStateMachine,OrderState,OrderStateMachineDefinition>()

然后使用 ConfigureSaga<OrderState> 或切换到 ConfigureEndpoints 以自动配置端点。