java - 响应式 Spring Boot API 封装 Elasticsearch 的异步批量索引

标签 java spring-boot elasticsearch spring-webflux project-reactor

我正在为一个新项目开发原型(prototype)。这个想法是提供一个响应式 Spring Boot 微服务来批量索引 Elasticsearch 中的文档。 Elasticsearch 提供了一个高级 Rest 客户端,它提供了一个异步方法来批量处理索引请求。提及异步使用监听器传递回调 here 。回调批量接收索引响应(每个请求)。我正在尝试将此响应作为 Flux 发送回客户端。我想出了一些基于 this blog post 的东西。

Controller

@RestController
public class AppController {

    @SuppressWarnings("unchecked")
    @RequestMapping(value = "/test3", method = RequestMethod.GET)
    public Flux<String> index3() {
        ElasticAdapter es = new ElasticAdapter();
        JSONObject json = new JSONObject();
        json.put("TestDoc", "Stack123");
        Flux<String>  fluxResponse = es.bulkIndex(json);
        return fluxResponse;
    }

弹性适配器

@Component
class ElasticAdapter {
String indexName = "test2"; 
    private final RestHighLevelClient client;
    private final ObjectMapper mapper;
    private int processed = 1;

    Flux<String> bulkIndex(JSONObject doc) {
        return bulkIndexDoc(doc)
                .doOnError(e -> System.out.print("Unable to index {}" + doc+ e));
    }

    private Flux<String> bulkIndexDoc(JSONObject doc) {
        return Flux.create(sink -> {
            try {
                doBulkIndex(doc, bulkListenerToSink(sink));
            } catch (JsonProcessingException e) {
                sink.error(e);
            }
        });
    }


    private void doBulkIndex(JSONObject doc, BulkProcessor.Listener listener) throws JsonProcessingException {

        System.out.println("Going to submit index request");
        BiConsumer<BulkRequest, ActionListener<BulkResponse>> bulkConsumer =
                (request, bulkListener) ->
                    client.bulkAsync(request, RequestOptions.DEFAULT, bulkListener);
                    BulkProcessor.Builder builder =
                            BulkProcessor.builder(bulkConsumer, listener);
        builder.setBulkActions(10); 
        BulkProcessor bulkProcessor = builder.build();
        // Submitting 5,000 index requests ( repeating same JSON)
        for (int i = 0; i < 5000; i++) {
            IndexRequest indexRequest = new IndexRequest(indexName, "person", i+1+"");
             String json = doc.toJSONString();
            indexRequest.source(json, XContentType.JSON);
            bulkProcessor.add(indexRequest);
        }
        System.out.println("Submitted all docs
    }


    private BulkProcessor.Listener bulkListenerToSink(FluxSink<String> sink) {
        return new BulkProcessor.Listener() {

            @Override
            public void beforeBulk(long executionId, BulkRequest request) {
            }

            @SuppressWarnings("unchecked")
            @Override
            public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {

                for (BulkItemResponse bulkItemResponse : response) {
                    JSONObject json = new JSONObject();
                    json.put("id", bulkItemResponse.getResponse().getId());
                    json.put("status", bulkItemResponse.getResponse().getResult

                    sink.next(json.toJSONString()); 
                    processed++;
                }
                if(processed >= 5000) {
                    sink.complete();
                }
            }

            @Override
            public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
                failure.printStackTrace();
                sink.error(failure);
            }
        };
    }


    public ElasticAdapter() {
    // Logic to initialize  Elasticsearch Rest Client 
    }
}

我使用 FluxSink 创建响应通量以发送回客户端。目前,我不知道这是否正确。

我的期望是调用客户端应该以 10 个为批处理接收响应(因为批量处理器以 10 个批处理处理它 - builder.setBulkActions(10); )。我尝试使用 Spring Webflix 客户端使用端点。但无法解决。这是我尝试过的

网络客户端

public class FluxClient {

    public static void main(String[] args) {
        WebClient client = WebClient.create("http://localhost:8080");
        Flux<String> responseFlux = client.get()
                  .uri("/test3")
                  .retrieve()
                  .bodyToFlux(String.class);
        responseFlux.subscribe(System.out::println);
    }
}

控制台上没有像我预期的那样打印任何内容。我尝试使用 System.out.println(responseFlux.blockFirst()); 。它在最后将所有响应作为单个批处理打印,而不是在 处批量打印。

如果我的方法是正确的,那么正确的食用方式是什么?对于我心目中的解决方案,该客户端将驻留在另一个 Web 应用程序中。

注释:我对 Reactor API 的理解是有限的。使用的elasticsearch版本是6.8。

最佳答案

因此对您的代码进行了以下更改。

在 ElasticAdapter 中,

public Flux<Object> bulkIndex(JSONObject doc) {
    return bulkIndexDoc(doc)
            .subscribeOn(Schedulers.elastic(), true)
            .doOnError(e -> System.out.print("Unable to index {}" + doc+ e));
}

在 Flux 上调用 subscribeOn(Scheduler, requestOnSeparateThread),从 ​​https://github.com/spring-projects/spring-framework/issues/21507 了解它

在 FluxClient 中,

Flux<String> responseFlux = client.get()
              .uri("/test3")
              .headers(httpHeaders -> {
                  httpHeaders.set("Accept", "text/event-stream");
              })
              .retrieve()
              .bodyToFlux(String.class);
responseFlux.delayElements(Duration.ofSeconds(1)).subscribe(System.out::println);

添加了“Accept” header 作为“text/event-stream”并延迟了 Flux 元素。

通过上述更改,能够从服务器实时获取响应。

关于java - 响应式 Spring Boot API 封装 Elasticsearch 的异步批量索引,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/60981954/

相关文章:

java - 测试中列表中的替换项违反了唯一约束

spring-boot - 如何使用 Spring Boot 应用程序在嵌入式 jetty 中配置异步超时

elasticsearch - Elasticsearch特定领域的完全匹配

java - JTextArea-Dialog 作为 JTable-CellEditor 错过了第一个键入的字符

java - 在一个项目中只编译一个类

ssl - 如何为 Heroku 上的 Spring Boot 应用程序强制使用 HTTPS?

elasticsearch - 索引 :admin/exists 的错误 Search Guard 未初始化 (SG11)

search - ElasticSearch (with NEST) 多字段搜索

java - 保存新关系时关系变得疏远

java - 在 Java 列表中存储不同类型的元素