java - 在 Reactor 中实现 while 循环以获取最新的 Elasticsearch 索引

标签 java elasticsearch spring-webflux project-reactor reactive

我在响应式(Reactive) Elasticsearch 中的索引名称如下:

logs-2020.08.18
logs-2020.08.17
logs-2020.08.16
它将每天创建。
我想获取最新的索引名称并使用 reactiveElasticsearchClient 或 spring 数据获取日志。
是否可以?
我在我的 spring webflux 应用程序中尝试了以下方式:
我有以下代码片段来查找索引可用性:
public Flux<Log> getLogFromLatestIndex(String serialId) {
    Calendar cal = Calendar.getInstance();
    String currentIndex = StringUtils.EMPTY;
    boolean indexExists = false;
    while (!indexExists) {
        currentIndex = String.format("logs-%s”, format(cal.getTime(), "yyyy.MM.dd"));
        indexExists = isIndexExists(currentIndex).block();
        cal.add(Calendar.DATE, -1); // Decrease day 1 until you find index
    }

    SearchQuery searchQuery = new NativeSearchQueryBuilder()
            .withQuery(matchQuery("serialId", serialId))
            .withIndices(currentIndex)
            .build();

    return reactiveElasticsearchTemplate.find(searchQuery, Log.class);
}

public Mono<Boolean> isIndexExists(String indexName) {
    return reactiveElasticsearchClient.indices().existsIndex(new GetIndexRequest().indices(indexName));
}
如何在不使用 block 的情况下获取 boolean 值
indexExists = isIndexExists(currentIndex).block();
显然我会得到以下错误:
java.lang.IllegalStateException: block()/blockFirst()/blockLast() are blocking, which is not supported in thread reactor-http-nio-2

最佳答案

您可以使用 Flux.generate(take/skip)(Until/While)在 react 器中做一个while循环。
笔记:

  • 替换 CalendarLocalDate因为它是不可变的,并且更适合函数式/响应式(Reactive)编程。
  • isIndexExists方法返回 Tuple有索引名称的引用,但显然它可以根据需要替换为一些更具描述性的类

  • public Flux<Log> getLog(String serialId) {
        return Flux.generate(LocalDate::now, this::generateNextDate)
                   .map(day -> String.format("logs-%s", day.format(DateTimeFormatter.ofPattern("yyyy.MM.dd"))))
                   .concatMap(this::isIndexExists)
                   .skipUntil(Tuple2::getT2) // check index exists boolean and drop non-existing ones
                   .next() // takes first existing
                   .flatMapMany(tuple -> findLogs(tuple.getT1(), serialId));
    }
    
    private LocalDate generateNextDate(LocalDate currentDay, SynchronousSink<LocalDate> sink) {
        sink.next(currentDay);
        return currentDay.minusDays(1);
    }
    
    private Mono<Tuple2<String, Boolean>> isIndexExists(String indexName) {
        return reactiveElasticsearchClient.indices().existsIndex(new GetIndexRequest().indices(indexName))
                .map(exists -> Tuples.of(indexName, exists));
    }
    
    private Flux<Log> findLogs(String index, String serialId) {
        // your other ES query here
    }
    

    关于java - 在 Reactor 中实现 while 循环以获取最新的 Elasticsearch 索引,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/63503519/

    相关文章:

    java - Webflux如何从流内部设置cookie

    java - 奇怪的JAVA编译错误?

    mapping - not_analyzed 字段,其 doc_values 仍在 fielddata 缓存中

    spring-boot - 将springboot输出保存到elasticsearch引擎

    java - 在 Spring Web Flux 中捕获异常

    spring-boot - 如何将 GlobalMethodSecurityConfiguration 迁移到 Reactive Spring?

    java - 了解 CheckStyle 检查 "JavadocMethod",属性 "logLoadErrors"

    java - 将 KB 动态转换为 MB、GB、TB

    java - 观察者/可观察模式

    elasticsearch - 理解elasticsearch查询解析异常,拯救海牛