java - ElasticSearch Java API 客户端 - 发送已序列化的数据并避免序列化

标签 java elasticsearch elasticsearch-client

我有一个带有 JSON 数据的 Kafka 主题。现在我尝试使用新的“Java API Client”( https://www.elastic.co/guide/en/elasticsearch/client/java-api-client/7.17/index.html )将这些 JSON 字符串发送到 ES 主题,但我遇到了解析器异常:

co.elastic.clients.elasticsearch._types.ElasticsearchException: [es/index] failed: [mapper_parsing_exception] failed to parse
at co.elastic.clients.transport.rest_client.RestClientTransport.getHighLevelResponse(RestClientTransport.java:281)
at co.elastic.clients.transport.rest_client.RestClientTransport.performRequest(RestClientTransport.java:147)
at co.elastic.clients.elasticsearch.ElasticsearchClient.index(ElasticsearchClient.java:953)

此异常发生在以下代码的最后一行:

final IndexRequest<String> request =
          new IndexRequest.Builder<String>()
              .index("myIndex")
              .id(String.valueOf(UUID.randomUUID()))
              .document(consumerRecord.value()) //already serialized json data
              .build();
elasticsearchClient.index(request);

据我了解,发生此异常是因为 ES 客户端尝试序列化我提供的数据,而该数据已经序列化,从而导致格式错误的 JSON 字符串。

有没有办法解决这个问题,只发送简单的 JSON 字符串?我也相信早期的“低级 Java 库”是可能的,对吗?是的,我知道有一些方法可以让 Kafka 和 ES 之间进行通信,而无需编写 Consumer。

感谢您的任何提示。

最佳答案

如果您使用 JacksonJsonpMapper创建 ElasticsearchTransport 时,您可以使用自定义 PreserializedJson发送已经序列化的 JSON 的类。

ElasticsearchTransport transport = new RestClientTransport(
    createLowLevelRestClient(), // supply your own!
    new JacksonJsonpMapper()
);

ElasticsearchClient client = new ElasticsearchClient(transport);

IndexResponse response = client.index(indexReq -> indexReq
    .index("my-index")
    .id("docId")
    .document(new PreserializedJson("{\"foo\":\"bar\"}"))
);
System.out.println(response);

这里是 PreserializedJson 的来源:

import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.databind.SerializerProvider;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import com.fasterxml.jackson.databind.ser.std.StdSerializer;

import java.io.IOException;
import java.nio.charset.StandardCharsets;

import static java.util.Objects.requireNonNull;

@JsonSerialize(using = PreserializedJson.Serializer.class)
public class PreserializedJson {
  private final String value;

  public PreserializedJson(String value) {
    this.value = requireNonNull(value);
  }

  public PreserializedJson(byte[] value) {
    this(new String(value, StandardCharsets.UTF_8));
  }

  public static class Serializer extends StdSerializer<PreserializedJson> {
    public Serializer() {
      super(PreserializedJson.class);
    }

    @Override
    public void serialize(PreserializedJson value, JsonGenerator gen, SerializerProvider provider) throws IOException {
      gen.writeRaw(value.value);
    }
  }
}

关于java - ElasticSearch Java API 客户端 - 发送已序列化的数据并避免序列化,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/71352688/

相关文章:

java.sql.SQLException : No operations allowed after connection closed 异常

java - Gradle SubProject 任务没有被拾取

java - 如何在 Selenium 中正确实现 PageFactory 注释?

java - while 循环在不应该重复的时候不断重复

elasticsearch - ElasticSearch查询速度慢,并且第一次查询总是花费太多时间

elasticsearch - 我认为elasticsearch嵌套查询

elasticsearch - Elasticsearch:在大索引中添加新字段