java - Apache Flink 与 Elasticsearch 集成

标签 java elasticsearch apache-flink

我正在尝试将 Flink 与 Elasticsearch 2.1.1 集成,我正在使用 maven 依赖项

     <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-elasticsearch2_2.10</artifactId>
        <version>1.1-SNAPSHOT</version>
    </dependency>

这是我从 Kafka 队列中读取事件的 Java 代码(工作正常)但是不知何故事件没有发布到 Elasticsearch 中并且也没有错误,在下面的代码中如果我更改任何与 ElasticSearch 的端口、主机名、集群名称或索引名称相关的设置然后我立即看到错误,但目前它没有显示任何错误,也没有在 ElasticSearch 中创建任何新文档

       StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // parse user parameters
    ParameterTool parameterTool = ParameterTool.fromArgs(args);

    DataStream<String> messageStream = env.addSource(new FlinkKafkaConsumer082<>(parameterTool.getRequired("topic"), new SimpleStringSchema(), parameterTool.getProperties()));

    messageStream.print();

    Map<String, String> config = new HashMap<>();
    config.put(ElasticsearchSink.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1");
    config.put(ElasticsearchSink.CONFIG_KEY_BULK_FLUSH_INTERVAL_MS, "1");

    config.put("cluster.name", "FlinkDemo");

    List<InetSocketAddress> transports = new ArrayList<>();
    transports.add(new InetSocketAddress(InetAddress.getByName("localhost"), 9300));

    messageStream.addSink(new ElasticsearchSink<String>(config, transports, new TestElasticsearchSinkFunction()));

    env.execute();
}
private static class TestElasticsearchSinkFunction implements ElasticsearchSinkFunction<String> {
    private static final long serialVersionUID = 1L;

    public IndexRequest createIndexRequest(String element) {
        Map<String, Object> json = new HashMap<>();
        json.put("data", element);

        return Requests.indexRequest()
                .index("flink").id("hash"+element).source(json);
    }

    @Override
    public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
        indexer.add(createIndexRequest(element));
    }
}

最佳答案

我确实在本地机器上运行它并进行了调试,但是,我唯一缺少的是正确配置日志记录,因为大多数弹性问题都在“log.warn”语句中进行了描述。问题是 elasticsearch-2.2.1 客户端 API 中“BulkRequestHandler.java”中的异常,它抛出错误 - “org.elasticsearch.action.ActionRequestValidationException:验证失败:1:类型丢失;”因为我创建了索引而不是我觉得很奇怪的类型,因为它应该主要关注索引并默认创建类型。

关于java - Apache Flink 与 Elasticsearch 集成,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/37514624/

相关文章:

python - "Rowtime attributes must not be in the input rows of a regular join"尽管使用间隔连接,但仅具有事件时间戳

java - Apache 弗林克 : When using count() on DataSet only this job will be executed

Java Arraylist清除: how clear() works

java - byte, short 类型的好处

java - excel导出java如何设置字段名

elasticsearch - 具有多值字段geo_point的arcDistance脚本字段过滤器的行为是什么?

Elasticsearch 脚本 : updating array value

node.js - 保持 Elasticsearch 连接处于事件状态

java - Apache Flink 错误java.lang.ClassNotFoundException : org. apache.flink.table.sources.TableSource?

java - PDF 'Itext User Agent' 缓存大小以及如何清除它