Axon Framework-在Axon配置中配置多个事件存储

问题描述

我们有一个用例,其中每个聚合根都应具有不同的事件存储。我们使用了以下配置,目前,我们仅配置了一个如下所示的事件存储

@Configuration
@EnablediscoveryClient
public class AxonConfig {
       private static final String DOMAIN_EVENTS_COLLECTION_NAME = "coll-capture.domainEvents";
      //private static final String DOMAIN_EVENTS_COLLECTION_NAME_TEST = 
      //"coll-capture.domainEvents-test";


       @Value("${mongodb.database}")
       private String databaseName;

       @Value("${spring.application.name}")
       private String appName;

       @Bean
        public RestTemplate restTemplate() {
        CloseableHttpClient httpClient = HttpClientBuilder.create().build();
        HttpComponentsClientHttpRequestFactory clientHttpRequestFactory = new 
        HttpComponentsClientHttpRequestFactory(httpClient);

        return new RestTemplate(clientHttpRequestFactory);
    }

        @Bean
        @Profile({"uat","prod"})
        public CommandRouter springCloudHttpBackupCommandRouter(discoveryClient discoveryClient,Registration localInstance,RestTemplate restTemplate,@Value("${axon.distributed.spring- 
        cloud.fallback-url}") String messageRoutinginformationEndpoint) {
        return new SpringCloudHttpBackupCommandRouter(discoveryClient,localInstance,new AnnotationRoutingStrategy(),serviceInstance -> appName.equalsIgnoreCase(serviceInstance.getServiceId()),restTemplate,messageRoutinginformationEndpoint);
    }

         @Bean
         public Repository<TestEnquiry> testEnquiryRepository(EventStore eventStore) {
          return new EventSourcingRepository<>(TestEnquiry.class,eventStore);
    }

         @Bean
         public Repository<Test2Enquiry> test2enquiryRepository(EventStore eventStore) {
           return new EventSourcingRepository<>(Test2Enquiry.class,eventStore);
    }

    
         @Bean
          public EventStorageEngine eventStorageEngine(MongoClient client) {
        MongoTemplate mongoTemplate = new DefaultMongoTemplate(client,databaseName)
                .withDomainEventsCollection(DOMAIN_EVENTS_COLLECTION_NAME);
        return new MongoEventStorageEngine(mongoTemplate);
        }
    

}

现在,我们还要在EventStorageEngine中配置“ DOMAIN_EVENTS_COLLECTION_NAME_TEST”(仅作为示例)。我们如何获得对多个事件存储的相同支持,以及如何选择它们应属于哪个集合的跟踪过程

解决方法

如果您要分离事件流,那么从事件处理的角度将它们组合起来确实是必要的。尤其是当有多个bounded contexts时,将事件流分成不同的存储解决方案是合理的。

如果要定义TrackingEventProcessor使用哪个[消息源/事件存储],则必须处理EventProcessingConfigurer。更具体地说,您应该调用EventProcessingConfigurer#registerTrackingEventProcessor(String,Function<Configuration,StreamableMessageSource<TrackedEventMessage<?>>>)方法。第一个String参数是要配置为“跟踪”的处理器的名称。第二个参数定义一个Function,它为您提供此TrackingEventProcessor(TEP)使用的消息源。在此处应提供您希望此TEP提取事件的事件存储。

当然也可以在以后对它们进行配对,这也得到Axon Framework的支持。归结为StreamableMessageSource实现的特定形式。 更具体地说,您可以使用MultiStreamableMessageSource,在那里您可以将任意数量的StreamableMessageSource连接在一起。

请注意,Axon的EmbeddedEventStore本质上是StreamableMessageSource的实现。一旦MultiStreamableMessageSource,您就必须将其指定为messageSource的{​​{1}}。

最后,请注意,当您使用TrackingEventProcessor时,此解决方案只能使用 ,因为这是Axon提供的唯一将TrackingEventProcessor接收到的事件处理器事件的来源。