elasticsearch - How do I send data from an HTTP input to Elasticsearch using Logstash and the jdbc_streaming filter?

Tags: elasticsearch logstash kibana elastic-stack

I want to use Logstash to send data from an HTTP input to Elasticsearch, enriching it along the way with the jdbc_streaming filter plugin. This is my Logstash configuration:

input {
  http {
    id => "sensor_data_http_input"
    user => "sensor_data"
    password => "sensor_data"
  }
}

filter {
  jdbc_streaming {
    jdbc_driver_library => "E:\ElasticStack\mysql-connector-java-8.0.18\mysql-connector-java-8.0.18.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://localhost:3306/sensor_metadata"
    jdbc_user => "elastic"
    jdbc_password => "hide"
    statement => "select st.sensor_type as sensorType, l.customer as customer, l.department as department, l.building_name as buildingName, l.room as room, l.floor as floor, l.location_on_floor as locationOnFloor, l.latitude, l.longitude from sensors s inner join sensor_type st on s.sensor_type_id=st.sensor_type_id inner join location l on s.location_id=l.location_id where s.sensor_id= :sensor_identifier"
    parameters => { "sensor_identifier" => "sensor_id" }
    target => "lookupResult"
  }
  mutate {
    rename => {"[lookupResult][0][sensorType]" => "sensorType"}
    rename => {"[lookupResult][0][customer]" => "customer"}
    rename => {"[lookupResult][0][department]" => "department"}
    rename => {"[lookupResult][0][buildingName]" => "buildingName"}
    rename => {"[lookupResult][0][room]" => "room"}
    rename => {"[lookupResult][0][floor]" => "floor"}
    rename => {"[lookupResult][0][locationOnFloor]" => "locationOnFloor"}

    add_field => {
            "location" => "%{lookupResult[0]latitude},%{lookupResult[0]longitude}"
        }

    remove_field => ["lookupResult", "headers", "host"]
  }
}

output {
  elasticsearch {
    hosts =>["localhost:9200"]
    index => "sensor_data-%{+YYYY.MM.dd}"
    user => "elastic"
    password => "hide"
  }
}

However, when I start Logstash, I see the following error:
[2020-01-09T22:57:16,260]
[ERROR][logstash.javapipeline]
[main] Pipeline aborted due to error {
    :pipeline_id=>"main", 
    :exception=>#<TypeError: failed to coerce jdk.internal.loader.ClassLoaders$AppClassLoader to java.net.URLClassLoader>, 
    :backtrace=>[
        "org/jruby/java/addons/KernelJavaAddons.java:29:in `to_java'", 
        "E:/ElasticStack/Logstash/logstash-7.4.1/vendor/bundle/jruby/2.5.0/gems/logstash-filter-jdbc_streaming-1.0.7/lib/logstash/plugin_mixins/jdbc_streaming.rb:48:in `prepare_jdbc_connection'", 
        "E:/ElasticStack/Logstash/logstash-7.4.1/vendor/bundle/jruby/2.5.0/gems/logstash-filter-jdbc_streaming-1.0.7/lib/logstash/filters/jdbc_streaming.rb:200:in `prepare_connected_jdbc_cache'", 
        "E:/ElasticStack/Logstash/logstash-7.4.1/vendor/bundle/jruby/2.5.0/gems/logstash-filter-jdbc_streaming-1.0.7/lib/logstash/filters/jdbc_streaming.rb:116:in `register'", "org/logstash/config/ir/compiler/AbstractFilterDelegatorExt.java:56:in `register'", 
        "E:/ElasticStack/Logstash/logstash-7.4.1/logstash-core/lib/logstash/java_pipeline.rb:195:in `block in register_plugins'", "org/jruby/RubyArray.java:1800:in `each'", 
        "E:/ElasticStack/Logstash/logstash-7.4.1/logstash-core/lib/logstash/java_pipeline.rb:194:in `register_plugins'", 
        "E:/ElasticStack/Logstash/logstash-7.4.1/logstash-core/lib/logstash/java_pipeline.rb:468:in `maybe_setup_out_plugins'", 
        "E:/ElasticStack/Logstash/logstash-7.4.1/logstash-core/lib/logstash/java_pipeline.rb:207:in `start_workers'", 
        "E:/ElasticStack/Logstash/logstash-7.4.1/logstash-core/lib/logstash/java_pipeline.rb:149:in `run'", 
        "E:/ElasticStack/Logstash/logstash-7.4.1/logstash-core/lib/logstash/java_pipeline.rb:108:in `block in start'"], 
    :thread=>"#<Thread:0x17fa8113 run>"
}
[2020-01-09T22:57:16,598]
[ERROR][logstash.agent] Failed to execute action {
    :id=>:main, 
    :action_type=>LogStash::ConvergeResult::FailedAction, 
    :message=>"Could not execute action: PipelineAction::Create<main>, action_result: false", 
    :backtrace=>nil
}

I am enriching my HTTP input with some data from a MySQL database, but Logstash will not start at all.

Best Answer

I see two potential problems; you will need to check which of them is the real cause in your case:

  • The MySQL driver class name changed to com.mysql.cj.jdbc.Driver in Connector/J 8.x.
  • A classloader problem can occur when a recent JDBC driver sits outside the classloader path and is combined with a newer JDK version; there are related issues about this on GitHub.
    Put the driver into your Logstash folder under <logstash-install-dir>/vendor/jar/jdbc/ (you need to create this folder first). If that does not work, move the driver to <logstash-install-dir>/logstash-core/lib/jars and do not provide any driver path in the config file: jdbc_driver_library => ""
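
    A corrected filter block combining both suggestions might look like the sketch below. The connection string and credentials are taken from the question; the empty jdbc_driver_library assumes you copied the jar into Logstash's own jars directory as described above.

    ```
    filter {
      jdbc_streaming {
        # Leave the path empty once the jar has been copied to
        # <logstash-install-dir>/logstash-core/lib/jars
        jdbc_driver_library => ""
        # Connector/J 8.x renamed the driver class
        jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
        jdbc_connection_string => "jdbc:mysql://localhost:3306/sensor_metadata"
        jdbc_user => "elastic"
        jdbc_password => "hide"
        # same SQL join as in the question, omitted here for brevity
        statement => "select ... where s.sensor_id = :sensor_identifier"
        parameters => { "sensor_identifier" => "sensor_id" }
        target => "lookupResult"
      }
    }
    ```

    On the setup shown in the logs, the second option amounts to copying mysql-connector-java-8.0.18.jar into E:\ElasticStack\Logstash\logstash-7.4.1\logstash-core\lib\jars.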
This question originally appeared on Stack Overflow: https://stackoverflow.com/questions/59698179/
