python - 如何使用 elasticsearch.helpers.streaming_bulk

标签 python elasticsearch helpers bulk

有人可以建议如何使用函数 elasticsearch.helpers.streaming_bulk 而不是 elasticsearch.helpers.bulk 将数据索引到 elasticsearch 中。

如果我简单地更改 streaming_bulk 而不是 bulk,则不会索引任何内容,所以我想它需要以不同的形式使用。

下面的代码以 500 个元素的 block 从 CSV 文件创建索引、类型和索引数据到 elasticsearch。它工作正常,但我在徘徊是否有可能提高性能。这就是为什么我想尝试 streaming_bulk 函数。

目前我需要 10 分钟为 200MB 的 CSV 文档索引 100 万行。我使用两台机器,Centos 6.6 和 8 个 CPU-s,x86_64,CPU MHz:2499.902,Mem:总共 15.574G。 不确定它能否运行得更快。

es = elasticsearch.Elasticsearch([{'host': 'uxmachine-test', 'port': 9200}])
index_name = 'new_index'
type_name = 'new_type'
mapping = json.loads(open(config["index_mapping"]).read()) #read mapping from json file

es.indices.create(index_name)
es.indices.put_mapping(index=index_name, doc_type=type_name, body=mapping)

with open(file_to_index, 'rb') as csvfile:
    reader = csv.reader(csvfile)        #read documents for indexing from CSV file, more than million rows
    content = {"_index": index_name, "_type": type_name}
    batch_chunks = []
    iterator = 0

    for row in reader:
        var = transform_row_for_indexing(row,fields, index_name, type_name,id_name,id_increment)
        id_increment = id_increment + 1
        #var = transform_row_for_indexing(row,fields, index_name, type_name)
        batch_chunks.append(var)
        if iterator % 500 == 0:
            helpers.bulk(es,batch_chunks)
            del batch_chunks[:]
            print "ispucalo batch"
        iterator = iterator + 1
    # indexing of last batch_chunk
    if len(batch_chunks) != 0:
        helpers.bulk(es,batch_chunks)

最佳答案

因此流式批量返回一个交互器。这意味着在您开始迭代之前什么都不会发生。 “批量”功能的代码如下所示:

success, failed = 0, 0

# list of errors to be collected is not stats_only
errors = []

for ok, item in streaming_bulk(client, actions, **kwargs):
    # go through request-reponse pairs and detect failures
    if not ok:
        if not stats_only:
            errors.append(item)
        failed += 1
    else:
        success += 1

return success, failed if stats_only else errors

所以基本上只调用 streaming_bulk(client, actions, **kwargs) 实际上不会做任何事情。直到您像在这个 for 循环中所做的那样对其进行迭代,索引才真正开始发生。

所以在你的代码中。欢迎您将 'bulk' 更改为 'streaming_bulk',但是您需要迭代流式处理批量的结果,以便实际索引任何内容。

关于python - 如何使用 elasticsearch.helpers.streaming_bulk,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/34659198/

相关文章:

elasticsearch - 为什么这个Elasticsearch范围查询似乎取决于查询中的位数?

ElasticSearch似乎不支持数组查找

events - 如何在模板中使用动态参数调用 meteor 助手

ruby-on-rails - Rails 文章助手 - "a"或 "an"

python - 为什么这个 python 表达式参数在调用时没有扩展?

python - 在 python 数据框中发送 token 后,Word_tokenize 不起作用

Python 类派生自 pandas DataFrame,具有 list/DataFrame 属性

elasticsearch - Titan 1.0混合索引不适用于警告-查询需要遍历所有顶点

php - Laravel 5 - link_to_route() 方法通过在末尾添加 "?"使我的路由参数变为查询字符串

python - 为什么 Python 的列表推导式不复制参数以便实际对象不能被改变?