java - Scala、Elasticsearch 新的 RestHighLevelClient 和 BulkProcessor

标签 java scala elasticsearch-6

在我的 Scala 项目中,我试图用新的 RestHighLevelClient 更改旧的 transportClient 以连接到 Elasticsearch (6.1)。

但我在尝试创建 BulkProcessor 时遇到问题,我不知道如何将此示例从 Java 转换为 Scala

`BulkProcessor.Builder builder = BulkProcessor.builder(client::bulkAsync, listener);`

我的代码是这样的:

val client=new RestHighLevelClient(
  RestClient.builder(
    new HttpHost("localhost", 9200, "http") ) )

val builder: BulkProcessor.Builder =BulkProcessor.builder(client, listener)

但是我得到这个错误:

 overloaded method value builder with alternatives:
[error]   (x$1: java.util.function.BiConsumer[org.elasticsearch.action.bulk.BulkRequest,org.elasticsearch.action.ActionListener[org.elasticsearch.action.bulk.BulkResponse]],x$2: org.elasticsearch.action.bulk.BulkProcessor.Listener)org.elasticsearch.action.bulk.BulkProcessor.Builder <and>
[error]   (x$1: org.elasticsearch.client.Client,x$2: org.elasticsearch.action.bulk.BulkProcessor.Listener)org.elasticsearch.action.bulk.BulkProcessor.Builder
[error]  cannot be applied to (org.elasticsearch.client.RestHighLevelClient, org.elasticsearch.action.bulk.BulkProcessor.Listener)
[error]   val builder: BulkProcessor.Builder =BulkProcessor.builder(client, listener)

如果我尝试将“client::bulkAsync”转换为“client.bulkAsync _”或“client::bulkAsync(_._._ )”获取其他错误:

type mismatch;
[error] found   : org.apache.http.Header*
[error] required: org.apache.http.Header
[error] var bulkProcessor =BulkProcessor.builder(client.bulkAsync(_,_,_),listener)

这是 Java 中的 bulkAsync 方法:

 public final void bulkAsync(BulkRequest bulkRequest, ActionListener<BulkResponse> listener, Header... headers) {

有人在 Scala 中使用过 RestHighLevelClient 和 BulkProcessor 吗?或者知道如何解决这个错误?

最佳答案

Scala 2.11 与 Java 8 函数接口(interface)没有良好的互操作性。 创建 BiConsumerId(接受两个输入参数并且不返回任何结果)应该可以解决问题:

val bulkAsyncAsJava :  BiConsumer[BulkRequest, ActionListener[BulkResponse]] = new BiConsumer[BulkRequest, ActionListener[BulkResponse]] {
  override def accept(bulkRequest : BulkRequest , actionListener:ActionListener[BulkResponse]): Unit = {
    restHighLevelClient.bulkAsync(bulkRequest, actionListener)
  }
}
val bulkProcessor = BulkProcessor.builder(bulkAsyncAsJava, new BulkProcessor.Listener() {
override def beforeBulk(executionId: Long, request: BulkRequest): Unit = {
}

override

def afterBulk(executionId: Long, request: BulkRequest, response: BulkResponse): Unit = {
}

override

def afterBulk(executionId: Long, request: BulkRequest, failure: Throwable): Unit = {
}}).build

关于java - Scala、Elasticsearch 新的 RestHighLevelClient 和 BulkProcessor,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/48588439/

相关文章:

java - IO 性能 : Selector (NIO) vs AsynchronousChannel(NIO. 2)

scala - Spark dataframe写方法写很多小文件

elasticsearch - ElasticSearch将1.x升级到6.x

elasticsearch - 如何使用 RestHighLevelClient 从 Elasticsearch 中的索引中删除所有文档

java - Orika - 仅当第三个字段与字符串匹配时才映射两个字段

java - 组合设计模式如何限制组件创建无限递归

java - 使用方面修改方法参数

scala - 读者作者状态monad-如何运行此scala代码

scala - 将多个类型变量设置为 null

elasticsearch - 按嵌套字段对文档进行排序