我正在尝试使用 Python API 将大量文档批量插入到 Elasticsearch 中。
import elasticsearch
from pymongo import MongoClient
es = elasticsearch.Elasticsearch()
def index_collection(db, collection, fields, host='localhost', port=27017):
conn = MongoClient(host, port)
coll = conn[db][collection]
cursor = coll.find({}, fields=fields, timeout=False)
print "Starting Bulk index of {} documents".format(cursor.count())
def action_gen():
"""
Generator to use for bulk inserts
"""
for n, doc in enumerate(cursor):
op_dict = {
'_index': db.lower(),
'_type': collection,
'_id': int('0x' + str(doc['_id']), 16),
}
doc.pop('_id')
op_dict['_source'] = doc
yield op_dict
res = bulk(es, action_gen(), stats_only=True)
print res
这些文档来自 Mongodb 集合,我使用上面的函数根据文档中解释的方式进行批量索引。
批量索引继续用数千个空文档填充 Elasticsearch 。谁能告诉我我做错了什么?
最佳答案
我从未见过以这种方式组合在一起的批量数据,尤其是您使用“_source”
所做的事情。可能有一种方法可以让它发挥作用,我不知道,但当我尝试它时,我得到了奇怪的结果。
如果你看bulk api ,ES 期待一个元数据文档,然后是要索引的文档。因此,每个文档的批量数据列表中都需要两个条目。所以也许是这样的:
import elasticsearch
from pymongo import MongoClient
es = elasticsearch.Elasticsearch()
def index_collection(db, collection, fields, host='localhost', port=27017):
conn = MongoClient(host, port)
coll = conn[db][collection]
cursor = coll.find({}, fields=fields, timeout=False)
print "Starting Bulk index of {} documents".format(cursor.count())
bulk_data = []
for n, doc in enumerate(cursor):
bulk_data.append({
'_index': db.lower(),
'_type': collection,
'_id': int('0x' + str(doc['_id']), 16),
})
bulk_data.append(doc)
es.bulk(index=index_name,body=bulk_data,refresh=True)
不过,我并没有尝试运行该代码。这是我知道有效的脚本,如果有帮助,您可以使用它:
from elasticsearch import Elasticsearch
es_client = Elasticsearch(hosts = [{ "host" : "localhost", "port" : 9200 }])
index_name = "test_index"
if es_client.indices.exists(index_name):
print("deleting '%s' index..." % (index_name))
print(es_client.indices.delete(index = index_name, ignore=[400, 404]))
print("creating '%s' index..." % (index_name))
print(es_client.indices.create(index = index_name))
bulk_data = []
for i in range(4):
bulk_data.append({
"index": {
"_index": index_name,
"_type": 'doc',
"_id": i
}
})
bulk_data.append({ "idx": i })
print("bulk indexing...")
res = es_client.bulk(index=index_name,body=bulk_data,refresh=True)
print(res)
print("results:")
for doc in es_client.search(index=index_name)['hits']['hits']:
print(doc)
关于python - 如何使用 Python API 在 Elastic Search 中批量索引,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/28355235/