I have an index with this structure:
{
    "email": email,
    "data": {
        domain: [{
            "purchase_date": date,
            "amount": amount
        }]
    }
}
This is the Python method I wrote to insert the data into ES:
# 1: check if mail exists
mailExists = es.exists(index=index_param, doc_type=doctype_param, id=email)

# if mail does not exist => insert entire doc
if mailExists is False:
    doc = {
        "email": email,
        "data": {
            domain: [{
                "purchase_date": date,
                "amount": amount
            }]
        }
    }
    res = es.index(index=index_param, doc_type=doctype_param, id=email, body=doc)

# 2: check if the domain already exists
else:
    query = es.get(index=index_param, doc_type=doctype_param, id=email)
    # save json content into mydata
    mydata = query['_source']['data']

    # if domain exists => check if 'purchase_date' is the same as the one I'm trying to insert
    if domain in mydata:
        differentPurchaseDate = True
        for element in mydata[domain]:
            if element['purchase_date'] == purchase_date:
                differentPurchaseDate = False
        # if 'purchase_date' does not exist => add it to the current domain
        if differentPurchaseDate:
            es.update(index=index_param, doc_type=doctype_param, id=email,
                body={
                    "script": {
                        "inline": "ctx._source.data['" + domain + "'].add(params.newPurchaseDate)",
                        "params": {
                            "newPurchaseDate": {
                                "purchase_date": purchase_date,
                                "amount": amount
                            }
                        }
                    }
                })
    # add entire domain
    else:
        es.update(index=index_param, doc_type=doctype_param, id=email,
            body={
                "script": {
                    "inline": "ctx._source.data['" + domain + "'] = params.newDomain",
                    "params": {
                        "newDomain": [{
                            "purchase_date": purchase_date,
                            "amount": amount
                        }]
                    }
                }
            })
The problem is that with this algorithm, inserting each new row takes about 50 seconds, and I am processing very large files.
So I was wondering: can I reduce the import time by bulk-inserting each file, and then removing duplicates after each file has been processed?
Thanks!
Best Answer
Try using parallel_bulk, documentation here:
from elasticsearch import helpers

paramL = []

# 1: check if mail exists
mailExists = es.exists(index=index_param, doc_type=doctype_param, id=email)

# if mail does not exist => insert entire doc
if mailExists is False:
    doc = {
        "email": email,
        "data": {
            domain: [{
                "purchase_date": date,
                "amount": amount
            }]
        }
    }
    ogg = {
        '_op_type': 'index',
        '_index': index_param,
        '_type': doctype_param,
        '_id': email,
        '_source': doc
    }
    paramL.append(ogg)

# 2: check if the domain already exists
else:
    query = es.get(index=index_param, doc_type=doctype_param, id=email)
    # save json content into mydata
    mydata = query['_source']['data']

    # if domain exists => check if 'purchase_date' is the same as the one I'm trying to insert
    if domain in mydata:
        differentPurchaseDate = True
        for element in mydata[domain]:
            if element['purchase_date'] == purchase_date:
                differentPurchaseDate = False
        # if 'purchase_date' does not exist => add it to the current domain
        if differentPurchaseDate:
            body = {
                "script": {
                    "inline": "ctx._source.data['" + domain + "'].add(params.newPurchaseDate)",
                    "params": {
                        "newPurchaseDate": {
                            "purchase_date": purchase_date,
                            "amount": amount
                        }
                    }
                }
            }
            ogg = {
                '_op_type': 'update',
                '_index': index_param,
                '_type': doctype_param,
                '_id': email,
                '_source': body
            }
            paramL.append(ogg)
    # add entire domain
    else:
        body = {
            "script": {
                "inline": "ctx._source.data['" + domain + "'] = params.newDomain",
                "params": {
                    "newDomain": [{
                        "purchase_date": purchase_date,
                        "amount": amount
                    }]
                }
            }
        }
        ogg = {
            '_op_type': 'update',
            '_index': index_param,
            '_type': doctype_param,
            '_id': email,
            '_source': body
        }
        paramL.append(ogg)

for success, info in helpers.parallel_bulk(client=es, actions=paramL, thread_count=4):
    if not success:
        print 'Doc failed', info
If you also want to batch the get and exists queries, you should use an msearch query (elastic documentation here). In that case you would build an ordered list of queries, and you would need to restructure your script, because you will receive a single output containing an ordered list of results for all the exists/get queries, so you cannot use if-else statements the way you currently do. If you give me more information, I can help you implement the multi-search query.
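As a rough sketch of that idea (not a drop-in solution), an msearch body can be built as a flat list of alternating header/query dicts, one pair per email. The helper below is hypothetical and assumes the same `es` client and `index_param`/`doctype_param` variables used above, plus an `emails` list:

```python
def build_msearch_body(emails, index_param, doctype_param):
    # msearch expects alternating header/query lines; building one pair
    # per email keeps responses[i] aligned with emails[i]
    body = []
    for email in emails:
        body.append({"index": index_param, "type": doctype_param})
        body.append({"query": {"term": {"_id": email}}, "size": 0})
    return body

# Hypothetical usage against a live cluster:
# responses = es.msearch(body=build_msearch_body(emails, index_param, doctype_param))["responses"]
# responses[i]["hits"]["total"] > 0 means emails[i] already exists
# (in ES 7+ the total is an object: responses[i]["hits"]["total"]["value"])
```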
Here is an example of an mget query for the get queries:
emails = [ <list_of_email_ID_values> ]
results = es.mget(index = index_param,
doc_type = doctype_param,
body = {'ids': emails})
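The mget response can then be split into new and existing mails in a single pass; a minimal sketch, assuming the response shape `{'docs': [{'found': ..., '_source': ...}, ...]}` and the same `data` layout as above:

```python
def split_by_existence(emails, mget_response):
    # partition email IDs by the 'found' flag of each mget doc:
    # mails not yet indexed vs. mails whose existing 'data' we need
    new_mails = []
    existing = {}
    for email, doc in zip(emails, mget_response['docs']):
        if doc.get('found'):
            existing[email] = doc['_source']['data']
        else:
            new_mails.append(email)
    return new_mails, existing
```

The `new_mails` list would feed the bulk `index` actions, while `existing` carries the per-domain data needed to decide between the two `update` scripts, without one round trip per email.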
Regarding "python-2.7 - Removing duplicates after bulk insert using ES features", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/49029135/