python - 在 Amazon Elasticsearch Service 上建立索引 - 批量插入

标签 python amazon-web-services elasticsearch logstash amazon-elasticsearch

我有一个事件的 Amazon Elasticsearch 实例,我能够通过 Chrome 中的“Sense”连接和执行语句。但是当我尝试进行批量插入时,它显示“超时”错误。我一直在尝试通过 Python(批量帮助程序)和 logstash 模块,两种方式都出现相同的错误。

下面是使用的代码

import psycopg2
from elasticsearch import Elasticsearch, helpers
import time

connection = psycopg2.connect(database='dbname', user='username', password='password', host='abc.def.com', port=5432)
es = Elasticsearch('elasticsearchinstance.amazonaws.com', max_retries=3, retry_on_timeout=True, request_timeout='10m')
cursor = connection.cursor()

query = """
select column1,column2,column3 from table
"""
cursor.execute(query)
rows = cursor.fetchall()
dict_list = []
for i in range(len(rows)):
    dict_list.append({'_type':'doc', '_index':'es_index', '_id':rows[i][0], 'column2':rows[i][1], 'column3':rows[i][2]})

print len(dict_list)

es.indices.delete(index='es_index', ignore=[400, 404])

time.sleep(2)

mapping = "{\"settings\" : {\"analysis\" : { \"analyzer\" : { \"my_ngram_analyzer\" : { \"tokenizer\" : \"my_ngram_tokenizer\" }},\"tokenizer\" : {\"my_ngram_tokenizer\" : {\"type\" : \"nGram\" , \"min_gram\" : \"2\" , \"max_gram\" : \"50\" }}}}, \"mappings\": { \"doc\": { \"_id\" : { \"path\" : \"id\" }, \"properties\": { \"column2\": { \"type\": \"string\", \"analyzer\": \"my_ngram_analyzer\" }, \"id\": { \"type\": \"long\" }, \"column3\": { \"type\": \"integer\" }}}}}"
es.indices.create(index='es_index', ignore=400, body=mapping)

helpers.bulk(es, dict_list)

通过Python Bulk helper得到的错误如下

Traceback (most recent call last):
File "D:\Python\refresh_data.py", line 21, in <module>
es.indices.delete(index='es_index', ignore=[400, 404])
File "C:\Python27\lib\site-packages\elasticsearch\client\utils.py", line 69, in _wrapped
return func(*args, params=params, **kwargs)
File "C:\Python27\lib\site-packages\elasticsearch\client\indices.py", line 198, in delete
params=params)
File "C:\Python27\lib\site-packages\elasticsearch\transport.py", line 307, in perform_request
status, headers, data = connection.perform_request(method, url, params, body, ignore=ignore, timeout=timeout)
File "C:\Python27\lib\site-packages\elasticsearch\connection\http_urllib3.py", line 89, in perform_request
raise ConnectionError('N/A', str(e), e)

elasticsearch.exceptions.ConnectionError:
ConnectionError((<urllib3.connection.HTTPConnection object at 0x0000000002C91898>, u'Connection to elasticsearchinstance.amazonaws.com timed out. (connect timeout=10)')) 
caused by:
ConnectTimeoutError((<urllib3.connection.HTTPConnection object at 0x0000000002C91898>, u'Connection to elasticsearchinstance.amazonaws.com timed out. (connect timeout=10)'))

Logstash(用于批量插入)也出现类似的超时错误(如果需要,将编辑和更新 logstash 的错误)。

需要帮助解决 Amazon Elasticsearch Service 的超时问题。

提前致谢。

编辑:

这是我在 Amazon ES 中执行批量插入时遇到的“Logstash”错误

C:\logstash-1.5.4\bin>logstash agent -f feed_load_amazon_es.conf
io/console not supported; tty will not be manipulated
←[31mFailed to install template: connect timed out {:level=>:error}←[0m
Logstash startup completed
←[31mGot error to send bulk of actions: connect timed out {:level=>:error}←[0m
←[33mFailed to flush outgoing items {:outgoing_count=>3, :exception=>"Manticore::ConnectTimeout", 
:backtrace=>["C:/logstash-1.5.4/vendor/bundle/jruby/1.9/gems/manticore-0.4.4-java/lib/manticore/response.rb:35:in `initialize'", 
"org/jruby/RubyProc.java:271:in `call'", 
"C:/logstash-1.5.4/vendor/bundle/jruby/1.9/gems/manticore-0.4.4-java/lib/manticore/response.rb:70:in `call'", 
"C:/logstash-1.5.4/vendor/bundle/jruby/1.9/gems/manticore-0.4.4-java/lib/manticore/response.rb:245:in `call_once'", 
"C:/logstash-1.5.4/vendor/bundle/jruby/1.9/gems/manticore-0.4.4-java/lib/manticore/response.rb:148:in `code'", 
"C:/logstash-1.5.4/vendor/bundle/jruby/1.9/gems/elasticsearch-transport-1.0.12/lib/elasticsearch/transport/transport/http/manticore.rb:71:in `perform_request'", 
"org/jruby/RubyProc.java:271:in `call'", 
"C:/logstash-1.5.4/vendor/bundle/jruby/1.9/gems/elasticsearch-transport-1.0.12/lib/elasticsearch/transport/transport/base.rb:190:in `perform_request'", 
"C:/logstash-1.5.4/vendor/bundle/jruby/1.9/gems/elasticsearch-transport-1.0.12/lib/elasticsearch/transport/transport/http/manticore.rb:54:in `perform_request'",
"C:/logstash-1.5.4/vendor/bundle/jruby/1.9/gems/elasticsearch-transport-1.0.12/lib/elasticsearch/transport/client.rb:119:in `perform_request'", 
"C:/logstash-1.5.4/vendor/bundle/jruby/1.9/gems/elasticsearch-api-1.0.12/lib/elasticsearch/api/actions/bulk.rb:80:in `bulk'", 
"C:/logstash-1.5.4/vendor/bundle/jruby/1.9/gems/logstash-output-elasticsearch-1.0.7-java/lib/logstash/outputs/elasticsearch/protocol.rb:104:in `bulk'", 
"C:/logstash-1.5.4/vendor/bundle/jruby/1.9/gems/logstash-output-elasticsearch-1.0.7-java/lib/logstash/outputs/elasticsearch.rb:542:in `submit'", 
"C:/logstash-1.5.4/vendor/bundle/jruby/1.9/gems/logstash-output-elasticsearch-1.0.7-java/lib/logstash/outputs/elasticsearch.rb:566:in `flush'", 
"C:/logstash-1.5.4/vendor/bundle/jruby/1.9/gems/stud-0.0.21/lib/stud/buffer.rb:219:in `buffer_flush'", 
"org/jruby/RubyHash.java:1341:in `each'", 
"C:/logstash-1.5.4/vendor/bundle/jruby/1.9/gems/stud-0.0.21/lib/stud/buffer.rb:216:in `buffer_flush'", 
"C:/logstash-1.5.4/vendor/bundle/jruby/1.9/gems/logstash-output-elasticsearch-1.0.7-java/lib/logstash/outputs/elasticsearch.rb:600:in `teardown'", 
"org/jruby/RubyArray.java:1613:in `each'", 
"C:/logstash-1.5.4/vendor/bundle/jruby/1.9/gems/logstash-core-1.5.4-java/lib/logstash/pipeline.rb:248:in `outputworker'", 
"org/jruby/RubyArray.java:1613:in `each'", 
"C:/logstash-1.5.4/vendor/bundle/jruby/1.9/gems/logstash-core-1.5.4-java/lib/logstash/pipeline.rb:247:in `outputworker'", 
"C:/logstash-1.5.4/vendor/bundle/jruby/1.9/gems/logstash-core-1.5.4-java/lib/logstash/pipeline.rb:166:in `start_outputs'"], :level=>:warn}←[0m

最佳答案

我认为你做错了。

批量请求是批量方法“正文”字段中的 2 行组合。

{ "index" : { "_index" : "test", "_type" : "type1", "_id" : "1" } }
{ "field1" : "value1" }

这是你的 body 字段中应该有的内容。

第一行包含请求类型、批量索引和许多其他您可以设置或不设置的参数(查看文档)。在第一行末尾添加一个\r\n。

第二行必须包含您要插入的内容。

如果你检查你放入 dict_list 的内容,你会忘记索引方法调用。

错误的结构:

dict_list.append({'_type':'doc', '_index':'es_index', '_id':rows[i][0], 'column2':rows[i][1], 'column3':rows[i][2]})

正确的结构:

{ "index" : {'_type':'doc', '_index':'es_index', '_id':rows[i][0]} }

然后在第二行添加您的文档。

关于python - 在 Amazon Elasticsearch Service 上建立索引 - 批量插入,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/34131007/

相关文章:

python - 使用多个列表作为函数的输入参数 (Python)

amazon-s3 - 为什么在从现有快照创建的Amazon EBS卷中看不到新内容?

wcf - 使用filebeat传送元素之间没有换行符的XML数据

linux - 当 redis 和 elasticsearch 等服务被多个其他服务使用时,您是否应该为它们创建单独的 docker 容器?

python - 如何模拟从 getattr 返回的函数?

python - 直接从协程调用Python的协程

python - 单比。 Python 中的多个 For 循环?

ruby-on-rails - 监控和导航 S3 存储桶以查找用户添加的新文件

amazon-web-services - 如何管理 AWS Kinesis Firehose 用于将数据写入 Redshift 集群的密码

elasticsearch - 在 Elasticsearch 中的多个字段中使用多个参数进行搜索