Implementing a while loop in Reactor to fetch the latest Elasticsearch index

Question

In my reactive Elasticsearch setup, the index names look like this:

logs-2020.08.18
logs-2020.08.17
logs-2020.08.16

A new index is created every day.
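To make the naming scheme concrete, here is a minimal sketch that derives the index name for a given day. The class name `DailyIndexNames` and the helper `indexFor` are hypothetical and only serve the illustration; the `logs-` prefix and the `yyyy.MM.dd` pattern are taken from the index names listed above.

```java
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;

// Hypothetical helper, not part of any library.
class DailyIndexNames {
    // Date pattern taken from the index names shown above.
    private static final DateTimeFormatter INDEX_DATE = DateTimeFormatter.ofPattern("yyyy.MM.dd");

    // Builds the index name for one calendar day.
    static String indexFor(LocalDate day) {
        return "logs-" + day.format(INDEX_DATE);
    }

    public static void main(String[] args) {
        LocalDate day = LocalDate.of(2020, 8, 18);
        System.out.println(indexFor(day));              // logs-2020.08.18
        System.out.println(indexFor(day.minusDays(1))); // logs-2020.08.17
    }
}
```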

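I want to find the name of the latest index and fetch logs from it using ReactiveElasticsearchClient or Spring Data. Is that possible?

I tried the following in my Spring WebFlux application.

This is the code I use to check whether an index exists: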

public Flux<Log> getLogFromLatestIndex(String serialId) {
    Calendar cal = Calendar.getInstance();
    String currentIndex = StringUtils.EMPTY;
    boolean indexExists = false;
    while (!indexExists) {
        currentIndex = String.format("logs-%s",format(cal.getTime(),"yyyy.MM.dd"));
        indexExists = isIndexExists(currentIndex).block();
        cal.add(Calendar.DATE,-1); // step back one day until an existing index is found
    }

    SearchQuery searchQuery = new NativeSearchQueryBuilder()
            .withQuery(matchQuery("serialId",serialId))
            .withIndices(currentIndex)
            .build();

    return reactiveElasticsearchTemplate.find(searchQuery,Log.class);
}

public Mono<Boolean> isIndexExists(String indexName) {
    return reactiveElasticsearchClient.indices().existsIndex(new GetIndexRequest().indices(indexName));
}

How can I get the boolean value without calling block()?

indexExists = isIndexExists(currentIndex).block();

As expected, this line produces the following error:

java.lang.IllegalStateException: block()/blockFirst()/blockLast() are blocking, which is not supported in thread reactor-http-nio-2

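Answer

You can write a while loop in Reactor by combining Flux.generate with (take/skip)(Until/While).

Notes:

  • Calendar is replaced with LocalDate, because LocalDate is immutable and therefore a better fit for functional/reactive programming.
  • isIndexExists now returns a Tuple2 so that the index name travels together with the result; you can of course replace it with a more descriptive class if you prefer.
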
public Flux<Log> getLog(String serialId) {
    return Flux.generate(LocalDate::now,this::generateNextDate)
               .map(day -> String.format("logs-%s",day.format(DateTimeFormatter.ofPattern("yyyy.MM.dd"))))
               .concatMap(this::isIndexExists)
               .skipUntil(Tuple2::getT2) // check index exists boolean and drop non-existing ones
               .next() // takes first existing
               .flatMapMany(tuple -> findLogs(tuple.getT1(),serialId));
}

private LocalDate generateNextDate(LocalDate currentDay,SynchronousSink<LocalDate> sink) {
    sink.next(currentDay);
    return currentDay.minusDays(1);
}

private Mono<Tuple2<String,Boolean>> isIndexExists(String indexName) {
    return reactiveElasticsearchClient.indices().existsIndex(new GetIndexRequest().indices(indexName))
            .map(exists -> Tuples.of(indexName,exists));
}

private Flux<Log> findLogs(String index,String serialId) {
    // your other ES query here
}
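For readers less familiar with Reactor, the shape of the pipeline above can be sketched with plain java.util.stream. This is only a synchronous analogy under stated assumptions, not the reactive solution itself: the class `LatestIndexLookup`, the method `latestIndex`, and the in-memory `existing` set (standing in for the Elasticsearch existence check) are all hypothetical.

```java
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Stream;

// Hypothetical, synchronous analogy of the reactive pipeline.
class LatestIndexLookup {
    private static final DateTimeFormatter INDEX_DATE = DateTimeFormatter.ofPattern("yyyy.MM.dd");

    // Assumes at least one candidate exists; like the reactive version,
    // the search has no lower bound on how far back it goes.
    static Optional<String> latestIndex(LocalDate start, Set<String> existing) {
        return Stream.iterate(start, day -> day.minusDays(1)) // lazy day-by-day source, like Flux.generate
                .map(day -> "logs-" + day.format(INDEX_DATE)) // same name mapping as in the answer
                .filter(existing::contains)                   // drop names that do not exist
                .findFirst();                                 // stop at the first hit, like next()
    }

    public static void main(String[] args) {
        // Stand-in for the indices present in the cluster.
        Set<String> existing = Set.of("logs-2020.08.18", "logs-2020.08.17", "logs-2020.08.16");
        System.out.println(latestIndex(LocalDate.of(2020, 8, 21), existing).orElse("none")); // logs-2020.08.18
    }
}
```

Because the stream is lazy and findFirst short-circuits, only as many dates are produced as are needed to reach the first existing index. The reactive version relies on the same idea via next().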

application.yml

spring.data.elasticsearch.client.reactive.endpoints: ip1:9200,ip2:9200,ip3:9200
spring.data.elasticsearch.rest.uris: ip1:9200,ip3:9200

Configuration.java

@Configuration
public class ElasticSearchConfig extends AbstractReactiveElasticsearchConfiguration {

    @Value("${spring.data.elasticsearch.client.reactive.endpoints}")
    private String elasticSearchEndpoint;

    @Override
    public ReactiveElasticsearchClient reactiveElasticsearchClient() {
        final ClientConfiguration clientConfiguration = ClientConfiguration.builder()
            .connectedTo(elasticSearchEndpoint.split(","))
            .withWebClientConfigurer(webClient -> {
                ExchangeStrategies exchangeStrategies = ExchangeStrategies.builder()
                        .codecs(configurer -> configurer.defaultCodecs()
                                .maxInMemorySize(-1))
                        .build();
                return webClient.mutate().exchangeStrategies(exchangeStrategies).build();
            })
            .build();
        return ReactiveRestClients.create(clientConfiguration);
    }
}

Controller.java

@GetMapping("/getLog/{serialId}")
public Flux<Log> getLog(@PathVariable String serialId) {
    return loggerService.getLog(serialId);
}

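The rest of the code is unchanged; I only added a print of the index name inside map. Although the index I have in Elasticsearch is logs-2020.08.21, it keeps printing index names such as logs-2020.08.20, logs-2020.08.19, logs-2020.08.18 and so on, and eventually throws an error.

Note: I get the same error when I configure only a single IP in application.yml.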

public Flux<Log> getLog(String serialId) {
    return Flux.generate(LocalDate::now,this::generateNextDate)
           .map(day -> {
                System.out.println(String.format("logs-%s",day.format(DateTimeFormatter.ofPattern("yyyy.MM.dd"))));
                return String.format("logs-%s",day.format(DateTimeFormatter.ofPattern("yyyy.MM.dd")));
            })               
           .flatMap(this::isIndexExists)
           .skipUntil(Tuple2::getT2) // check index exists boolean and drop non-existing ones
           .next() // takes first existing
           .flatMapMany(tuple -> findLogs(tuple.getT1(),serialId));
}

private Mono<Tuple2<String,Boolean>> isIndexExists(String indexName) {
    return reactiveElasticsearchClient.indices().existsIndex(new GetIndexRequest().indices(indexName))
        .map(exists -> Tuples.of(indexName,exists));
}

private Flux<Log> findLogs(String index,String serialId) {
    // your other ES query here
}
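One possible explanation, offered as an assumption rather than a diagnosis: this snippet uses flatMap, which subscribes to many inner publishers concurrently (256 by default) and so requests that many dates from the generator up front. The answer's version uses concatMap, which subscribes to one existence check at a time. The sketch below illustrates the one-at-a-time idea with plain CompletableFuture instead of Reactor, and adds a look-back limit so the search cannot run forever. The names `SequentialIndexProbe`, `indexExists`, `findLatest`, and the in-memory `EXISTING` set are hypothetical stand-ins.

```java
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical illustration of sequential, bounded, non-blocking probing.
class SequentialIndexProbe {
    private static final DateTimeFormatter INDEX_DATE = DateTimeFormatter.ofPattern("yyyy.MM.dd");
    // Stand-in for the indices present in the cluster.
    static final Set<String> EXISTING = Set.of("logs-2020.08.18", "logs-2020.08.17");
    // Counts how many existence checks were actually issued.
    static final AtomicInteger PROBES = new AtomicInteger();

    // Stand-in for an asynchronous existence check against Elasticsearch.
    static CompletableFuture<Boolean> indexExists(String name) {
        PROBES.incrementAndGet();
        return CompletableFuture.supplyAsync(() -> EXISTING.contains(name));
    }

    // Checks one day, and only if it is missing moves on to the previous day.
    static CompletableFuture<Optional<String>> findLatest(LocalDate day, int daysLeft) {
        if (daysLeft <= 0) {
            return CompletableFuture.completedFuture(Optional.empty()); // look-back limit reached
        }
        String name = "logs-" + day.format(INDEX_DATE);
        return indexExists(name).thenCompose(exists -> {
            if (exists) {
                return CompletableFuture.completedFuture(Optional.of(name));
            }
            return findLatest(day.minusDays(1), daysLeft - 1);
        });
    }

    public static void main(String[] args) {
        // join() is used only here in the demo's main thread to print the result.
        Optional<String> latest = findLatest(LocalDate.of(2020, 8, 21), 30).join();
        System.out.println(latest.orElse("none") + " after " + PROBES.get() + " probes");
        // logs-2020.08.18 after 4 probes
    }
}
```

In Reactor terms, switching flatMap back to concatMap gives the sequential behaviour, and a take(n) placed before the existence check would be one way to bound the look-back. Note that concatMap still prefetches a small batch from upstream, so a print inside map may still show several candidate names even though the checks themselves run one by one.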