问题描述
我们有一个用例,其中每个聚合根都应具有不同的事件存储。我们使用了以下配置,目前,我们仅配置了一个如下所示的事件存储
@Configuration
@EnablediscoveryClient
public class AxonConfig {
private static final String DOMAIN_EVENTS_COLLECTION_NAME = "coll-capture.domainEvents";
//private static final String DOMAIN_EVENTS_COLLECTION_NAME_TEST =
//"coll-capture.domainEvents-test";
@Value("${mongodb.database}")
private String databaseName;
@Value("${spring.application.name}")
private String appName;
@Bean
public RestTemplate restTemplate() {
CloseableHttpClient httpClient = HttpClientBuilder.create().build();
HttpComponentsClientHttpRequestFactory clientHttpRequestFactory = new
HttpComponentsClientHttpRequestFactory(httpClient);
return new RestTemplate(clientHttpRequestFactory);
}
@Bean
@Profile({"uat","prod"})
public CommandRouter springCloudHttpBackupCommandRouter(discoveryClient discoveryClient,Registration localInstance,RestTemplate restTemplate,@Value("${axon.distributed.spring-
cloud.fallback-url}") String messageRoutinginformationEndpoint) {
return new SpringCloudHttpBackupCommandRouter(discoveryClient,localInstance,new AnnotationRoutingStrategy(),serviceInstance -> appName.equalsIgnoreCase(serviceInstance.getServiceId()),restTemplate,messageRoutinginformationEndpoint);
}
@Bean
public Repository<TestEnquiry> testEnquiryRepository(EventStore eventStore) {
return new EventSourcingRepository<>(TestEnquiry.class,eventStore);
}
@Bean
public Repository<Test2Enquiry> test2enquiryRepository(EventStore eventStore) {
return new EventSourcingRepository<>(Test2Enquiry.class,eventStore);
}
@Bean
public EventStorageEngine eventStorageEngine(MongoClient client) {
MongoTemplate mongoTemplate = new DefaultMongoTemplate(client,databaseName)
.withDomainEventsCollection(DOMAIN_EVENTS_COLLECTION_NAME);
return new MongoEventStorageEngine(mongoTemplate);
}
}
现在,我们还要在EventStorageEngine中配置“ DOMAIN_EVENTS_COLLECTION_NAME_TEST”(仅作为示例)。我们如何获得对多个事件存储的相同支持,以及如何选择它们应属于哪个集合的跟踪过程
解决方法
如果您要分离事件流,那么从事件处理的角度将它们组合起来确实是必要的。尤其是当有多个bounded contexts时,将事件流分成不同的存储解决方案是合理的。
如果要定义TrackingEventProcessor
使用哪个[消息源/事件存储],则必须处理EventProcessingConfigurer
。更具体地说,您应该调用EventProcessingConfigurer#registerTrackingEventProcessor(String,Function<Configuration,StreamableMessageSource<TrackedEventMessage<?>>>)
方法。第一个String
参数是要配置为“跟踪”的处理器的名称。第二个参数定义一个Function
,它为您提供此TrackingEventProcessor
(TEP)使用的消息源。在此处应提供您希望此TEP提取事件的事件存储。
当然也可以在以后对它们进行配对,这也得到Axon Framework的支持。归结为StreamableMessageSource
实现的特定形式。
更具体地说,您可以使用MultiStreamableMessageSource
,在那里您可以将任意数量的StreamableMessageSource
连接在一起。
请注意,Axon的EmbeddedEventStore
本质上是StreamableMessageSource
的实现。一旦MultiStreamableMessageSource
,您就必须将其指定为messageSource
的{{1}}。
最后,请注意,当您使用TrackingEventProcessor
时,此解决方案只能使用 ,因为这是Axon提供的唯一将TrackingEventProcessor
接收到的事件处理器事件的来源。