elasticsearch - 在Logstash中过滤jdbc数据

标签 elasticsearch jdbc filter logstash

在我的数据库中,我的数据格式如下:

enter image description here

但是在ElasticSearch中,我想针对项目类型推送数据。因此,ElasticSearch中的每条记录将列出所有项目名称及其每种类型的值。

像这样:

{
  "_index": "daily_needs",
  "_type": "id",
  "_id": "10",
  "_source": {
    "item_type: "10",
    "fruits": "20",
    "veggies": "32",
    "butter": "11",
  }
}

{
  "_index": "daily_needs",
  "_type": "id",
  "_id": "11",
  "_source": {
    "item_type: "11",
    "hair gel": "50",
    "shampoo": "35",
  }
}

{
  "_index": "daily_needs",
  "_type": "id",
  "_id": "12",
  "_source": {
    "item_type: "12",
    "tape": "9",
    "10mm screw": "7",
    "blinker fluid": "78",
  }
}

我可以在Logstash中实现吗?

我是Logstash的新手,但据我了解,可以在filter中完成。但是我不确定要使用哪个过滤器,还是必须为此创建自定义过滤器。

当前的conf示例:
input {
  jdbc {
    jdbc_driver_library => "ojdbc6.jar"
    jdbc_driver_class => "Java::oracle.jdbc.driver.OracleDriver"
    jdbc_connection_string => "myjdbc-configs"
    jdbc_user => "dbuser"
    jdbc_password => "dbpasswd"
    schedule => "* * * * *"
    statement => "SELECT * from item_table"
  }
}
filter {
    ## WHAT TO WRITE HERE??
}
output {
    elasticsearch {
        hosts => [ "http://myeshost/" ]
        index => "myindex"
    }
}

请提示。谢谢。

最佳答案

您可以使用aggregate filter plugin来实现。我没有在下面进行测试,但是应该给您一个想法。

 filter {     
      aggregate {
        task_id => "%{item_type}" #
        code => "
          map['Item_type'] = event.get('Item_type')
          map[event.get('Item_Name')] = map[event.get('Item_Value')]
        "
        push_previous_map_as_event => true
        timeout => 3600
        timeout_tags => ['_aggregatetimeout']
      }
      if "aggregated" not in [tags] {
        drop {}
      }
    }

使用聚合过滤器的重要警告:
  • sql查询必须按Item_Type对结果进行排序,因此事件不会乱序。
  • sql查询中的列名称应与过滤器中的列名称map[]
  • 您应该只使用一个工作线程进行聚合,否则事件可能会被按顺序处理,并且会发生意外的结果。
  • 关于elasticsearch - 在Logstash中过滤jdbc数据,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/58197694/

    相关文章:

    elasticsearch - 使用映射文件创建索引时出错

    ruby-on-rails - 使用Elasticsearch在Searckick中进行多重建模

    elasticsearch - Elasticsearch 错误 - 期望以双引号开头字段名称

    Java JDBC 执行批处理

    java - 使用 Spring Boot JdbcTemplate 获取 java.lang.NullPointerException

    javascript - 如何过滤 EXTJS 网格列表过滤器?

    json - 带 jq 的日期/数字过滤器

    R - 过滤器坐标

    java - Elastic Search 前缀、后缀、EdgeGram

    java - jdbc是否支持波斯语