python-3.x - elasticsearch python parallel_bulk fails to insert data

Tags: python-3.x elasticsearch

I am using parallel_bulk to insert data into Elasticsearch from Python, but parallel_bulk does not insert anything. My code:

class CreateIndex(object):
    def _gen_data(self, index, doc_type, chunk_size):
        sql = """select * from tem_search_engine_1 where rownum <= 10000"""
        self.cursor.execute(sql)
        col_name_list = [col[0].lower() for col in self.cursor.description]
        col_name_len = len(col_name_list)
        actions = []

        start = time.time()
        for row in self.cursor:
            source = {}
            tbl_id = ""
            for i in range(col_name_len):
                source.update({col_name_list[i]: str(row[i])})
                if col_name_list[i] == "tbl_id":
                    tbl_id = row[i]
            action = {
                "_index": index,
                "_type": doc_type,
                "_id": tbl_id,
                "_source": source
            }
            actions.append(action)
            if len(actions) == chunk_size:
                print("actions time:", time.time()-start)
                yield actions
                actions = []
        print("for time:", time.time()-start)
        yield actions

    def bulk_data(self, index, doc_type, chunk_size=1000, is_parallel=True, threads_counts=4):
        t1 = time.time()
        gen_action = self._gen_data(index, doc_type, chunk_size)

        if is_parallel is None or is_parallel:
            for success, info in helpers.parallel_bulk(client=self.es, actions=gen_action,
                                                       thread_count=threads_counts, chunk_size=chunk_size):
                if not success:
                    print("Insert failed:", info)

if __name__ == "__main__":
    createindex = CreateIndex()
    createindex.create_index(index="se", doc_type="se_doc")
    createindex.bulk_data(index="se", doc_type="se_doc")

What should I do when bulk_data runs but no data gets inserted?
The error is:
Traceback (most recent call last):
  File "F:/programs/ElasticSearch/CreateIndex.py", line 287, in <module>
    createindex.bulk_data(index="se", doc_type="se_doc")
  File "F:/programs/ElasticSearch/CreateIndex.py", line 179, in bulk_data
    thread_count=threads_counts, chunk_size=chunk_size):
  File "F:\programs\ElasticSearch\lib\site-packages\elasticsearch\helpers\__init__.py", line 306, in parallel_bulk
    _chunk_actions(actions, chunk_size, max_chunk_bytes, client.transport.serializer)
  File "D:\anacond\lib\multiprocessing\pool.py", line 735, in next
    raise value
  File "D:\anacond\lib\multiprocessing\pool.py", line 119, in worker
    result = (True, func(*args, **kwds))
  File "D:\anacond\lib\multiprocessing\pool.py", line 138, in _helper_reraises_exception
    raise ex
  File "D:\anacond\lib\multiprocessing\pool.py", line 290, in _guarded_task_generation
    for i, x in enumerate(iterable):
  File "F:\programs\ElasticSearch\lib\site-packages\elasticsearch\helpers\__init__.py", line 58, in _chunk_actions
    for action, data in actions:
  File "F:\programs\ElasticSearch\lib\site-packages\elasticsearch\helpers\__init__.py", line 37, in expand_action
    op_type = data.pop('_op_type', 'index')
TypeError: pop() takes at most 1 argument (2 given)
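The last line of the traceback comes down to a dict-vs-list mismatch: `expand_action` calls `data.pop('_op_type', 'index')` on each item it receives, and `dict.pop` accepts a default as its second argument while `list.pop` does not. A minimal reproduction (plain Python, no Elasticsearch required):

```python
# What expand_action does internally: pop with a default works on a dict.
action = {"_index": "se", "_source": {"a": 1}}
op_type = action.pop("_op_type", "index")
print(op_type)  # → index

# But the generator above yields *lists* of actions, and list.pop only
# accepts an integer index, so passing a default raises a TypeError,
# e.g. "pop() takes at most 1 argument (2 given)".
actions = [{"_index": "se", "_source": {"a": 1}}]
try:
    actions.pop("_op_type", "index")
except TypeError as e:
    print("TypeError:", e)
```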

Accepted answer

The parallel_bulk method accepts an iterable of action dicts: either a list of dicts or a generator that yields dicts, as explained here. A generator in Python exists precisely to avoid loading everything into RAM, but that advantage disappears if you collect the elements into a list first, because building the list materializes all of them anyway. In your case, the generator you pass does not yield individual dicts (action); it yields whole lists of actions (actions), which is what breaks the helper.

So either have your function _gen_data return a list, i.e. an actual list of action dicts rather than a generator:

def _gen_data(self, index, doc_type, chunk_size):
    sql = """select * from tem_search_engine_1 where rownum <= 10000"""  
    self.cursor.execute(sql)
    col_name_list = [col[0].lower() for col in self.cursor.description]
    col_name_len = len(col_name_list)
    actions = []

    start = time.time()
    for row in self.cursor:
        source = {}
        tbl_id = ""
        for i in range(col_name_len):
            source.update({col_name_list[i]: str(row[i])})
            if col_name_list[i] == "tbl_id":
                tbl_id = row[i]
        action = {
            "_index": index,
            "_type": doc_type,
            "_id": tbl_id,  
            "_source": source
        }
        actions.append(action)
    return actions

Or, instead of building an actions list, yield each action dict one at a time:
def _gen_data(self, index, doc_type, chunk_size):
    sql = """select * from tem_search_engine_1 where rownum <= 10000"""  
    self.cursor.execute(sql)
    col_name_list = [col[0].lower() for col in self.cursor.description]
    col_name_len = len(col_name_list)


    start = time.time()
    for row in self.cursor:
        source = {}
        tbl_id = ""
        for i in range(col_name_len):
            source.update({col_name_list[i]: str(row[i])})
            if col_name_list[i] == "tbl_id":
                tbl_id = row[i]
        yield {
            "_index": index,
            "_type": doc_type,
            "_id": tbl_id,  
            "_source": source
        }

This answer to "python-3.x - elasticsearch python parallel_bulk fails to insert data" is based on a similar question on Stack Overflow: https://stackoverflow.com/questions/54212958/
