python-2.7 - Remove duplicates after bulk insert using ES features

Tags: python-2.7 elasticsearch bigdata

I have an index of this type:

{
    "email": email,
    "data": {
        domain: [{
            "purchase_date": date,
            "amount": amount
        }]
    }
}

This is the Python method I wrote to insert data into ES:
# 1: check if mail exists
mailExists = es.exists(index=index_param, doc_type=doctype_param, id=email)

# if mail does not exist => insert the entire doc
if mailExists is False:
    doc = {
        "email": email,
        "data": {
            domain: [{
                "purchase_date": date,
                "amount": amount
            }]
        }
    }

    res = es.index(index=index_param, doc_type=doctype_param, id=email, body=doc)
# 2: the mail exists => check whether the domain already exists
else:
    query = es.get(index=index_param, doc_type=doctype_param, id=email)
    # save json content into mydata
    mydata = query['_source']['data']

    # if domain exists => check if 'purchase_date' is the same as the one I'm trying to insert
    if domain in mydata:
        differentPurchaseDate = True
        for element in mydata[domain]:
            if element['purchase_date'] == purchase_date:
                differentPurchaseDate = False
        # if 'purchase_date' does not exist => add it to the current domain
        if differentPurchaseDate:
            es.update(index=index_param, doc_type=doctype_param, id=email,
                body={
                    "script": {
                        "inline": "ctx._source.data['" + domain + "'].add(params.newPurchaseDate)",
                        "params": {
                            "newPurchaseDate": {
                                "purchase_date": purchase_date,
                                "amount": amount
                            }
                        }
                    }
                })

    # add entire domain
    else:
        es.update(index=index_param, doc_type=doctype_param, id=email,
            body={
                "script": {
                    "inline": "ctx._source.data['" + domain + "'] = params.newDomain",
                    "params": {
                        "newDomain": [{
                            "purchase_date": purchase_date,
                            "amount": amount
                        }]
                    }
                }
            })

The problem is that with this algorithm every new row takes about 50 seconds to insert, and I am processing very large files.
So I wondered: is it possible to reduce the import time by bulk-inserting each file, and then removing the duplicates after each file has been processed?
Thanks!
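Since the goal is to remove duplicates anyway, one option is to drop them in memory before indexing, so no per-row exists/get round-trip is needed at all. A minimal sketch, assuming each input row is an `(email, domain, purchase_date, amount)` tuple — the row format and the helper name are illustrative assumptions, not part of the question:

```python
# Hypothetical sketch: collapse duplicate purchases in memory before
# indexing, so each file can be bulk-inserted without per-row lookups.

def dedup_rows(rows):
    """Keep the first purchase seen per (email, domain, purchase_date)."""
    seen = set()
    unique = []
    for email, domain, purchase_date, amount in rows:
        key = (email, domain, purchase_date)
        if key not in seen:
            seen.add(key)
            unique.append((email, domain, purchase_date, amount))
    return unique

rows = [
    ("a@x.com", "shop.com", "2018-01-01", 10),
    ("a@x.com", "shop.com", "2018-01-01", 10),  # duplicate row
    ("a@x.com", "shop.com", "2018-01-02", 5),
]
unique = dedup_rows(rows)  # two rows survive
```

This only deduplicates within one file; duplicates against documents already in the index still need a lookup, which is what the answer below batches.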

Best Answer

Try using parallel_bulk, documentation here:

from elasticsearch import helpers

paramL = []

# 1: check if mail exists
mailExists = es.exists(index=index_param, doc_type=doctype_param, id=email)

# if mail does not exist => insert the entire doc
if mailExists is False:
    doc = {
        "email": email,
        "data": {
            domain: [{
                "purchase_date": date,
                "amount": amount
            }]
        }
    }

    ogg={
        '_op_type': 'index',
        '_index': index_param,
        '_type': doctype_param,
        '_id': email,
        '_source': doc
    }

    paramL.append(ogg)


# 2: the mail exists => check whether the domain already exists
else:
    query = es.get(index=index_param, doc_type=doctype_param, id=email)
    # save json content into mydata
    mydata = query['_source']['data']

    # if domain exists => check if 'purchase_date' is the same as the one I'm trying to insert
    if domain in mydata:
        differentPurchaseDate = True
        for element in mydata[domain]:
            if element['purchase_date'] == purchase_date:
                differentPurchaseDate = False
        # if 'purchase_date' does not exist => add it to the current domain
        if differentPurchaseDate:
            body = {
                "script": {
                    "inline": "ctx._source.data['" + domain + "'].add(params.newPurchaseDate)",
                    "params": {
                        "newPurchaseDate": {
                            "purchase_date": purchase_date,
                            "amount": amount
                        }
                    }
                }
            }
            ogg = {
                '_op_type': 'update',
                '_index': index_param,
                '_type': doctype_param,
                '_id': email,
                '_source': body
            }

            paramL.append(ogg)

    # add entire domain
    else:
        body = {
            "script": {
                "inline": "ctx._source.data['" + domain + "'] = params.newDomain",
                "params": {
                    "newDomain": [{
                        "purchase_date": purchase_date,
                        "amount": amount
                    }]
                }
            }
        }
        ogg = {
            '_op_type': 'update',
            '_index': index_param,
            '_type': doctype_param,
            '_id': email,
            '_source': body
        }

        paramL.append(ogg)


for success, info in helpers.parallel_bulk(client=es, actions=paramL, thread_count=4):
    if not success: 
        print 'Doc failed', info

If you also want to batch the get and exists queries, you should use an msearch query, documented in the elastic documentation here. In that case you would generate an ordered list of queries, and you would have to restructure your script, because msearch returns a single response containing an ordered list of results for all the exists/get queries, so you cannot use if-else statements the way you do now. If you give me more information, I can help you implement the multi-search query.
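The batching described above could be sketched by building the msearch body as ordered header/query pairs, one pair per email. This is only a sketch: `index_param` and `doctype_param` mirror the variables used earlier (given placeholder values here so the snippet runs standalone), and the `ids` query simply looks each document up by its `_id`, i.e. the email:

```python
# Assumed placeholder values; in the real script these come from config.
index_param = "purchases"
doctype_param = "purchase"

def build_msearch_body(emails):
    """msearch takes alternating header/query lines; one pair per email,
    so responses come back in the same order as the input emails."""
    body = []
    for email in emails:
        body.append({"index": index_param, "type": doctype_param})  # header
        body.append({"query": {"ids": {"values": [email]}}})        # query
    return body

body = build_msearch_body(["a@x.com", "b@y.com"])
# responses = es.msearch(body=body)
# responses["responses"][i] is then the result for the i-th email
```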

Here is an example of an mget query for the get part:

emails = [ <list_of_email_ID_values> ]
results = es.mget(index=index_param,
                  doc_type=doctype_param,
                  body={'ids': emails})
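For completeness, a sketch of consuming the mget response: it returns a `{"docs": [...]}` list in request order, each entry carrying a `found` flag. The helper name and the simulated response below are illustrative:

```python
def split_mget_response(response):
    """Separate an mget response into docs that exist and ids that don't."""
    found, missing = {}, []
    for doc in response["docs"]:
        if doc.get("found"):
            found[doc["_id"]] = doc["_source"]
        else:
            missing.append(doc["_id"])
    return found, missing

# simulated mget response, for illustration only
sample = {"docs": [
    {"_id": "a@x.com", "found": True,
     "_source": {"email": "a@x.com", "data": {}}},
    {"_id": "b@y.com", "found": False},
]}
found, missing = split_mget_response(sample)
# "found" feeds the update branch; "missing" drives fresh index actions
```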

Regarding python-2.7 - remove duplicates after bulk insert using ES features, we found a similar question on Stack Overflow: https://stackoverflow.com/questions/49029135/
