csv - Unable to load data into Logstash

Tags: csv elasticsearch logstash

I am trying to follow the guide at the link below:

http://www.viaboxx.de/code/easily-generate-live-heatmaps-for-geolocations-with-elk/#codesyntax_1

It worked fine for me the first time, but now when I try it I get the following error at the step that loads the CSV data. The command I run is:

cat test.csv | /opt/logstash/bin/logstash -f geostore.conf
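
For reference, test.csv follows the format from the guide: a header line beginning with # (which the filter drops), then one latitude/longitude pair per line. A hypothetical excerpt (values taken from the output further down; the exact header text is an assumption):

# lat, lon
53.97917361, -6.389038611
54.00310028, -6.397707778
53.99960056, -6.381966111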


I get the following error:

Settings: Default pipeline workers: 2
Pipeline main started
Error parsing csv {:field=>"message", :source=>"", :exception=>#<NoMethodError: undefined method `each_index' for nil:NilClass>, :level=>:warn}
Pipeline main has been shutdown
stopping pipeline {:id=>"main"}


Can you help? I have spent days trying to figure this out.

Edit: adding geostore.conf:

input {
  stdin {}
}
filter {
  # Step 1, drop the csv header line
  if [message] =~ /^#/ {
    drop {}
  }
  # Step 2, split latitude and longitude
  csv {
    separator => ','
    columns => [ 'lat', 'lon' ]
  }
  # Step 3, move lat and lon into a location object
  # for the geo_point type defined in ES
  mutate {
    rename => [ "lat", "[location][lat]", "lon", "[location][lon]" ]
  }
}
output {
  elasticsearch {
    hosts => 'localhost'
    index => 'geostore'
    document_type => "locality"
    flush_size => 1000
  }
}


I changed the output section from this:

output {
  elasticsearch {
    hosts => 'localhost'
    index => 'geostore'
    document_type  => "locality"
    flush_size => 1000
  }


to this:

output {
  elasticsearch {
    hosts => 'localhost'
    index => 'geostore'
    document_type  => "locality"
    flush_size => 1000
    stdout {}
  }


Now I get a more detailed error message:

    fetched an invalid config {:config=>"input {\n    stdin {}\n}\nfilter {\n  #
 Step 1, drop the csv header line\n  if [message] =~ /^#/ {\n    drop {}\n  }\n 
\n  # Step 2, split latitude and longitude\n  csv {\n    separator => ','\n    
columns => [ 'lat', 'lon' ]\n  }\n \n  # Step 3\n  # move lat and lon into 
location object \n  # for defined geo_point type in ES\n  mutate {  \n    rename 
=> [ \"lat\", \"[location][lat]\", \"lon\", \"[location][lon]\" ]\n  
}\n}\noutput {\n  elasticsearch {\n    hosts => 'localhost'\n    index => 
'geostore'\n    document_type  => \"locality\"\n    flush_size => 1000\n    
stdout {}\n  }\n}\n\n", :reason=>"Expected one of #, => at line 29, column 12 
(byte 543) after output {\n  elasticsearch {\n    hosts => 'localhost'\n    
index => 'geostore'\n    document_type  => \"locality\"\n    flush_size => 
1000\n    stdout ", :level=>:error}


I can't understand why it worked the first time.

Settings: Default pipeline workers: 2
Pipeline main started
Error parsing csv {:field=>"message", :source=>"", :exception=>#<NoMethodError: undefined method `each_index' for nil:NilClass>, :level=>:warn}
2017-03-30T13:46:31.171Z localhost.localdomain 53.97917361, -6.389038611
2017-03-30T13:46:31.171Z localhost.localdomain 54.00310028, -6.397707778
2017-03-30T13:46:31.172Z localhost.localdomain 53.99960056, -6.381966111
2017-03-30T13:46:31.172Z localhost.localdomain 54.00534917, -6.423718889
2017-03-30T13:46:31.172Z localhost.localdomain 51.92071667, -8.475726111
2017-03-30T13:46:31.172Z localhost.localdomain 51.82731222, -8.381912222
2017-03-30T13:46:31.173Z localhost.localdomain 51.81096639, -8.415731667
2017-03-30T13:46:31.173Z localhost.localdomain 54.28450222, -8.463775556
2017-03-30T13:46:31.173Z localhost.localdomain 54.27841, -8.495700278
2017-03-30T13:46:31.173Z localhost.localdomain 54.2681225, -8.462056944
2017-03-30T13:46:31.174Z localhost.localdomain 52.276167, -9.680497
2017-03-30T13:46:31.174Z localhost.localdomain 52.25660139, -9.703921389
2017-03-30T13:46:31.174Z localhost.localdomain 52.27031306, -9.723975556
2017-03-30T13:46:31.174Z localhost.localdomain 54.95663111, -7.714384167
2017-03-30T13:46:31.175Z localhost.localdomain 54.00133111, -7.352790833
2017-03-30T13:46:31.175Z localhost.localdomain 52.34264222, -6.4854175
2017-03-30T13:46:31.176Z localhost.localdomain 52.32439028, -6.464626111
2017-03-30T13:46:31.176Z localhost.localdomain 52.33008944, -6.487005
2017-03-30T13:46:31.176Z localhost.localdomain 53.70765861, -6.374657778
2017-03-30T13:46:31.177Z localhost.localdomain 53.72636306, -6.326768611
2017-03-30T13:46:31.177Z localhost.localdomain 53.71461361, -6.336066111
2017-03-30T13:46:31.177Z localhost.localdomain 51.55948417, -9.244535833
2017-03-30T13:46:31.177Z localhost.localdomain 53.52894667, -7.358543056
2017-03-30T13:46:31.177Z localhost.localdomain 53.51801167, -7.324215
2017-03-30T13:46:31.179Z localhost.localdomain 53.16202278, -6.795522222
2017-03-30T13:46:31.179Z localhost.localdomain 53.182702, -6.819299
2017-03-30T13:46:31.179Z localhost.localdomain 52.83053972, -8.991989444
2017-03-30T13:46:31.180Z localhost.localdomain 52.85651944, -8.965725833
2017-03-30T13:46:31.180Z localhost.localdomain 53.02885028, -7.300381667
2017-03-30T13:46:31.180Z localhost.localdomain
Pipeline main has been shutdown
stopping pipeline {:id=>"main"}

Best answer

Hopefully this will help others as well.

I deleted the schema from the command line:

curl -XDELETE 'localhost:9200/geostore?pretty'; 


Then I went to Kibana and deleted it from there as well. I reloaded the schema as follows, and now it works.

    curl -XPUT 'http://localhost:9200/geostore'



curl -XPUT 'http://localhost:9200/geostore/_mapping/locality' -d '
{
    "locality" : {
        "properties" : {
            "location" : {
                "type" : "geo_point",
                "geohash_prefix" : true,
                "geohash_precision" : "1km"
            }
        }
    }
}'



cat test.csv | /opt/logstash/bin/logstash -f geostore.conf


It takes a few seconds for Logstash to start, parse the input, and store the results in Elasticsearch.
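
To double-check that the documents actually landed in Elasticsearch (a quick sanity check, assuming Elasticsearch is still listening on localhost:9200 as above), you can query the index directly:

curl -XGET 'http://localhost:9200/geostore/_count?pretty'
curl -XGET 'http://localhost:9200/geostore/_search?pretty&size=1'

The count should match the number of data rows in test.csv, and the sample hit should show lat and lon nested under location.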

Now that we have the data in Elasticsearch, let's move on to Kibana 4. After logging in to Kibana, you need to add the index to Kibana.

Go to: Settings -> Indices -> Add New -> enter "geostore" in the index name field.

After adding the index, you will see all the fields of the indexed documents; in particular, check that the location property is mapped as geo_point.
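
If you prefer to verify this from the command line instead of Kibana (same localhost assumption as above), you can fetch the mapping directly and confirm that location has "type": "geo_point":

curl -XGET 'http://localhost:9200/geostore/_mapping/locality?pretty'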

The whole process is described in detail at the link below.

http://www.viaboxx.de/code/easily-generate-live-heatmaps-for-geolocations-with-elk/#codesyntax_1

Regarding csv - Unable to load data into Logstash, a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/43115403/
